久しぶりにKafkaネタです。 Change Data Capture (CDC)でデータベースの変更をKafkaのイベントに変換するDebeziumを触り始めています。 PostgreSQLのレコードをKafkaに流すときに権限周りでハマったので、備忘録として残します。
Debeziumのおさらい
Debeziumはデータベースへのデータ登録、変更、削除をイベントに変換するソフトウェアです。 Kafka ConnectのSource Connectorとしてデプロイします。
Debeziumは今回扱うPostgreSQL以外にも、MySQL、Oracle、Db2といったRDBだけでなく、MongoDB、CassandraといったNoSQLもサポートしていることが特徴です。データベース固有の実装はConnector Pluginで抽象化されています。
PostgreSQL向けのConnector Pluginは、PostgreSQLの論理レプリケーションを通じてトランザクションログを読み込み、Kafkaのイベントに変換します。 そのため、PostgreSQL側にも論理レプリケーションを実行するための準備が必要です。
環境情報
コンポーネント名 | バージョン |
---|---|
Kafka | 3.2 |
PostgreSQL | 14.5 |
Debezium | 1.9.5.Final |
課題
DebeziumのPostgreSQL Connectorは、PostgreSQLの論理レプリケーションを使用します。 PostgreSQLの論理レプリケーションはパブリケーション(PUBLICATION)と呼ばれる、レプリケーション対象のテーブルの集合および操作を定義したオブジェクトが必要になります。 Debeziumのデフォルトの設定では、Connectorがパブリケーションを自動生成します。 しかし本番環境ではConnectorのロールに渡す権限を最小限にするため、パブリケーションを事前に作成してConnectorから利用することが推奨されています。
ハマりポイントは事前にパブリケーションを作成する時にあります。
論理レプリケーションのプラグインに pgoutput を使用する場合、Debeziumのドキュメントに従って次のSQLでロールとパブリケーションを定義します。
ロールに REPLICATION
権限をつけることで論理レプリケーションを許可します。
また、作成するパブリケーション dbz_publication
は、Debeziumがデフォルトで使用するパブリケーション名です。
-- Debeziumで使用するロール(=ユーザー) CREATE ROLE debezium WITH REPLICATION LOGIN PASSWORD '***'; -- 現在および今後作成される全てのテーブルをレプリケーション対象にする CREATE PUBLICATION dbz_publication FOR ALL TABLES;
しかし、この状態でDebeziumのConnectorを起動すると以下のログが出力され、permission denied for table contents
で起動に失敗します。
Snapshot step 1 - Preparing ... Snapshot step 2 - Determining captured tables ... Snapshot step 3 - Locking captured tables [テーブル名...] Snapshot step 4 - Determining snapshot offset ... Snapshot step 5 - Reading structure of captured tables Snapshot step 6 - Persisting schema history Snapshot step 7 - Snapshotting data ... Snapshotting contents of N tables while still in transaction Exporting data from table '<スキーマ名>.<テーブル名>' (1 of N tables) For table '<スキーマ名>.<テーブル名>' using select statement: 'SELECT column1, column2, ... FROM <スキーマ名>.<テーブル名>' Snapshot - Final stage Producer failure [io.debezium.pipeline.ErrorHandler] io.debezium.DebeziumException: org.apache.kafka.connect.errors.ConnectException: Snapshotting of table <スキーマ名>.<テーブル名> failed Caused by: org.postgresql.util.PSQLException: ERROR: permission denied for table <テーブル名> (...後略...)
Connectorの状況をKafka ConnectのREST APIで確認しても FAILED
ステータスになっています。
$ curl -s http://localhost:8083/connectors/postgresql-dbz-source-connector/status | jq . { "name": "postgresql-dbz-source-connector", "connector": {...}, "tasks": [ { "id": 0, "state": "FAILED", "worker_id": "10.20.33.45:8083", "trace": "..." } ], "type": "source" }
対策
Connectorの起動失敗メッセージはDebeziumがPostgreSQLに接続に使用するロールの権限エラーです。 DebeziumはConnector起動直後に、現在のテーブルの状態のスナップショットを取得します。 このスナップショットを取得する際にSELECT文を発行します。
select statement: 'SELECT column1, column2, ... FROM <スキーマ名>.<テーブル名>
上記のロールはスナップショットを取得するテーブルのSELECT権限を事前につける必要があります。
スキーマ内の全てのテーブルを対象とする場合、次のSQLで debezium
ロールにSELECT権限をつけます。
GRANT SELECT ON ALL TABLES IN SCHEMA <スキーマ名> TO debezium;
再度Connectorを起動すると、無事イベントの連携が始まりました。
$ curl -s http://localhost:8083/connectors/postgresql-dbz-source-connector/status | jq . { "name": "postgresql-dbz-source-connector", "connector": {...}, "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "10.20.33.45:8083", "trace": "..." } ], "type": "source" }
以上。