Skip to content

Commit

Permalink
cleanup a little
Browse files Browse the repository at this point in the history
  • Loading branch information
decahedron1 committed Jan 27, 2024
1 parent ab95185 commit ea9a4f6
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 43 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ serde-aux = { version = "4.4", optional = true }
uuid = { version = "1.5", optional = true }
reqwest = { version = "0.11", optional = true }
simd-json = { version = "0.13", optional = true }
url = { version = "2.5", optional = true }
regex = { version = "1.10", optional = true }

[dev-dependencies]
Expand All @@ -27,7 +28,7 @@ tokio = { version = "1", features = [ "rt", "rt-multi-thread", "macros", "net" ]
[features]
default = [ "tls-native", "twitch", "youtube" ]
twitch = [ "dep:irc", "dep:uuid" ]
youtube = [ "dep:simd-json", "dep:reqwest", "dep:serde", "dep:regex", "dep:serde-aux" ]
youtube = [ "dep:simd-json", "dep:reqwest", "dep:serde", "dep:url", "dep:regex", "dep:serde-aux" ]
serde = [ "dep:serde", "chrono/serde", "uuid?/serde" ]
tls-native = [ "irc?/tls-native" ]
#tls-rust = [ "irc?/tls-rust" ]
2 changes: 1 addition & 1 deletion examples/youtube.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use futures_util::StreamExt;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let (options, cont) = youtube::get_options_from_live_page("e-5D_Shoozk").await?;
let (options, cont) = youtube::get_options_from_live_page("5Z5Sys8-tLs").await?;
let initial_chat = youtube::fetch_yt_chat_page(&options, cont).await?;
youtube::subscribe_to_events(&options, &initial_chat).await?;
Ok(())
Expand Down
98 changes: 57 additions & 41 deletions src/youtube/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
use std::{
collections::{HashMap, VecDeque},
io::BufRead,
sync::OnceLock,
time::{Instant, SystemTime, UNIX_EPOCH}
sync::OnceLock
};

use regex::Regex;
use reqwest::{
get,
header::{self, HeaderMap, HeaderValue, CONTENT_TYPE},
header::{self, HeaderMap, HeaderValue},
StatusCode
};
use simd_json::{
Expand All @@ -17,6 +15,7 @@ use simd_json::{
};
use thiserror::Error;
use tokio::sync::Mutex;
use url::Url;

mod types;
mod util;
Expand All @@ -27,11 +26,12 @@ use self::{

const GCM_SIGNALER_SRQE: &str = "https://signaler-pa.youtube.com/punctual/v1/chooseServer";
const GCM_SIGNALER_PSUB: &str = "https://signaler-pa.youtube.com/punctual/multi-watch/channel";
const TANGO_LIVE_ENDPOINT: &str = "https://www.youtube.com/youtubei/v1/live_chat/get_live_chat";
const TANGO_REPLAY_ENDPOINT: &str = "https://www.youtube.com/youtubei/v1/live_chat/get_live_chat_replay";

const LIVE_CHAT_BASE_TANGO_KEY: &str = "AIzaSyDZNkyC-AtROwMBpLfevIvqYk-Gfi8ZOeo";

static USER_AGENT: &str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:123.0) Gecko/20100101 Firefox/123.0";
static HTTP_CLIENT: OnceLock<reqwest::Client> = OnceLock::new();
const USER_AGENT: &str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:123.0) Gecko/20100101 Firefox/123.0";

#[derive(Debug, Error)]
pub enum YouTubeError {
Expand All @@ -54,7 +54,9 @@ pub enum YouTubeError {
#[error("Failed to match InnerTube API key")]
NoInnerTubeKey,
#[error("Chat continuation token could not be found.")]
NoChatContinuation
NoChatContinuation,
#[error("Error parsing URL: {0}")]
URLParseError(#[from] url::ParseError)
}

impl From<reqwest::Error> for YouTubeError {
Expand All @@ -70,11 +72,13 @@ impl From<reqwest::Error> for YouTubeError {
}

pub(crate) fn get_http_client() -> &'static reqwest::Client {
static HTTP_CLIENT: OnceLock<reqwest::Client> = OnceLock::new();
HTTP_CLIENT.get_or_init(|| {
let mut headers = HeaderMap::new();
// Set our Accept-Language to en-US so we can properly match substrings
headers.append(header::ACCEPT_LANGUAGE, HeaderValue::from_static("en-US,en;q=0.5"));
headers.append(header::USER_AGENT, HeaderValue::from_static(USER_AGENT));
headers.append(header::REFERER, HeaderValue::from_static("https://www.youtube.com/"));
reqwest::Client::builder().default_headers(headers).build().unwrap()
})
}
Expand Down Expand Up @@ -154,10 +158,6 @@ pub struct YouTubeChatPageProcessor<'r> {
continuation_token: Option<String>
}

#[derive(Debug, Error)]
#[error("no continuation available")]
pub struct NoContinuationError;

unsafe impl<'r> Send for YouTubeChatPageProcessor<'r> {}

impl<'r> YouTubeChatPageProcessor<'r> {
Expand Down Expand Up @@ -240,11 +240,17 @@ impl<'r> Iterator for &YouTubeChatPageProcessor<'r> {
}

pub async fn fetch_yt_chat_page(options: &RequestOptions, continuation: impl AsRef<str>) -> Result<GetLiveChatResponse, YouTubeError> {
let url =
format!("https://www.youtube.com/youtubei/v1/live_chat/get_live_chat{}?key={}", if !options.live_status { "_replay" } else { "" }, &options.api_key);
let body = GetLiveChatBody::new(continuation.as_ref(), &options.client_version, "WEB");
let response = get_http_client().post(url).simd_json(&body)?.send().await?;
let response: GetLiveChatResponse = response.simd_json().await?;
let response: GetLiveChatResponse = get_http_client()
.post(Url::parse_with_params(
if options.live_status { TANGO_LIVE_ENDPOINT } else { TANGO_REPLAY_ENDPOINT },
[("key", options.api_key.as_str()), ("prettyPrint", "false")]
)?)
.simd_json(&body)?
.send()
.await?
.simd_json()
.await?;
Ok(response)
}

Expand All @@ -262,13 +268,9 @@ pub async fn subscribe_to_events(options: &RequestOptions, initial_continuation:
.topic;

let server_response: OwnedValue = get_http_client()
.post(format!("{GCM_SIGNALER_SRQE}?key={}", LIVE_CHAT_BASE_TANGO_KEY))
.post(Url::parse_with_params(GCM_SIGNALER_SRQE, [("key", LIVE_CHAT_BASE_TANGO_KEY)])?)
.header(header::CONTENT_TYPE, "application/json+protobuf")
.header(header::REFERER, "https://www.youtube.com/")
.header("Sec-Fetch-Site", "same-site")
.header(header::ORIGIN, "https://www.youtube.com/")
.header(header::ACCEPT_ENCODING, "gzip, deflate, br")
.simd_json(&simd_json::json!([[null, null, null, [7, 5], null, [["youtube_live_chat_web"], [1], [[[&topic_id]]]]]]))?
.body(format!(r#"[[null,null,null,[7,5],null,[["youtube_live_chat_web"],[1],[[["{topic_id}"]]]]]]"#))
.send()
.await?
.simd_json()
Expand All @@ -278,39 +280,50 @@ pub async fn subscribe_to_events(options: &RequestOptions, initial_continuation:
let mut ofs_parameters = HashMap::new();
ofs_parameters.insert("count", "1".to_string());
ofs_parameters.insert("ofs", "0".to_string());
ofs_parameters.insert(
"req0___data__",
format!(
r#"[[["1",[null,null,null,[7,5],null,[["youtube_live_chat_web"],[1],[[["{}"]]]],null,null,1],null,3]]]"#,
&topic_id
)
);
ofs_parameters
.insert("req0___data__", format!(r#"[[["1",[null,null,null,[7,5],null,[["youtube_live_chat_web"],[1],[[["{topic_id}"]]]],null,null,1],null,3]]]"#));
let ofs = get_http_client()
.post(format!("{GCM_SIGNALER_PSUB}?VER=8&gsessionid={gsess}&key={LIVE_CHAT_BASE_TANGO_KEY}&RID=60464&CVER=22&zx=uo5vp9j380ef&t=1"))
.header(header::REFERER, "https://www.youtube.com/")
.header("Sec-Fetch-Site", "same-site")
.header(header::ORIGIN, "https://www.youtube.com/")
.header(header::ACCEPT_ENCODING, "gzip, deflate, br")
.post(Url::parse_with_params(
GCM_SIGNALER_PSUB,
[
("VER", "8"),
("gsessionid", &gsess),
("key", LIVE_CHAT_BASE_TANGO_KEY),
("RID", "60464"),
("CVER", "22"),
("zx", "uo5vp9j380ef"),
("t", "1")
]
)?)
.header("X-WebChannel-Content-Type", "application/json+protobuf")
.form(&ofs_parameters)
.send()
.await?;

// standard response: [[0,["c","koBtCISzwmqJpalH1EqHSc","",8,12,30000]]]
// stream end response: [1,3,7]
let mut ofs_res_line = ofs.bytes().await?.lines().nth(1).unwrap().unwrap();
let value: OwnedValue = unsafe { simd_json::from_str(&mut ofs_res_line) }?;
let value = value.as_array().unwrap()[0].as_array().unwrap();
assert_eq!(value[0].as_usize().unwrap(), 0);
let sid = value[1].as_array().unwrap()[1].as_str().unwrap();

let mut stream = get_http_client()
.get(format!(
"{GCM_SIGNALER_PSUB}?VER=8&gsessionid={gsess}&key={LIVE_CHAT_BASE_TANGO_KEY}&RID=rpc&SID={sid}&AID=0&CI=0&TYPE=xmlhttp&zx=uo5vp9j380ed&t=1"
))
.header(header::REFERER, "https://www.youtube.com/")
.header("Sec-Fetch-Site", "same-site")
.header(header::ORIGIN, "https://www.youtube.com/")
.header(header::ACCEPT_ENCODING, "gzip, deflate, br")
.header(header::ACCEPT, "*/*")
.get(Url::parse_with_params(
GCM_SIGNALER_PSUB,
[
("VER", "8"),
("gsessionid", &gsess),
("key", LIVE_CHAT_BASE_TANGO_KEY),
("RID", "rpc"),
("SID", sid),
("AID", "0"),
("CI", "0"),
("TYPE", "xmlhttp"),
("zx", "uo5vp9j380ef"),
("t", "1")
]
)?)
.header(header::CONNECTION, "keep-alive")
.send()
.await?;
Expand All @@ -319,5 +332,8 @@ pub async fn subscribe_to_events(options: &RequestOptions, initial_continuation:
println!("{}", String::from_utf8_lossy(&c));
}

// todo: how to distinguish normal closure from server shutdown (NS_BINDING_ABORTED)?
println!("{:?}", stream.bytes().await);

Ok(())
}

0 comments on commit ea9a4f6

Please sign in to comment.