Ponz Dev Log

ゆるくてマイペースな開発日記

DebeziumでPostgreSQLをCDCするときはロールにSELECT権限をつける

久しぶりにKafkaネタです。 Change Data Capture (CDC)でデータベースの変更をKafkaのイベントに変換するDebeziumを触り始めています。 PostgreSQLのレコードをKafkaに流すときに権限周りでハマったので、備忘録として残します。

Debeziumのおさらい

Debeziumはデータベースへのデータ登録、変更、削除をイベントに変換するソフトウェアです。 Kafka ConnectのSource Connectorとしてデプロイします。

Debeziumは今回扱うPostgreSQL以外にも、MySQLOracleDb2といったRDBだけでなく、MongoDB、CassandraといったNoSQLもサポートしていることが特徴です。データベース固有の実装はConnector Pluginで抽象化されています。

PostgreSQL向けのConnector Pluginは、PostgreSQLの論理レプリケーションを通じてトランザクションログを読み込み、Kafkaのイベントに変換します。 そのため、PostgreSQL側にも論理レプリケーションを実行するための準備が必要です。

環境情報

コンポーネント バージョン
Kafka 3.2
PostgreSQL 14.5
Debezium 1.9.5.Final

課題

DebeziumのPostgreSQL Connectorは、PostgreSQLの論理レプリケーションを使用します。 PostgreSQLの論理レプリケーションはパブリケーション(PUBLICATION)と呼ばれる、レプリケーション対象のテーブルの集合および操作を定義したオブジェクトが必要になります。 Debeziumのデフォルトの設定では、Connectorがパブリケーションを自動生成します。 しかし本番環境ではConnectorのロールに渡す権限を最小限にするため、パブリケーションを事前に作成してConnectorから利用することが推奨されています。

debezium.io

ハマりポイントは事前にパブリケーションを作成する時にあります。 論理レプリケーションプラグインに pgoutput を使用する場合、Debeziumのドキュメントに従って次のSQLでロールとパブリケーションを定義します。 ロールに REPLICATION 権限をつけることで論理レプリケーションを許可します。 また、作成するパブリケーション dbz_publication は、Debeziumがデフォルトで使用するパブリケーション名です。

-- Debeziumで使用するロール(=ユーザー)
CREATE ROLE debezium WITH REPLICATION LOGIN PASSWORD '***';

-- 現在および今後作成される全てのテーブルをレプリケーション対象にする
CREATE PUBLICATION dbz_publication FOR ALL TABLES;

しかし、この状態でDebeziumのConnectorを起動すると以下のログが出力され、permission denied for table contentsで起動に失敗します。

Snapshot step 1 - Preparing ...
Snapshot step 2 - Determining captured tables ...
Snapshot step 3 - Locking captured tables [テーブル名...]
Snapshot step 4 - Determining snapshot offset ...
Snapshot step 5 - Reading structure of captured tables
Snapshot step 6 - Persisting schema history
Snapshot step 7 - Snapshotting data ...
Snapshotting contents of N tables while still in transaction
Exporting data from table '<スキーマ名>.<テーブル名>' (1 of N tables)
     For table '<スキーマ名>.<テーブル名>' using select statement: 'SELECT column1, column2, ... FROM <スキーマ名>.<テーブル名>'
Snapshot - Final stage
Producer failure   [io.debezium.pipeline.ErrorHandler] io.debezium.DebeziumException: org.apache.kafka.connect.errors.ConnectException: Snapshotting of table <スキーマ名>.<テーブル名> failed
Caused by: org.postgresql.util.PSQLException: ERROR: permission denied for table <テーブル名>

(...後略...)

Connectorの状況をKafka ConnectのREST APIで確認しても FAILED ステータスになっています。

$ curl -s http://localhost:8083/connectors/postgresql-dbz-source-connector/status | jq .
{
  "name": "postgresql-dbz-source-connector",
  "connector": {...},
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "10.20.33.45:8083",
      "trace": "..."
    }
  ],
  "type": "source"
}

対策

Connectorの起動失敗メッセージはDebeziumがPostgreSQLに接続に使用するロールの権限エラーです。 DebeziumはConnector起動直後に、現在のテーブルの状態のスナップショットを取得します。 このスナップショットを取得する際にSELECT文を発行します。

select statement: 'SELECT column1, column2, ... FROM <スキーマ名>.<テーブル名>

上記のロールはスナップショットを取得するテーブルのSELECT権限を事前につける必要があります。 スキーマ内の全てのテーブルを対象とする場合、次のSQLdebezium ロールにSELECT権限をつけます。

GRANT SELECT ON ALL TABLES IN SCHEMA <スキーマ名> TO debezium;

再度Connectorを起動すると、無事イベントの連携が始まりました。

$ curl -s http://localhost:8083/connectors/postgresql-dbz-source-connector/status | jq .
{
  "name": "postgresql-dbz-source-connector",
  "connector": {...},
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "10.20.33.45:8083",
      "trace": "..."
    }
  ],
  "type": "source"
}

以上。