Skip to content

Commit

Permalink
Fix bugs in pipethrough and get_post_thread related to proxying
Browse files Browse the repository at this point in the history
  • Loading branch information
rudyfraser committed Sep 17, 2024
1 parent cbe1728 commit f43ffe7
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 142 deletions.
101 changes: 72 additions & 29 deletions rsky-pds/src/apis/app/bsky/feed/get_post_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,26 +80,31 @@ pub async fn inner_get_post_thread(
headers,
} = xrpc
{
let actor_store = ActorStore::new(
requester.clone(),
S3BlobStore::new(requester.clone(), s3_config),
);
let local_viewer_lock = state_local_viewer.local_viewer.read().await;
let local_viewer = local_viewer_lock(actor_store);
let local = read_after_write_not_found(
local_viewer,
uri,
parentHeight,
requester,
Some(headers.clone()),
cfg,
)
.await?;
match local {
None => Err(err),
Some(local) => Ok(ReadAfterWriteResponse::HandlerResponse(
format_munged_response(local.data, local.lag)?,
)),
match error {
Some(error) if error == "NotFound" => {
let actor_store = ActorStore::new(
requester.clone(),
S3BlobStore::new(requester.clone(), s3_config),
);
let local_viewer_lock = state_local_viewer.local_viewer.read().await;
let local_viewer = local_viewer_lock(actor_store);
let local = read_after_write_not_found(
local_viewer,
uri,
parentHeight,
requester,
Some(headers.clone()),
cfg,
)
.await?;
match local {
None => Err(err),
Some(local) => Ok(ReadAfterWriteResponse::HandlerResponse(
format_munged_response(local.data, local.lag)?,
)),
}
}
_ => Err(err),
}
} else {
return Err(err);
Expand Down Expand Up @@ -156,15 +161,53 @@ pub async fn get_post_thread(
.await
{
Ok(response) => Ok(response),
Err(error) => {
let internal_error = ErrorMessageResponse {
code: Some(ErrorCode::InternalServerError),
message: Some(error.to_string()),
};
return Err(status::Custom(
Status::InternalServerError,
Json(internal_error),
));
Err(err) => {
return match err.downcast_ref() {
Some(InvalidRequestError::XRPCError(xrpc)) => {
if let XRPCError::FailedResponse {
status,
error,
message,
headers,
} = xrpc
{
let xrpc_error = ErrorMessageResponse {
code: match error {
None => None,
Some(error) => Some(
ErrorCode::from_str(error)
.unwrap_or(ErrorCode::InternalServerError),
),
},
message: match message {
None => None,
Some(message) => Some(message.to_string()),
},
};
Err(status::Custom(Status::BadRequest, Json(xrpc_error)))
} else {
let internal_error = ErrorMessageResponse {
code: Some(ErrorCode::InternalServerError),
message: Some(err.to_string()),
};
Err(status::Custom(
Status::InternalServerError,
Json(internal_error),
))
}
}
_ => {
eprintln!("@LOG: ERROR: {err}");
let internal_error = ErrorMessageResponse {
code: Some(ErrorCode::InternalServerError),
message: Some(err.to_string()),
};
Err(status::Custom(
Status::InternalServerError,
Json(internal_error),
))
}
}
}
},
}
Expand Down
5 changes: 1 addition & 4 deletions rsky-pds/src/apis/com/atproto/repo/get_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,7 @@ pub async fn get_record(
code: Some(ErrorCode::NotFound),
message: Some(error.to_string()),
};
return Err(status::Custom(
Status::NotFound,
Json(not_found),
));
return Err(status::Custom(Status::NotFound, Json(not_found)));
}
}
}
197 changes: 103 additions & 94 deletions rsky-pds/src/models/error_code.rs
Original file line number Diff line number Diff line change
@@ -1,153 +1,162 @@
use anyhow::{bail, Result};
use std::fmt::Display;

#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
pub enum ErrorCode {
#[serde(rename = "multiple_choices")]
MultipleChoices,
#[serde(rename = "moved_permanently")]
MovedPermanently,
#[serde(rename = "found")]
Found,
#[serde(rename = "see_other")]
SeeOther,
#[serde(rename = "not_modified")]
NotModified,
#[serde(rename = "use_proxy")]
UseProxy,
#[serde(rename = "temporary_redirect")]
TemporaryRedirect,
#[serde(rename = "permanent_redirect")]
PermanentRedirect,
#[serde(rename = "bad_request")]
BadRequest,
#[serde(rename = "unauthorized")]
Unauthorized,
#[serde(rename = "payment_required")]
PaymentRequired,
#[serde(rename = "forbidden")]
Forbidden,
#[serde(rename = "not_found")]
NotFound,
#[serde(rename = "method_not_allowed")]
MethodNotAllowed,
#[serde(rename = "not_acceptable")]
NotAcceptable,
#[serde(rename = "proxy_authentication_required")]
ProxyAuthenticationRequired,
#[serde(rename = "request_timeout")]
RequestTimeout,
#[serde(rename = "conflict")]
Conflict,
#[serde(rename = "gone")]
Gone,
#[serde(rename = "length_required")]
LengthRequired,
#[serde(rename = "precondition_failed")]
PreconditionFailed,
#[serde(rename = "payload_too_large")]
PayloadTooLarge,
#[serde(rename = "uri_too_long")]
UriTooLong,
#[serde(rename = "unsupported_media_type")]
UnsupportedMediaType,
#[serde(rename = "range_not_satisfiable")]
RangeNotSatisfiable,
#[serde(rename = "expectation_failed")]
ExpectationFailed,
#[serde(rename = "im_a_teapot")]
ImATeapot,
#[serde(rename = "misdirected_request")]
MisdirectedRequest,
#[serde(rename = "unprocessable_entity")]
UnprocessableEntity,
#[serde(rename = "locked")]
Locked,
#[serde(rename = "failed_dependency")]
FailedDependency,
#[serde(rename = "upgrade_required")]
UpgradeRequired,
#[serde(rename = "precondition_required")]
PreconditionRequired,
#[serde(rename = "too_many_requests")]
TooManyRequests,
#[serde(rename = "request_header_fields_too_large")]
RequestHeaderFieldsTooLarge,
#[serde(rename = "unavailable_for_legal_reasons")]
UnavailableForLegalReasons,
#[serde(rename = "internal_server_error")]
InternalServerError,
#[serde(rename = "not_implemented")]
NotImplemented,
#[serde(rename = "bad_gateway")]
BadGateway,
#[serde(rename = "service_unavailable")]
ServiceUnavailable,
#[serde(rename = "gateway_timeout")]
GatewayTimeout,
#[serde(rename = "http_version_not_supported")]
HttpVersionNotSupported,
#[serde(rename = "variant_also_negotiates")]
VariantAlsoNegotiates,
#[serde(rename = "insufficient_storage")]
InsufficientStorage,
#[serde(rename = "loop_detected")]
LoopDetected,
#[serde(rename = "not_extended")]
NotExtended,
#[serde(rename = "network_authentication_required")]
NetworkAuthenticationRequired,
}

impl ErrorCode {
pub fn from_str(code: &str) -> Result<Self> {
match code {
"MultipleChoices" => Ok(Self::MultipleChoices),
"MovedPermanently" => Ok(Self::MovedPermanently),
"Found" => Ok(Self::Found),
"SeeOther" => Ok(Self::SeeOther),
"NotModified" => Ok(Self::NotModified),
"UseProxy" => Ok(Self::UseProxy),
"TemporaryRedirect" => Ok(Self::TemporaryRedirect),
"PermanentRedirect" => Ok(Self::PermanentRedirect),
"BadRequest" => Ok(Self::BadRequest),
"Unauthorized" => Ok(Self::Unauthorized),
"PaymentRequired" => Ok(Self::PaymentRequired),
"Forbidden" => Ok(Self::Forbidden),
"NotFound" => Ok(Self::NotFound),
"MethodNotAllowed" => Ok(Self::MethodNotAllowed),
"NotAcceptable" => Ok(Self::NotAcceptable),
"ProxyAuthenticationRequired" => Ok(Self::ProxyAuthenticationRequired),
"RequestTimeout" => Ok(Self::RequestTimeout),
"Conflict" => Ok(Self::Conflict),
"Gone" => Ok(Self::Gone),
"LengthRequired" => Ok(Self::LengthRequired),
"PreconditionFailed" => Ok(Self::PreconditionFailed),
"PayloadTooLarge" => Ok(Self::PayloadTooLarge),
"UriTooLong" => Ok(Self::UriTooLong),
"UnsupportedMediaType" => Ok(Self::UnsupportedMediaType),
"RangeNotSatisfiable" => Ok(Self::RangeNotSatisfiable),
"ExpectationFailed" => Ok(Self::ExpectationFailed),
"ImATeapot" => Ok(Self::ImATeapot),
"MisdirectedRequest" => Ok(Self::MisdirectedRequest),
"UnprocessableEntity" => Ok(Self::UnprocessableEntity),
"Locked" => Ok(Self::Locked),
"FailedDependency" => Ok(Self::FailedDependency),
"UpgradeRequired" => Ok(Self::UpgradeRequired),
"PreconditionRequired" => Ok(Self::PreconditionRequired),
"TooManyRequests" => Ok(Self::TooManyRequests),
"RequestHeaderFieldsTooLarge" => Ok(Self::RequestHeaderFieldsTooLarge),
"UnavailableForLegalReasons" => Ok(Self::UnavailableForLegalReasons),
"InternalServerError" => Ok(Self::InternalServerError),
"NotImplemented" => Ok(Self::NotImplemented),
"BadGateway" => Ok(Self::BadGateway),
"ServiceUnavailable" => Ok(Self::ServiceUnavailable),
"GatewayTimeout" => Ok(Self::GatewayTimeout),
"HttpVersionNotSupported" => Ok(Self::HttpVersionNotSupported),
"VariantAlsoNegotiates" => Ok(Self::VariantAlsoNegotiates),
"InsufficientStorage" => Ok(Self::InsufficientStorage),
"LoopDetected" => Ok(Self::LoopDetected),
"NotExtended" => Ok(Self::NotExtended),
"NetworkAuthenticationRequired" => Ok(Self::NetworkAuthenticationRequired),
_ => bail!("Invalid ErrorCode: `{code:?}` is not a valid error code"),
}
}
}

impl Display for ErrorCode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let str = match self {
Self::MultipleChoices => String::from("multiple_choices"),
Self::MovedPermanently => String::from("moved_permanently"),
Self::Found => String::from("found"),
Self::SeeOther => String::from("see_other"),
Self::NotModified => String::from("not_modified"),
Self::UseProxy => String::from("use_proxy"),
Self::TemporaryRedirect => String::from("temporary_redirect"),
Self::PermanentRedirect => String::from("permanent_redirect"),
Self::BadRequest => String::from("bad_request"),
Self::Unauthorized => String::from("unauthorized"),
Self::PaymentRequired => String::from("payment_required"),
Self::Forbidden => String::from("forbidden"),
Self::NotFound => String::from("not_found"),
Self::MethodNotAllowed => String::from("method_not_allowed"),
Self::NotAcceptable => String::from("not_acceptable"),
Self::ProxyAuthenticationRequired => String::from("proxy_authentication_required"),
Self::RequestTimeout => String::from("request_timeout"),
Self::Conflict => String::from("conflict"),
Self::Gone => String::from("gone"),
Self::LengthRequired => String::from("length_required"),
Self::PreconditionFailed => String::from("precondition_failed"),
Self::PayloadTooLarge => String::from("payload_too_large"),
Self::UriTooLong => String::from("uri_too_long"),
Self::UnsupportedMediaType => String::from("unsupported_media_type"),
Self::RangeNotSatisfiable => String::from("range_not_satisfiable"),
Self::ExpectationFailed => String::from("expectation_failed"),
Self::ImATeapot => String::from("im_a_teapot"),
Self::MisdirectedRequest => String::from("misdirected_request"),
Self::UnprocessableEntity => String::from("unprocessable_entity"),
Self::Locked => String::from("locked"),
Self::FailedDependency => String::from("failed_dependency"),
Self::UpgradeRequired => String::from("upgrade_required"),
Self::PreconditionRequired => String::from("precondition_required"),
Self::TooManyRequests => String::from("too_many_requests"),
Self::RequestHeaderFieldsTooLarge => String::from("request_header_fields_too_large"),
Self::UnavailableForLegalReasons => String::from("unavailable_for_legal_reasons"),
Self::InternalServerError => String::from("internal_server_error"),
Self::NotImplemented => String::from("not_implemented"),
Self::BadGateway => String::from("bad_gateway"),
Self::ServiceUnavailable => String::from("service_unavailable"),
Self::GatewayTimeout => String::from("gateway_timeout"),
Self::HttpVersionNotSupported => String::from("http_version_not_supported"),
Self::VariantAlsoNegotiates => String::from("variant_also_negotiates"),
Self::InsufficientStorage => String::from("insufficient_storage"),
Self::LoopDetected => String::from("loop_detected"),
Self::NotExtended => String::from("not_extended"),
Self::NetworkAuthenticationRequired => String::from("network_authentication_required"),
Self::MultipleChoices => String::from("MultipleChoices"),
Self::MovedPermanently => String::from("MovedPermanently"),
Self::Found => String::from("Found"),
Self::SeeOther => String::from("SeeOther"),
Self::NotModified => String::from("NotModified"),
Self::UseProxy => String::from("UseProxy"),
Self::TemporaryRedirect => String::from("TemporaryRedirect"),
Self::PermanentRedirect => String::from("PermanentRedirect"),
Self::BadRequest => String::from("BadRequest"),
Self::Unauthorized => String::from("Unauthorized"),
Self::PaymentRequired => String::from("PaymentRequired"),
Self::Forbidden => String::from("Forbidden"),
Self::NotFound => String::from("NotFound"),
Self::MethodNotAllowed => String::from("MethodNotAllowed"),
Self::NotAcceptable => String::from("NotAcceptable"),
Self::ProxyAuthenticationRequired => String::from("ProxyAuthenticationRequired"),
Self::RequestTimeout => String::from("RequestTimeout"),
Self::Conflict => String::from("Conflict"),
Self::Gone => String::from("Gone"),
Self::LengthRequired => String::from("LengthRequired"),
Self::PreconditionFailed => String::from("PreconditionFailed"),
Self::PayloadTooLarge => String::from("PayloadTooLarge"),
Self::UriTooLong => String::from("UriTooLong"),
Self::UnsupportedMediaType => String::from("UnsupportedMediaType"),
Self::RangeNotSatisfiable => String::from("RangeNotSatisfiable"),
Self::ExpectationFailed => String::from("ExpectationFailed"),
Self::ImATeapot => String::from("ImATeapot"),
Self::MisdirectedRequest => String::from("MisdirectedRequest"),
Self::UnprocessableEntity => String::from("UnprocessableEntity"),
Self::Locked => String::from("Locked"),
Self::FailedDependency => String::from("FailedDependency"),
Self::UpgradeRequired => String::from("UpgradeRequired"),
Self::PreconditionRequired => String::from("PreconditionRequired"),
Self::TooManyRequests => String::from("TooManyRequests"),
Self::RequestHeaderFieldsTooLarge => String::from("RequestHeaderFieldsTooLarge"),
Self::UnavailableForLegalReasons => String::from("UnavailableForLegalReasons"),
Self::InternalServerError => String::from("InternalServerError"),
Self::NotImplemented => String::from("NotImplemented"),
Self::BadGateway => String::from("BadGateway"),
Self::ServiceUnavailable => String::from("ServiceUnavailable"),
Self::GatewayTimeout => String::from("GatewayTimeout"),
Self::HttpVersionNotSupported => String::from("HttpVersionNotSupported"),
Self::VariantAlsoNegotiates => String::from("VariantAlsoNegotiates"),
Self::InsufficientStorage => String::from("InsufficientStorage"),
Self::LoopDetected => String::from("LoopDetected"),
Self::NotExtended => String::from("NotExtended"),
Self::NetworkAuthenticationRequired => String::from("NetworkAuthenticationRequired"),
};
write!(f, "{}", str)
}
Expand Down
Loading

0 comments on commit f43ffe7

Please sign in to comment.