Skip to content

Commit

Permalink
Configuration to disable internal stmt cache
Browse files Browse the repository at this point in the history
This enables usage with pgBouncer's transaction mode. Typically when
using the transaction mode, a client gets a new connection from
pgBouncer for every new transaction. It's quite useful and allows one to
use prepared statements in this mode. The workflow goes:

```sql
-- start a new transaction
BEGIN
-- deallocate all stored statements from the server to prevent
-- collisions
DEALLOCATE ALL
-- run the queries here
-- ..
-- ..
COMMIT -- or ROLLBACK
```

Now in a case where the query uses custom types such as enums, what
tokio-postgres does is it fetches the type info for the given type,
stores the info to the cache and also caches the statements for
fetching the info to the client. Now when we have two tables with
different custom types in both of them, we can imagine the following
workflow:

```rust
// first query
client.simple_query("BEGIN")?;
client.simple_query("DEALLOCATE ALL")?;
let stmt = client.prepare("SELECT \"public\".\"User\".\"id\", \"public\".\"User\".\"userType\" FROM \"public\".\"User\" WHERE 1=1 OFFSET $1")?;
dbg!(client.query(&stmt, &[&0i64])?);
client.simple_query("COMMIT")?;

// second query
client.simple_query("BEGIN")?;
client.simple_query("DEALLOCATE ALL")?;
let stmt = client.prepare("SELECT \"public\".\"Work\".\"id\", \"public\".\"Work\".\"workType\" FROM \"public\".\"Work\" WHERE 1=1 OFFSET $1")?;
dbg!(client.query(&stmt, &[&0i64])?);
client.simple_query("COMMIT")?;
```

The `userType` and `workType` are both enums, and the preparing of the
second query will give an error `prepared statement "s1" does not
exist`, where `s1` is the query to the `pg_catalog` for the type info.

The change here gives an extra flag for the client to disable caching
of statements.
  • Loading branch information
Julius de Bruijn committed Jul 22, 2020
1 parent f6620e6 commit 87316f0
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 7 deletions.
15 changes: 15 additions & 0 deletions postgres/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,21 @@ impl Config {
self.config.get_channel_binding()
}

/// When enabled, the client skips all internal caching for statements,
/// allowing usage with pgBouncer's transaction mode and clearing of
/// statements in the session with `DEALLOCATE ALL`.
///
/// Defaults to `false`.
pub fn pgbouncer_mode(&mut self, enable: bool) -> &mut Config {
self.config.pgbouncer_mode(enable);
self
}

/// Gets the pgBouncer mode status.
pub fn get_pgbouncer_mode(&self) -> bool {
self.config.get_pgbouncer_mode()
}

/// Opens a connection to a PostgreSQL database.
pub fn connect<T>(&self, tls: T) -> Result<Client, Error>
where
Expand Down
33 changes: 27 additions & 6 deletions tokio-postgres/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ struct State {
pub struct InnerClient {
sender: mpsc::UnboundedSender<Request>,
state: Mutex<State>,
pgbouncer_mode: bool,
}

impl InnerClient {
Expand All @@ -81,27 +82,45 @@ impl InnerClient {
}

pub fn typeinfo(&self) -> Option<Statement> {
self.state.lock().typeinfo.clone()
if self.pgbouncer_mode {
None
} else {
self.state.lock().typeinfo.clone()
}
}

pub fn set_typeinfo(&self, statement: &Statement) {
self.state.lock().typeinfo = Some(statement.clone());
if !self.pgbouncer_mode {
self.state.lock().typeinfo = Some(statement.clone());
}
}

pub fn typeinfo_composite(&self) -> Option<Statement> {
self.state.lock().typeinfo_composite.clone()
if self.pgbouncer_mode {
None
} else {
self.state.lock().typeinfo_composite.clone()
}
}

pub fn set_typeinfo_composite(&self, statement: &Statement) {
self.state.lock().typeinfo_composite = Some(statement.clone());
if !self.pgbouncer_mode {
self.state.lock().typeinfo_composite = Some(statement.clone());
}
}

pub fn typeinfo_enum(&self) -> Option<Statement> {
self.state.lock().typeinfo_enum.clone()
if self.pgbouncer_mode {
None
} else {
self.state.lock().typeinfo_enum.clone()
}
}

pub fn set_typeinfo_enum(&self, statement: &Statement) {
self.state.lock().typeinfo_enum = Some(statement.clone());
if !self.pgbouncer_mode {
self.state.lock().typeinfo_enum = Some(statement.clone());
}
}

pub fn type_(&self, oid: Oid) -> Option<Type> {
Expand Down Expand Up @@ -151,6 +170,7 @@ impl Client {
ssl_mode: SslMode,
process_id: i32,
secret_key: i32,
pgbouncer_mode: bool,
) -> Client {
Client {
inner: Arc::new(InnerClient {
Expand All @@ -162,6 +182,7 @@ impl Client {
types: HashMap::new(),
buf: BytesMut::new(),
}),
pgbouncer_mode,
}),
#[cfg(feature = "runtime")]
socket_config: None,
Expand Down
17 changes: 17 additions & 0 deletions tokio-postgres/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ pub struct Config {
pub(crate) keepalives_idle: Duration,
pub(crate) target_session_attrs: TargetSessionAttrs,
pub(crate) channel_binding: ChannelBinding,
pub(crate) pgbouncer_mode: bool,
}

impl Default for Config {
Expand All @@ -184,6 +185,7 @@ impl Config {
keepalives_idle: Duration::from_secs(2 * 60 * 60),
target_session_attrs: TargetSessionAttrs::Any,
channel_binding: ChannelBinding::Prefer,
pgbouncer_mode: false,
}
}

Expand Down Expand Up @@ -387,6 +389,21 @@ impl Config {
self.channel_binding
}

/// When enabled, the client skips all internal caching for statements,
/// allowing usage with pgBouncer's transaction mode and clearing of
/// statements in the session with `DEALLOCATE ALL`.
///
/// Defaults to `false`.
pub fn pgbouncer_mode(&mut self, enable: bool) -> &mut Config {
self.pgbouncer_mode = enable;
self
}

/// Gets the pgBouncer mode status.
pub fn get_pgbouncer_mode(&self) -> bool {
self.pgbouncer_mode
}

fn param(&mut self, key: &str, value: &str) -> Result<(), Error> {
match key {
"user" => {
Expand Down
8 changes: 7 additions & 1 deletion tokio-postgres/src/connect_raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,13 @@ where
let (process_id, secret_key, parameters) = read_info(&mut stream).await?;

let (sender, receiver) = mpsc::unbounded();
let client = Client::new(sender, config.ssl_mode, process_id, secret_key);
let client = Client::new(
sender,
config.ssl_mode,
process_id,
secret_key,
config.pgbouncer_mode,
);
let connection = Connection::new(stream.inner, stream.delayed, parameters, receiver);

Ok((client, connection))
Expand Down

0 comments on commit 87316f0

Please sign in to comment.