diff --git a/Cargo.toml b/Cargo.toml index ced98a3..2f72869 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ uuid = { version = "1.5", optional = true } reqwest = { version = "0.11", optional = true } simd-json = { version = "0.13", optional = true } url = { version = "2.5", optional = true } +rand = { version = "0.8", optional = true } regex = { version = "1.10", optional = true } [dev-dependencies] @@ -28,7 +29,7 @@ tokio = { version = "1", features = [ "rt", "rt-multi-thread", "macros", "net" ] [features] default = [ "tls-native", "twitch", "youtube" ] twitch = [ "dep:irc", "dep:uuid" ] -youtube = [ "dep:simd-json", "dep:reqwest", "dep:serde", "dep:url", "dep:regex", "dep:serde-aux" ] +youtube = [ "dep:simd-json", "dep:reqwest", "dep:rand", "dep:serde", "dep:url", "dep:regex", "dep:serde-aux" ] serde = [ "dep:serde", "chrono/serde", "uuid?/serde" ] tls-native = [ "irc?/tls-native" ] #tls-rust = [ "irc?/tls-rust" ] diff --git a/examples/youtube.rs b/examples/youtube.rs index 7cb94ef..29b4827 100644 --- a/examples/youtube.rs +++ b/examples/youtube.rs @@ -1,12 +1,14 @@ -use std::env::args; +use std::{env::args, future::IntoFuture}; use brainrot::youtube; use futures_util::StreamExt; #[tokio::main] async fn main() -> anyhow::Result<()> { - let (options, cont) = youtube::get_options_from_live_page("5Z5Sys8-tLs").await?; + let (options, cont) = youtube::get_options_from_live_page("6DcXroWNDvk").await?; let initial_chat = youtube::fetch_yt_chat_page(&options, cont).await?; - youtube::subscribe_to_events(&options, &initial_chat).await?; + let subscriber = youtube::SignalerChannel::new_from_cont(&initial_chat).await?; + let (receiver, handle) = subscriber.spawn_event_subscriber().await?; + handle.into_future().await.unwrap(); Ok(()) } diff --git a/src/youtube/mod.rs b/src/youtube/mod.rs index 62451a7..9557fec 100644 --- a/src/youtube/mod.rs +++ b/src/youtube/mod.rs @@ -1,20 +1,25 @@ use std::{ collections::{HashMap, VecDeque}, io::BufRead, - sync::OnceLock + iter, + sync::{Arc, OnceLock} }; +use rand::Rng; use regex::Regex; use reqwest::{ header::{self, HeaderMap, HeaderValue}, - StatusCode + Response, StatusCode }; use simd_json::{ base::{ValueAsContainer, ValueAsScalar}, OwnedValue }; use thiserror::Error; -use tokio::sync::Mutex; +use tokio::{ + sync::{broadcast, Mutex}, + task::JoinHandle +}; use url::Url; mod types; @@ -254,86 +259,214 @@ pub async fn fetch_yt_chat_page(options: &RequestOptions, continuation: impl AsR Ok(response) } -pub async fn subscribe_to_events(options: &RequestOptions, initial_continuation: &GetLiveChatResponse) -> Result<(), YouTubeError> { - let topic_id = &initial_continuation - .continuation_contents - .as_ref() - .unwrap() - .live_chat_continuation - .continuations[0] - .invalidation_continuation_data - .as_ref() - .unwrap() - .invalidation_id - .topic; - - let server_response: OwnedValue = get_http_client() - .post(Url::parse_with_params(GCM_SIGNALER_SRQE, [("key", LIVE_CHAT_BASE_TANGO_KEY)])?) - .header(header::CONTENT_TYPE, "application/json+protobuf") - .body(format!(r#"[[null,null,null,[7,5],null,[["youtube_live_chat_web"],[1],[[["{topic_id}"]]]]]]"#)) - .send() - .await? - .simd_json() - .await?; - let gsess = server_response.as_array().unwrap()[0].as_str().unwrap(); - - let mut ofs_parameters = HashMap::new(); - ofs_parameters.insert("count", "1".to_string()); - ofs_parameters.insert("ofs", "0".to_string()); - ofs_parameters - .insert("req0___data__", format!(r#"[[["1",[null,null,null,[7,5],null,[["youtube_live_chat_web"],[1],[[["{topic_id}"]]]],null,null,1],null,3]]]"#)); - let ofs = get_http_client() - .post(Url::parse_with_params( - GCM_SIGNALER_PSUB, - [ - ("VER", "8"), - ("gsessionid", &gsess), - ("key", LIVE_CHAT_BASE_TANGO_KEY), - ("RID", "60464"), - ("CVER", "22"), - ("zx", "uo5vp9j380ef"), - ("t", "1") - ] - )?) - .header("X-WebChannel-Content-Type", "application/json+protobuf") - .form(&ofs_parameters) - .send() - .await?; +#[derive(Debug, Default)] +struct SignalerChannelInner { + topic: String, + gsessionid: Option, + sid: Option, + rid: usize, + aid: usize, + session_n: usize +} - // standard response: [[0,["c","koBtCISzwmqJpalH1EqHSc","",8,12,30000]]] - // stream end response: [1,3,7] - let mut ofs_res_line = ofs.bytes().await?.lines().nth(1).unwrap().unwrap(); - let value: OwnedValue = unsafe { simd_json::from_str(&mut ofs_res_line) }?; - let value = value.as_array().unwrap()[0].as_array().unwrap(); - assert_eq!(value[0].as_usize().unwrap(), 0); - let sid = value[1].as_array().unwrap()[1].as_str().unwrap(); - - let mut stream = get_http_client() - .get(Url::parse_with_params( - GCM_SIGNALER_PSUB, - [ - ("VER", "8"), - ("gsessionid", &gsess), - ("key", LIVE_CHAT_BASE_TANGO_KEY), - ("RID", "rpc"), - ("SID", sid), - ("AID", "0"), - ("CI", "0"), - ("TYPE", "xmlhttp"), - ("zx", "uo5vp9j380ef"), - ("t", "1") - ] - )?) - .header(header::CONNECTION, "keep-alive") - .send() - .await?; +impl SignalerChannelInner { + pub fn with_topic(topic: impl ToString) -> Self { + Self { + topic: topic.to_string(), + ..Default::default() + } + } + + pub fn reset(&mut self) { + self.gsessionid = None; + self.sid = None; + self.rid = 0; + self.aid = 0; + self.session_n = 0; + } + + fn gen_zx() -> String { + const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789"; + let mut rng = rand::thread_rng(); + iter::repeat_with(|| CHARSET[rng.gen_range(0..CHARSET.len())] as char).take(11).collect() + } + + pub async fn choose_server(&mut self) -> Result<(), YouTubeError> { + let server_response: OwnedValue = get_http_client() + .post(Url::parse_with_params(GCM_SIGNALER_SRQE, [("key", LIVE_CHAT_BASE_TANGO_KEY)])?) + .header(header::CONTENT_TYPE, "application/json+protobuf") + .body(format!(r#"[[null,null,null,[7,5],null,[["youtube_live_chat_web"],[1],[[["{}"]]]]]]"#, self.topic)) + .send() + .await? + .simd_json() + .await?; + let gsess = server_response.as_array().unwrap()[0].as_str().unwrap(); + self.gsessionid = Some(gsess.to_owned()); + Ok(()) + } - while let Some(c) = stream.chunk().await? { - println!("{}", String::from_utf8_lossy(&c)); + pub async fn renew_session_or_something(&mut self) -> Result<(), YouTubeError> { + let mut ofs_parameters = HashMap::new(); + ofs_parameters.insert("count", "2".to_string()); + ofs_parameters.insert("ofs", "1".to_string()); + ofs_parameters.insert("req0___data__", format!(r#"[[["{}",null,[]]]]"#, self.session_n)); + self.session_n += 1; + ofs_parameters.insert( + "req1___data__", + format!(r#"[[["{}",[null,null,null,[7,5],null,[["youtube_live_chat_web"],[1],[[["{}"]]]],null,null,1],null,3]]]"#, self.session_n, self.topic) + ); + let ofs = get_http_client() + .post(Url::parse_with_params( + GCM_SIGNALER_PSUB, + [ + ("VER", "8"), + ("gsessionid", self.gsessionid.as_ref().unwrap()), + ("key", LIVE_CHAT_BASE_TANGO_KEY), + ("SID", self.sid.as_ref().unwrap()), + ("RID", &self.rid.to_string()), + ("AID", &self.aid.to_string()), + ("CVER", "22"), + ("zx", Self::gen_zx().as_ref()), + ("t", "1") + ] + )?) + .header("X-WebChannel-Content-Type", "application/json+protobuf") + .form(&ofs_parameters) + .send() + .await?; + + let mut ofs_res_line = ofs.bytes().await?.lines().nth(1).unwrap().unwrap(); + println!("{ofs_res_line}"); + let value: OwnedValue = unsafe { simd_json::from_str(&mut ofs_res_line) }?; + let value = value.as_array().unwrap(); + // assert_eq!(value[0].as_usize().unwrap(), 1); + + Ok(()) } - // todo: how to distinguish normal closure from server shutdown (NS_BINDING_ABORTED)? - println!("{:?}", stream.bytes().await); + pub async fn init_session(&mut self) -> Result<(), YouTubeError> { + let mut ofs_parameters = HashMap::new(); + ofs_parameters.insert("count", "1".to_string()); + ofs_parameters.insert("ofs", "0".to_string()); + ofs_parameters.insert( + "req0___data__", + format!(r#"[[["1",[null,null,null,[7,5],null,[["youtube_live_chat_web"],[1],[[["{}"]]]],null,null,1],null,3]]]"#, self.topic) + ); + self.session_n = 1; + let ofs = get_http_client() + .post(Url::parse_with_params( + GCM_SIGNALER_PSUB, + [ + ("VER", "8"), + ("gsessionid", self.gsessionid.as_ref().unwrap()), + ("key", LIVE_CHAT_BASE_TANGO_KEY), + ("RID", &self.rid.to_string()), + ("AID", &self.aid.to_string()), + ("CVER", "22"), + ("zx", Self::gen_zx().as_ref()), + ("t", "1") + ] + )?) + .header("X-WebChannel-Content-Type", "application/json+protobuf") + .form(&ofs_parameters) + .send() + .await?; + + let mut ofs_res_line = ofs.bytes().await?.lines().nth(1).unwrap().unwrap(); + let value: OwnedValue = unsafe { simd_json::from_str(&mut ofs_res_line) }?; + let value = value.as_array().unwrap()[0].as_array().unwrap(); + assert_eq!(value[0].as_usize().unwrap(), 0); + let sid = value[1].as_array().unwrap()[1].as_str().unwrap(); + self.sid = Some(sid.to_owned()); + Ok(()) + } + + pub async fn get_session_stream(&self) -> Result { + Ok(get_http_client() + .get(Url::parse_with_params( + GCM_SIGNALER_PSUB, + [ + ("VER", "8"), + ("gsessionid", self.gsessionid.as_ref().unwrap()), + ("key", LIVE_CHAT_BASE_TANGO_KEY), + ("RID", "rpc"), + ("SID", self.sid.as_ref().unwrap()), + ("AID", &self.aid.to_string()), + ("CI", "0"), + ("TYPE", "xmlhttp"), + ("zx", &Self::gen_zx()), + ("t", "1") + ] + )?) + .header(header::CONNECTION, "keep-alive") + .send() + .await?) + } +} + +#[derive(Debug)] +pub struct SignalerChannel { + inner: Arc> +} - Ok(()) +impl SignalerChannel { + pub async fn new(topic_id: impl ToString) -> Result { + Ok(SignalerChannel { + inner: Arc::new(Mutex::new(SignalerChannelInner::with_topic(topic_id))) + }) + } + + pub async fn new_from_cont(cont: &GetLiveChatResponse) -> Result { + Ok(SignalerChannel { + inner: Arc::new(Mutex::new(SignalerChannelInner::with_topic( + &cont.continuation_contents.as_ref().unwrap().live_chat_continuation.continuations[0] + .invalidation_continuation_data + .as_ref() + .unwrap() + .invalidation_id + .topic + ))) + }) + } + + pub async fn spawn_event_subscriber(&self) -> Result<(broadcast::Receiver<()>, JoinHandle<()>), YouTubeError> { + let inner = Arc::clone(&self.inner); + { + let mut lock = inner.lock().await; + lock.choose_server().await?; + lock.init_session().await?; + } + let (sender, receiver) = broadcast::channel(128); + let handle = tokio::spawn(async move { + loop { + let mut req = { + let mut lock = inner.lock().await; + lock.reset(); + lock.choose_server().await.unwrap(); + lock.init_session().await.unwrap(); + lock.get_session_stream().await.unwrap() + }; + loop { + match req.chunk().await { + Ok(None) => break, + Ok(Some(s)) => { + let mut ofs_res_line = s.lines().nth(1).unwrap().unwrap(); + println!("{ofs_res_line}"); + if let Ok(s) = unsafe { simd_json::from_str::(ofs_res_line.as_mut()) } { + let a = s.as_array().unwrap(); + { + inner.lock().await.aid = a[a.len() - 1].as_array().unwrap()[0].as_usize().unwrap(); + } + } + } + Err(e) => { + eprintln!("{e:?}"); + break; + } + } + } + } + }); + Ok((receiver, handle)) + } }