diff --git "a/source/_posts/20240617a_\347\217\240\347\216\211\343\201\256\343\202\242\343\203\211\343\203\231\343\203\263\343\203\210\343\202\253\343\203\254\343\203\263\343\203\200\343\203\274\350\250\230\344\272\213\343\202\222\343\203\252\343\203\220\343\202\244\343\203\220\343\203\253\345\205\254\351\226\213\343\201\227\343\201\276\343\201\231.md" "b/source/_posts/20240617a_\347\217\240\347\216\211\343\201\256\343\202\242\343\203\211\343\203\231\343\203\263\343\203\210\343\202\253\343\203\254\343\203\263\343\203\200\343\203\274\350\250\230\344\272\213\343\202\222\343\203\252\343\203\220\343\202\244\343\203\220\343\203\253\345\205\254\351\226\213\343\201\227\343\201\276\343\201\231.md" index 80fee13f40c2..dee1781d5c09 100644 --- "a/source/_posts/20240617a_\347\217\240\347\216\211\343\201\256\343\202\242\343\203\211\343\203\231\343\203\263\343\203\210\343\202\253\343\203\254\343\203\263\343\203\200\343\203\274\350\250\230\344\272\213\343\202\222\343\203\252\343\203\220\343\202\244\343\203\220\343\203\253\345\205\254\351\226\213\343\201\227\343\201\276\343\201\231.md" +++ "b/source/_posts/20240617a_\347\217\240\347\216\211\343\201\256\343\202\242\343\203\211\343\203\231\343\203\263\343\203\210\343\202\253\343\203\254\343\203\263\343\203\200\343\203\274\350\250\230\344\272\213\343\202\222\343\203\252\343\203\220\343\202\244\343\203\220\343\203\253\345\205\254\351\226\213\343\201\227\343\201\276\343\201\231.md" @@ -45,8 +45,8 @@ TIG 真野です。 | 6/24 月| 枇榔晃裕 | [package.json dependencies メンテの仕方 最短ルート](/articles/20240624a/) | | 6/25 火| 塚本祥太 | [アルゴリズムで実社会を捉える~評価関数の作り方~](/articles/20240625a/) | | 6/26 水| 辻大志郎 | [社会人からはじめる競技プログラミング](/articles/20240626b/) | -| 6/27 木| (仮)星賢一 | [エンタープライズJavaで使えるORM「uroboroSQL」まとめ](/articles/20240627a/) | -| 6/28 金| (仮)柳原光佑 | PostgreSQLのPub/Sub機能とJavaのクライアント実装 | +| 6/27 木| 星賢一 | [エンタープライズJavaで使えるORM「uroboroSQL」まとめ](/articles/20240627a/) | +| 6/28 金| 柳原光佑 | [PostgreSQLのPub/Sub機能とJavaのクライアント実装](/articles/20240628a/) | ※星さん、柳原さんの記事は、時代に合わせてかなりアップデートしないと厳しいということで、間にあえば公開する予定です。 diff --git "a/source/_posts/20240628a_PostgreSQL\343\201\256Pub\357\274\217Sub\346\251\237\350\203\275\343\201\250Java\343\201\256\343\202\257\343\203\251\343\202\244\343\202\242\343\203\263\343\203\210\345\256\237\350\243\205.md" "b/source/_posts/20240628a_PostgreSQL\343\201\256Pub\357\274\217Sub\346\251\237\350\203\275\343\201\250Java\343\201\256\343\202\257\343\203\251\343\202\244\343\202\242\343\203\263\343\203\210\345\256\237\350\243\205.md" new file mode 100644 index 000000000000..12beda754789 --- /dev/null +++ "b/source/_posts/20240628a_PostgreSQL\343\201\256Pub\357\274\217Sub\346\251\237\350\203\275\343\201\250Java\343\201\256\343\202\257\343\203\251\343\202\244\343\202\242\343\203\263\343\203\210\345\256\237\350\243\205.md" @@ -0,0 +1,317 @@ +--- +title: "PostgreSQLのPub/Sub機能とJavaのクライアント実装" +date: 2024/06/28 00:00:00 +postid: a +tag: + - PostgreSQL + - R2DBC + - JDBC + - Java +category: + - Programming +thumbnail: /images/20240628a/thumbnail.png +author: 柳原光佑 +lede: "Pub/Sub型のメッセージングアーキテクチャを採用するにあたっては、kafkaなどのブローカーミドルウェアや、Amazon SNS、Google Cloud Pub/Subなどのマネージドサービスを利用するケースが多いかと思います。ところでPostgreSQLでも実はPub/Subができます。すでに業務でPostgreSQLを使っていれば、新たにPub/Subブローカーを構築しなくても、疎結合なシステム間通信を簡易的に実現できます。" +--- + +本記事は[「珠玉のアドベントカレンダー記事をリバイバル公開します」](/articles/20240617a/)企画のために、[以前Qiitaに投稿した記事](https://qiita.com/ksky/items/8933348de5af00e45ffe)を改訂したものです。 + +## はじめに + +Pub/Sub型のメッセージングアーキテクチャを採用するにあたっては、kafkaなどのブローカーミドルウェアや、Amazon SNS、Google Cloud Pub/Subなどのマネージドサービスを利用するケースが多いかと思います。ところでPostgreSQLでも実はPub/Subができます。 + +すでに業務でPostgreSQLを使っていれば、新たにPub/Subブローカーを構築しなくても、疎結合なシステム間通信を簡易的に実現できます。 + +本記事ではこの機能の紹介と、Pub/SubクライアントをJavaで実装する場合の選択肢、考慮点を示しています。 +※実行環境はPostgreSQL 16.2とJava 21です +※データベースの文字コードはUTF-8としています + +# NOTIFY/LISTEN + +PostgreSQLのPub/Sub機能に関連するクエリは次の3つです。 + +* [NOTIFY](https://www.postgresql.org/docs/current/sql-listen.html)(Publishを実行) + * 構文:`NOTIFY channel [ , payload ]` + * 同じ機能の関数として`pg_notify`も用意されている +* [LISTEN](https://www.postgresql.org/docs/current/sql-listen.html)(Subscribeを開始) + * 構文:`LISTEN channel` +* [UNLISTEN](https://www.postgresql.org/docs/current/sql-unlisten.html)(Subscribeを終了) + * 構文:`UNLISTEN { channel | * }` + +基本的な使い方と挙動を見ていきます。 + +```sql チャネル"foo"のSubscribeを開始 +LISTEN foo; +``` +```sql チャネル"foo"に"hello"というデータをPublish +NOTIFY foo, 'hello'; +-- または +SELECT pg_notify('foo', 'hello'); + +-- "foo"をSubscribe済みのセッションには次の通知が届く +-- Asynchronous notification "foo" with payload "hello" received from server process with PID 14728. +``` +```sql payload無しの通知も可能 +NOTIFY foo; + +-- Asynchronous notification "foo" received from server process with PID 14728. +``` +```sql チャネル"foo"のSuscribeを終了する +UNLISTEN foo; +``` + +とてもシンプルですね。 +続いて、本機能の主な仕様を挙げつつ利用時の考慮点を示します。 +詳細は[公式ドキュメント](https://www.postgresql.jp/document/current/index.html)をご覧いただければと思います。 + +## チャネル + +* チャネルはPub/Sub通信する際のキーとなる任意の文字列です。LISTEN対象のチャネルとNOTIFYを実行するチャネルが異なるとデータのやり取りができません。 +* 一つのセッションで複数のチャネルをLISTENすることができます。 +* チャネルに指定できる文字は、ASCIIの場合英数字とアンダーバー(_)が使用できます。大文字/小文字は区別されません。なお、マルチバイト文字も使用できることを確認しています。 + + ```sql:マルチバイト文字のチャネルに通知 + NOTIFY こんにちは, '世界'; + + -- Asynchronous notification "こんにちは" with payload "世界" received from server process with PID 14728. + ``` + +* 63バイトを超えるチャネルは登録することができません。超えた分は切り捨てて処理されます。この制限はテーブルなど他のデータベースオブジェクトとも共通しています。 + +## スコープ + +* Pub/Subを行うDBセッションは、同一データベースに接続し、かつ同じチャネルを通知対象としなければいけません。 +* データベースが同じであれば、スキーマが異なっていても通知できます。! + +68747470733a2f2.png + + + +## ペイロードのデータ型・サイズ + +* ペイロードに乗せられるデータはテキストのみで、バイナリは送受信できません。 +* バイナリデータを乗せる場合は`encode`関数でテキスト形式に変換したり、呼出元アプリでJSON文字列等にシリアライズしてあげる必要があります。 +* ペイロードのサイズ上限は8000バイト未満で、これを超えると次のエラーが返ります。 + + ```text + ERROR: payload string too long + SQL state: 22023 + ``` + +## トランザクション + +* トランザクション内でNOTIFYしたデータは、COMMITしたタイミングで、LISTENしたセッションに通知されます。ROLLBACKすると通知されません。 +* トランザクション内でNOTIFYしたデータの中で、ペイロードが同一のものはまとめられます。送信順序は保証されます。 + +```sql まとめられた通知 +BEGIN; +NOTIFY foo, 'a'; +NOTIFY foo, 'a'; +NOTIFY foo, 'a'; +NOTIFY foo, 'b'; +NOTIFY foo, 'c'; +COMMIT; + +-- Asynchronous notification "foo" with payload "a" received from server process with PID 14728. +-- Asynchronous notification "foo" with payload "b" received from server process with PID 14728. +-- Asynchronous notification "foo" with payload "c" received from server process with PID 14728. +``` + +## 未処理メッセージの蓄積サイズ + +* DBインスタンスには、トランザクションが未完了なメッセージをメモリ上に溜めておくことが出来るNotificationキューを持っています。標準インストールの場合サイズは8GBで、使用量が半分になると警告ログが出力されます。 +* トランザクションが終了するとキューデータがクリーンアップされます。ペイロードを目一杯使った場合およそ100万件で上限に掛かるため、適当な件数単位でCOMMITしましょう。 +* Notificationキューの使用率は`pg_notification_queue_usage`関数で確認できます(0から1までの小数で表現)。 + + +# JavaによるPub/Subクライアント実装 + +これまで記載したPub/Sub通信をJavaで実装するときのパターンを3種類紹介します。 + +## PostgreSQL JDBCドライバ + +PostgreSQL本家のJDBCドライバを使った実装例です(本家の実装例は[こちら](https://jdbc.postgresql.org/documentation/server-prepare/#listen--notify))。 +Mavenを使う場合は以下の依存関係を追加します。 + +```xml pom.xml + + org.postgresql + postgresql + 42.7.3 + +``` +```java +// 事前にLISTEN用コネクションを作成しておく +private final org.postgresql.jdbc.PgConnection listenerConn = DriverManager.getConnection(URL, USERNAME, PASSWORD).unwrap(PgConnection.class); + +/** + * 通知の受信を開始します。 + * + * @param channel チャネル + */ +private void startListen(final String channel) { + try { + try (var stmt = this.listenerConn.createStatement()) { + stmt.execute("LISTEN " + channel); + while (true) { + var notifications = pgconn.getNotifications(10 * 1000); + if (this.terminated) { + return; + } + if (notifications == null) { + continue; + } + for (var n : notifications) { + LOG.info("Received Notification: pid={}, channel={}, payload={}", n.getPID(), n.getName(), n.getParameter()); + } + } + } + } catch (SQLException e) { + LOG.error("exception thrown {}", e.getMessage()); + } +} + +/** + * PostgreSQLサーバに通知を行います。 + * + * @param channel チャネル + * @param payload メッセージペイロード + */ +private void notify(final String channel, final String payload) { + try { + var conn = DriverManager.getConnection(URL, USERNAME, PASSWORD).unwrap(PgConnection.class); + var pstmt = conn.prepareStatement("select pg_notify(?, ?)"); + try (conn; pstmt) { + pstmt.setString(1, channel); + pstmt.setString(2, payload); + pstmt.execute(); + } + LOG.info("Notified: pid={}, channel={}, payload={}", pgconn.getBackendPID(), channel, payload); + } catch (SQLException e) { + LOG.error("exception thrown", e); + } +} +``` +* `PgConnection#getNotifications(int timeoutMillis)`を使うと、通知が来るまで指定の時間ここでブロックするため、ループで囲えばロングポーリング的なロジックになります。 +* なお`NOTIFY`クエリでペイロードのパラメータバインドを試みると`org.postgresql.util.PSQLException`が出てしまうので代わりに`pg_notify`を実行しています。[^1] + +## PGJDBC-NG + +* [PGJDBC-NG](https://impossibl.github.io/pgjdbc-ng/)はJDBC4.2に準拠し、PostgreSQLの機能をより高度に使うことをめざして開発されているOSSです。 + +```xml pom.xml + + com.impossibl.pgjdbc-ng + pgjdbc-ng + 0.8.9 + +``` +```java +// 事前にLISTEN用コネクションを作成しておく +private final com.impossibl.postgres.api.jdbc.PGConnection listenerConn = DriverManager.getConnection(URL, USERNAME, PASSWORD).unwrap(PGConnection.class); + +/** + * 通知の受信を開始します。 + * + * @param channel チャネル + */ +private void startListen(final String channel) { + try { + this.listenerConn.addNotificationListener(new PGNotificationListener() { + @Override + public void notification(final int pid, final String channel, final String payload) { + LOG.info("Received Notification: {}, {}, {}", pid, channel, payload); + } + }); + try (var stmt = this.listenerConn.createStatement()) { + stmt.execute("LISTEN " + channel); + } + } catch (SQLException e) { + LOG.error("exception thrown {}", e.getMessage()); + } +} + +// notify()は、PostgreSQL JDBCドライバと同様 +``` +ご覧の通り、こちらは通知受信時の動作をイベントリスナーの形で実装できます。 +チャネルを指定してリスナーを登録することも可能です。 + +## R2DBC + +[R2DBC](https://r2dbc.io/)は、リアクティブプログラミングの観点から新たに開発されたJDBCドライバです。 +Reactive Streamsに完全準拠し、I/Oは完全にノンブロッキングであると謳っています。 + +```xml pom.xml + + org.postgresql + r2dbc-postgresql + 1.0.5.RELEASE + +``` +```java +// 事前に送受信用のコネクションを設定しておく +private Mono receiver; +private Mono sender; + +var connFactory = new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder() + .host("...") + .port(5432) + .username("...") + .password("...") + .database("...") + .build()); +this.receiver = connFactory.create(); +this.sender = connFactory.create(); + +/** + * 通知の受信を開始します。 + * + * @param channel チャネル + */ +private void startListen(final String channel) { + this.receiver.map(pgconn -> { + return pgconn.createStatement("LISTEN " + channel) + .execute() + .flatMap(PostgresqlResult::getRowsUpdated) + .thenMany(pgconn.getNotifications()) + .doOnNext(notification -> LOG.info("Received Notification: {}, {}, {}", notification.getProcessId(), notification.getName(), notification.getParameter())) + .doOnSubscribe(s -> LOG.info("listen start")) + .subscribe(); + }).subscribe(); +} + +/** + * PostgreSQLサーバに通知を行います。 + * + * @param channel チャネル + * @param payload メッセージペイロード + */ +private void notify(final String channel, final String payload) { + this.sender.map(pgconn -> { + return pgconn.createStatement("NOTIFY " + channel + ", '" + payload + "'") + .execute() + .flatMap(PostgresqlResult::getRowsUpdated) + .then() + .doOnSubscribe(s -> LOG.info("Notified: channel={}, payload={}", channel, payload)) + .subscribe(); + }).subscribe(); +} +``` +R2DBCを使う際は、依存する[Project Reactor](https://projectreactor.io/)のAPIを全面的に使うことになります。 + +今回は簡単な説明にとどめますが、クエリを実行、結果をハンドリングし、指定のタイミングで動く付帯的なアクションを設定する、という一連のフローを構築して、最後にこのフローが動き出すように`subscribe()`を呼び出しています。`doOnNext()`で通知が届いたときのアクションを、`doOnSubscribe()`でsubscribeしたとき(クエリを実行するタイミング)のアクションを設定しており、ここでは単純にログ出力しています。 +JavaのStream APIと似たスタイルで非同期・ストリーム処理を作る感じで、私も初見は面食らったのですが、こちらのページがとても勉強になりました。 + +* [ReactorでN+1問題な処理を実装してみた話](https://cero-t.hatenadiary.jp/entry/20171215/1513290305) +* [怖くないR2DBC](https://bufferings.hatenablog.com/entry/2018/11/18/102433) + +# おわりに + +PostgreSQLのNOTIFY/LISTENは[リリース9.0](https://www.postgresql.jp/document/9.4/html/release-9-0.html)で、待ち状態イベントの格納先が従来のシステムテーブルからメモリキューに代わり、通知と一緒にペイロードを送信できるようになったことで、おおよそ現在の形になりました。近年もリリース13.0で性能向上を遂げており、地味な機能ながら進化を続けているようです。 + +機能自体は古くから搭載されています[^2]がQiitaではこれまで記事化されていないため、社内の技術検証で得た情報整理も兼ねて記事化してみました。 + +[^1]: 公式ドキュメントにはNOTIFYのペイロードにはリテラル文字列を設定しなければならず、一方でpg_notifyには不定のチャネル、ペイロードに対応すると謳われているので、NOTIFYはパラメータバインドには対応していない模様です。 + +[^2]: NOTIFY/LISTENのリリースノートの初出は[1995年7月のリリース0.03のバグフィックス](https://www.postgresql.org/docs/release/0.03/#:~:text=*%20the%20LISTEN/NOTIFY%20asynchronous%20notification%20mechanism%20now%20work)なので、本当に最初期から搭載された機能だとわかります。 + diff --git a/source/images/20240628a/68747470733a2f2.png b/source/images/20240628a/68747470733a2f2.png new file mode 100644 index 000000000000..3b3139ac4bc6 Binary files /dev/null and b/source/images/20240628a/68747470733a2f2.png differ diff --git a/source/images/20240628a/thumbnail.png b/source/images/20240628a/thumbnail.png new file mode 100644 index 000000000000..526c8354cfc7 Binary files /dev/null and b/source/images/20240628a/thumbnail.png differ