Skip to content

Commit

Permalink
Upgrade pgwire to 0.28 (#2488)
Browse files Browse the repository at this point in the history
  • Loading branch information
pcholakov authored Jan 15, 2025
1 parent 9189e85 commit 19fc6f7
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 16 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 6 additions & 7 deletions crates/storage-query-postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@ workspace-hack = { version = "0.1", path = "../../workspace-hack" }

restate-core = { workspace = true }
restate-errors = { workspace = true }
restate-partition-store = {workspace = true }
restate-storage-api = {workspace = true}
restate-storage-query-datafusion = {workspace = true }
restate-partition-store = { workspace = true }
restate-storage-api = { workspace = true }
restate-storage-query-datafusion = { workspace = true }
restate-types = { workspace = true }


anyhow = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
Expand All @@ -31,9 +30,9 @@ codederror = { workspace = true }
datafusion = { workspace = true }
derive_builder = { workspace = true }
futures = { workspace = true }
paste = { workspace = true}
pgwire = {version = "0.25", default-features = false, features = ["server-api-ring"]}
prost = {workspace = true}
paste = { workspace = true }
pgwire = { version = "0.28.0", default-features = false, features = ["server-api-ring"] }
prost = { workspace = true }
schemars = { workspace = true, optional = true }
serde = { workspace = true }
thiserror = { workspace = true }
Expand Down
22 changes: 16 additions & 6 deletions crates/storage-query-postgres/src/pgwire_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,30 +32,32 @@ use std::sync::Arc;
use tokio::net::TcpStream;
use tokio::sync::Mutex;

use crate::extended_query::NoopExtendedQueryHandler;
use pgwire::api::auth::noop::NoopStartupHandler;
use pgwire::api::copy::NoopCopyHandler;
use pgwire::api::query::SimpleQueryHandler;
use pgwire::api::results::{DataRowEncoder, FieldFormat, FieldInfo, QueryResponse, Response};
use pgwire::api::{ClientInfo, PgWireHandlerFactory, Type};
use pgwire::api::{ClientInfo, NoopErrorHandler, PgWireServerHandlers, Type};
use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
use pgwire::messages::data::DataRow;
use pgwire::tokio::process_socket;

use crate::extended_query::NoopExtendedQueryHandler;
use restate_core::{TaskCenter, TaskKind};
use restate_storage_query_datafusion::context::QueryContext;

pub(crate) struct HandlerFactory {
processor: Arc<DfSessionService>,
placeholder: Arc<NoopExtendedQueryHandler>,
authenticator: Arc<NoopStartupHandler>,
authenticator: Arc<NoAuthHandler>,
copy_handler: Arc<NoopCopyHandler>,
}

impl PgWireHandlerFactory for HandlerFactory {
type StartupHandler = NoopStartupHandler;
impl PgWireServerHandlers for HandlerFactory {
type StartupHandler = NoAuthHandler;
type SimpleQueryHandler = DfSessionService;
type ExtendedQueryHandler = NoopExtendedQueryHandler;
type CopyHandler = NoopCopyHandler;
type ErrorHandler = NoopErrorHandler;

fn simple_query_handler(&self) -> Arc<Self::SimpleQueryHandler> {
self.processor.clone()
Expand All @@ -72,14 +74,22 @@ impl PgWireHandlerFactory for HandlerFactory {
fn copy_handler(&self) -> Arc<Self::CopyHandler> {
self.copy_handler.clone()
}

fn error_handler(&self) -> Arc<Self::ErrorHandler> {
Arc::new(NoopErrorHandler)
}
}

pub(crate) struct NoAuthHandler {}

impl NoopStartupHandler for NoAuthHandler {}

impl HandlerFactory {
pub fn new(ctx: QueryContext) -> Self {
let processor = Arc::new(DfSessionService::new(ctx));
// We have not implemented extended query in this server, use placeholder instead
let placeholder = Arc::new(NoopExtendedQueryHandler::new());
let authenticator = Arc::new(NoopStartupHandler);
let authenticator = Arc::new(NoAuthHandler {});
let copy_handler = Arc::new(NoopCopyHandler);

Self {
Expand Down

0 comments on commit 19fc6f7

Please sign in to comment.