✍️ Khoa📅 22/04/2026☕ 19 phút đọc

CDC & Event Streaming — Change Data Capture Như Là Integration Backbone

Change Data Capture (CDC) là pattern cho phép bạn stream database changes vào các hệ thống khác — search index, data warehouse, cache, microservices — mà không cần application code biết về downstream consumers. Thay vì dual-write (dễ inconsistent) hay polling (tốn I/O và lag cao), CDC tap vào replication log của database — nơi mọi thay đổi đều được ghi trước khi commit.

Bài này cover Debezium với PostgreSQL (logical replication slots) và MySQL (binlog tailing), exactly-once delivery trong CDC pipelines, schema evolution challenges khi upstream database đổi structure, và cách dùng CDC kết hợp outbox pattern để có guaranteed delivery mà không cần distributed transaction.


Mục lục

  1. Tại sao CDC thay vì Dual-Write hay Polling
  2. PostgreSQL Logical Replication — Cơ chế bên dưới
  3. MySQL Binlog Tailing
  4. Debezium — CDC Connector thực tế
  5. Exactly-Once Delivery trong CDC Pipelines
  6. Schema Evolution — Thay đổi database mà không vỡ pipeline
  7. Outbox Pattern — CDC + Guaranteed Delivery
  8. CDC vào Data Warehouse — End-to-end pipeline

1. Tại sao CDC thay vì Dual-Write hay Polling

Ba cách sync data giữa systems

Approach 1: Dual-Write — Viết đồng thời vào nhiều nơi

def complete_order(order_id: str):
    # Write vào primary DB
    db.execute("UPDATE orders SET status = 'completed' WHERE id = %s", order_id)

    # Đồng thời sync sang Elasticsearch
    es.update(index="orders", id=order_id, body={"doc": {"status": "completed"}})

    # Và invalidate cache
    redis.delete(f"order:{order_id}")

    # Và update data warehouse
    warehouse.upsert("orders", {"id": order_id, "status": "completed"})

Vấn đề: DB write thành công, ES write fail → inconsistent state. Không có transaction bao phủ cả hai. Retry thì idempotent không? ES đã partial update chưa? Failure scenario nhân với số lượng downstream systems.

Approach 2: Polling — Periodically check xem có gì thay đổi không

def sync_job():
    while True:
        # Lấy tất cả rows updated trong 5 phút qua
        changed = db.query("""
            SELECT * FROM orders 
            WHERE updated_at > NOW() - INTERVAL '5 minutes'
        """)

        for row in changed:
            es.index(index="orders", id=row.id, body=row.to_dict())

        time.sleep(300)  # Chạy mỗi 5 phút

Vấn đề:

  • Lag: Thay đổi lúc 10:00:01 không được sync đến 10:05
  • Deletes không detect được: WHERE updated_at > X không thấy deleted rows
  • Missing changes: Nếu một row được update 3 lần trong 5 phút, bạn chỉ thấy trạng thái cuối — bỏ qua các trạng thái trung gian
  • Index scan costly: updated_at phải được index, nhưng scan liên tục vẫn tốn I/O

Approach 3: CDC — Tap vào replication log

Database writes → WAL (Write-Ahead Log) → CDC connector → Kafka → Consumers
                  (PostgreSQL)            (Debezium)
                  binlog (MySQL)

CDC đọc log mà database vốn đã ghi để phục vụ replication. Không phải query thêm. Thấy mọi thay đổi theo thứ tự, kể cả deletes. Lag tính bằng milliseconds.

So sánh thực tế

Dual-Write Polling CDC
Consistency Weak (partial failure) Eventual (lag) Strong (log-based)
Deletes Explicit chỉ Không detect Tự động
Lag ~0ms Vài phút <1 giây
DB load Tăng theo consumers Tăng theo frequency Minimal (read log)
Implementation Đơn giản Đơn giản Phức tạp hơn
Failure recovery Manual Tự động (re-poll) Tự động (replay log)

2. PostgreSQL Logical Replication — Cơ chế bên dưới

WAL là gì

PostgreSQL ghi mọi thay đổi vào Write-Ahead Log (WAL) trước khi apply vào actual data files. WAL được dùng cho crash recovery (replay lại WAL sau crash), streaming replication sang standby, và logical replication (CDC).

Có hai loại WAL decoding:

  • Physical (streaming replication): Byte-level copy của disk pages. Nhanh nhưng không human-readable và standby phải dùng cùng PostgreSQL version.
  • Logical replication: High-level changes — "INSERT row X vào bảng Y với values Z". Human-readable, version-independent. CDC dùng cái này.

Logical Replication Slots

Replication slot là bookmark trong WAL — PostgreSQL sẽ giữ WAL không bị xóa cho đến khi slot consumer đọc đến đó.

-- Tạo replication slot cho Debezium
SELECT pg_create_logical_replication_slot(
    'debezium_slot',    -- tên slot
    'pgoutput'          -- output plugin: pgoutput (built-in) hoặc wal2json
);

-- Xem slots hiện có
SELECT slot_name, plugin, active, restart_lsn, confirmed_flush_lsn
FROM pg_replication_slots;
/*
slot_name      | plugin    | active | restart_lsn | confirmed_flush_lsn
debezium_slot  | pgoutput  | true   | 0/1A000000  | 0/1A000100
*/

-- confirmed_flush_lsn: Consumer đã đọc đến đây
-- restart_lsn: WAL phải được giữ từ đây về sau

Nguy hiểm quan trọng: Nếu slot consumer (Debezium) chết và không reconnect trong nhiều giờ, PostgreSQL sẽ giữ WAL tích lũy — disk có thể đầy và crash cả database.

-- Monitor replication lag — phải alert khi lag > threshold
SELECT 
    slot_name,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag_size,
    active
FROM pg_replication_slots;
# Alert khi WAL lag > 1GB
def check_replication_slot_health():
    result = db.query("""
        SELECT slot_name,
               pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS lag_bytes
        FROM pg_replication_slots
        WHERE NOT active
    """)
    for row in result:
        if row.lag_bytes > 1_000_000_000:  # 1GB
            alert_slack(f"Replication slot {row.slot_name} is inactive with {row.lag_bytes/1e9:.1f}GB lag!")

Publication — Chọn bảng nào để replicate

-- Tạo publication cho các bảng cần CDC
CREATE PUBLICATION debezium_pub FOR TABLE 
    orders, 
    order_items, 
    users,
    payments;

-- Hoặc toàn bộ database (cẩn thận với high-churn tables)
CREATE PUBLICATION debezium_pub FOR ALL TABLES;

-- Cấu hình PostgreSQL để enable logical replication
-- postgresql.conf:
wal_level = logical           -- Bắt buộc
max_replication_slots = 10    -- Số slots tối đa
max_wal_senders = 10          -- Số concurrent replication connections

Xem WAL events thực tế

-- Test đọc WAL thủ công (dùng pg_logical_slot_get_changes)
SELECT lsn, xid, data
FROM pg_logical_slot_get_changes('debezium_slot', NULL, NULL, 
    'proto_version', '1', 'publication_names', 'debezium_pub');

-- Output khi INSERT một row:
-- data: {"action":"I","timestamp":"...","schema":"public","table":"orders",
--        "columns":[{"name":"id","value":123},{"name":"status","value":"pending"}]}

-- Output khi DELETE:
-- data: {"action":"D","timestamp":"...","schema":"public","table":"orders",
--        "identity":[{"name":"id","value":123}]}

3. MySQL Binlog Tailing

Binlog format

MySQL ghi changes vào binary log (binlog) — tương tự WAL của PostgreSQL nhưng format khác.

-- Enable binlog với ROW format (bắt buộc cho CDC)
-- my.cnf:
-- [mysqld]
-- log_bin = mysql-bin
-- binlog_format = ROW         -- Ghi actual row values, không phải SQL statements
-- binlog_row_image = FULL     -- Ghi cả before/after image của row
-- expire_logs_days = 7        -- Giữ binlog 7 ngày

-- Kiểm tra binlog đang bật
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';

-- Xem danh sách binlog files
SHOW BINARY LOGS;
/*
Log_name          | File_size
mysql-bin.000001  | 177
mysql-bin.000002  | 1073741952
mysql-bin.000003  | 54321
*/

-- Đọc binlog thủ công
-- mysqlbinlog --read-from-remote-server --host=localhost \
--             --start-datetime="2026-04-22 10:00:00" mysql-bin.000003

GTID — Global Transaction ID

MySQL 5.6+ có GTID (Global Transaction ID) — unique identifier cho mỗi transaction. Debezium dùng GTID để track position, không phải file:offset như trước.

-- Enable GTID (recommend cho production CDC)
-- my.cnf:
-- gtid_mode = ON
-- enforce_gtid_consistency = ON

-- Xem GTID đã executed
SHOW VARIABLES LIKE 'gtid_executed';
-- Value: 'a3a55ee2-1234-5678-abcd-ef0123456789:1-1000'
-- Format: server_uuid:transaction_id_range

GTID quan trọng khi failover: Nếu MySQL primary chết và Debezium reconnect vào replica, GTID giúp resume từ đúng position mà không cần restart full snapshot.


4. Debezium — CDC Connector thực tế

Architecture

PostgreSQL/MySQL
      │
      │ (replication protocol)
      ▼
┌─────────────────┐
│    Debezium     │ ← chạy trong Kafka Connect worker
│    Connector    │
└────────┬────────┘
         │ (Kafka producer)
         ▼
    Kafka Topics
    ├── dbserver.public.orders
    ├── dbserver.public.order_items
    └── dbserver.public.users
         │
         ▼
    Consumers
    (data warehouse, search, cache, other services)

Cấu hình PostgreSQL connector

{
    "name": "orders-postgres-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "debezium",
        "database.password": "secret",
        "database.dbname": "production",
        "database.server.name": "dbserver",

        "plugin.name": "pgoutput",
        "publication.name": "debezium_pub",
        "slot.name": "debezium_slot",

        "table.include.list": "public.orders,public.order_items,public.users",

        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "transforms.unwrap.delete.handling.mode": "rewrite",
        "transforms.unwrap.add.fields": "op,ts_ms,source.ts_ms",

        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://schema-registry:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081",

        "heartbeat.interval.ms": "10000",  // Giữ slot active khi không có changes
        "snapshot.mode": "initial"         // Full snapshot lần đầu, sau đó incremental
    }
}

Event format — Trước và sau transform

Raw Debezium event (trước ExtractNewRecordState):

{
    "before": {
        "id": 123,
        "status": "pending",
        "amount": 150000,
        "updated_at": 1745317800000
    },
    "after": {
        "id": 123,
        "status": "completed",
        "amount": 150000,
        "updated_at": 1745317900000
    },
    "source": {
        "version": "2.4.0",
        "connector": "postgresql",
        "ts_ms": 1745317900100,
        "db": "production",
        "schema": "public",
        "table": "orders",
        "txId": 789012,
        "lsn": 27262976
    },
    "op": "u",  // c=create, u=update, d=delete, r=read(snapshot)
    "ts_ms": 1745317900200
}

Sau ExtractNewRecordState (thường dùng hơn cho consumers):

{
    "id": 123,
    "status": "completed",
    "amount": 150000,
    "updated_at": 1745317900000,
    "__op": "u",
    "__ts_ms": 1745317900200,
    "__source_ts_ms": 1745317900100
}

Giữ before hay không tùy use case:

  • Sync sang search index: Chỉ cần after — update document với state mới
  • Audit log: Cần cả beforeafter — ghi lại "changed from X to Y"
  • Delete handling: after là null, dùng before để biết cần xóa document nào

Consumer example — Sync vào Elasticsearch

from kafka import KafkaConsumer
import json
from elasticsearch import Elasticsearch, helpers

es = Elasticsearch(["http://elasticsearch:9200"])
consumer = KafkaConsumer(
    "dbserver.public.orders",
    bootstrap_servers=["kafka:9092"],
    group_id="es-sync-consumer",
    value_deserializer=lambda m: json.loads(m.decode()),
    auto_offset_reset="earliest",
    enable_auto_commit=False,   # Manual commit sau khi sync thành công
)

def process_batch(messages: list):
    actions = []
    for msg in messages:
        event = msg.value
        op = event.get("__op")

        if op in ("c", "u", "r"):  # create, update, snapshot read
            actions.append({
                "_op_type": "index",
                "_index": "orders",
                "_id": event["id"],
                "_source": {
                    "order_id": event["id"],
                    "status": event["status"],
                    "amount": event["amount"],
                    "updated_at": event["updated_at"],
                }
            })
        elif op == "d":  # delete
            actions.append({
                "_op_type": "delete",
                "_index": "orders",
                "_id": event["id"],
            })

    if actions:
        helpers.bulk(es, actions)

BATCH_SIZE = 500
batch = []

for message in consumer:
    batch.append(message)

    if len(batch) >= BATCH_SIZE:
        process_batch(batch)
        # Commit sau khi ES confirm write thành công
        consumer.commit()
        batch = []

Snapshot mode — Initial load

Lần đầu chạy Debezium, cần load toàn bộ existing data trước khi switch sang streaming. Đây là initial snapshot.

Phase 1 — Snapshot:
  Debezium đọc toàn bộ tables bằng SELECT
  Emit tất cả rows với op="r" (read)
  Mark snapshot complete

Phase 2 — Streaming:
  Switch sang đọc WAL/binlog từ LSN tại thời điểm snapshot bắt đầu
  Replay bất kỳ changes nào xảy ra trong lúc snapshot
  Continue streaming

Snapshot của bảng hàng triệu rows có thể mất hàng giờ và tạo load lớn trên DB. Tune bằng:

{
    "snapshot.fetch.size": "1000",         // Rows per fetch
    "snapshot.max.threads": "1",           // Tránh parallel snapshot với DB nhỏ
    "snapshot.select.statement.overrides": "public.orders:SELECT * FROM public.orders WHERE created_at > '2025-01-01'"
    // Snapshot chỉ lấy data gần đây, không cần history
}

5. Exactly-Once Delivery trong CDC Pipelines

Ba delivery semantics

At-most-once:   Message có thể bị mất, không bao giờ duplicate
At-least-once:  Message không bị mất, nhưng có thể duplicate
Exactly-once:   Message delivered đúng một lần, không mất, không duplicate

Kafka broker + Debezium mặc định là at-least-once. Connector crash và restart → có thể replay một số messages.

Tại sao exactly-once khó

Debezium đọc WAL event:
  LSN 1000: INSERT order id=123
  LSN 1001: UPDATE order id=123 status=completed

Debezium gửi cả hai sang Kafka.
Kafka confirm nhận LSN 1000.
Trước khi Debezium commit offset LSN 1001, connector crash.

Restart: Debezium đọc lại từ LSN 1000 (chưa được commit).
→ Kafka nhận duplicate: INSERT order id=123 lần thứ 2.

Approach 1: Idempotent consumers (phổ biến nhất)

Không cố gắng prevent duplicates ở producer — design consumer để tolerate duplicates.

def upsert_to_warehouse(event: dict):
    order_id = event["id"]
    lsn = event.get("__lsn")  # Debezium expose LSN trong source field

    # Upsert: nếu cùng order_id và LSN nhỏ hơn hoặc bằng đã có → skip
    warehouse.execute("""
        INSERT INTO orders (id, status, amount, _cdc_lsn, _cdc_ts)
        VALUES (%s, %s, %s, %s, %s)
        ON CONFLICT (id) DO UPDATE SET
            status = EXCLUDED.status,
            amount = EXCLUDED.amount,
            _cdc_lsn = EXCLUDED._cdc_lsn,
            _cdc_ts = EXCLUDED._cdc_ts
        WHERE orders._cdc_lsn < EXCLUDED._cdc_lsn  -- Chỉ update nếu event mới hơn
    """, (order_id, event["status"], event["amount"], lsn, event["__ts_ms"]))

ON CONFLICT ... WHERE _cdc_lsn < EXCLUDED._cdc_lsn là idempotency key — cùng LSN replay thì no-op.

Approach 2: Kafka Transactions + Exactly-Once Semantics

Kafka 0.11+ support transactions giữa producer và consumer. Debezium 1.9+ support exactly-once khi dùng với Kafka transactions.

{
    "exactly.once.support": "required",
    "producer.override.enable.idempotence": "true",
    "producer.override.isolation.level": "read_committed",
    "producer.override.acks": "all"
}

Cách hoạt động:

  1. Debezium mở Kafka transaction
  2. Gửi tất cả events trong một DB transaction vào Kafka
  3. Commit Kafka transaction và update Kafka offset atomically
  4. Nếu crash xảy ra, Kafka transaction bị rollback → không có partial write

Limitation: Chỉ guarantee exactly-once Kafka → Kafka. Downstream consumer (ES, warehouse) vẫn cần idempotency.

Approach 3: Deduplication với Redis

Khi consumer là external system không support upsert:

import hashlib

def deduplicated_consumer(event: dict, process_fn):
    # Tạo unique ID cho event
    event_id = f"{event['__source_ts_ms']}:{event['id']}:{event['__op']}"
    dedup_key = f"cdc:seen:{hashlib.md5(event_id.encode()).hexdigest()}"

    # Set với NX (only if not exists) và TTL 24h
    is_new = redis.set(dedup_key, "1", nx=True, ex=86400)

    if is_new:
        process_fn(event)
    # else: duplicate, skip

# Dùng:
for message in consumer:
    deduplicated_consumer(message.value, lambda e: send_to_webhook(e))

6. Schema Evolution — Thay đổi database mà không vỡ pipeline

Vấn đề: Schema incompatibility

Tháng 1: Table orders có columns (id, status, amount)
         → Debezium emit Avro events với schema v1
         → Warehouse table có (id, status, amount)

Tháng 3: DBA thêm column discount_code VARCHAR(50)
         → Debezium emit events với schema v2 (thêm discount_code)
         → Warehouse INSERT fail: "column discount_code does not exist"
         → Pipeline vỡ lúc deploy migration

Safe schema changes với Avro + Schema Registry

Schema Registry enforce compatibility rules. Với BACKWARD compatibility (default):

Allowed changes (backward compatible):

-- Safe: Thêm column với DEFAULT
ALTER TABLE orders ADD COLUMN discount_code VARCHAR(50) DEFAULT NULL;

-- Safe: Tăng độ dài VARCHAR
ALTER TABLE orders ALTER COLUMN description TYPE TEXT;

Avro: Thêm field với default value → schema mới tương thích ngược. Consumer cũ không biết field mới, nhận NULL/default.

Breaking changes:

-- BREAKING: Đổi tên column
ALTER TABLE orders RENAME COLUMN amount TO amount_cents;

-- BREAKING: Đổi type
ALTER TABLE orders ALTER COLUMN user_id TYPE UUID USING user_id::UUID;

-- BREAKING: Xóa column
ALTER TABLE orders DROP COLUMN legacy_field;

Khi xóa hoặc đổi tên column, Avro schema không còn backward compatible → Schema Registry reject → Debezium connector fail.

Zero-downtime schema migration với CDC

Pattern: Expand-Contract (Blue-Green columns)

-- Phase 1 EXPAND: Thêm column mới, giữ column cũ
ALTER TABLE orders ADD COLUMN user_uuid UUID DEFAULT NULL;

-- Backfill background (không lock table)
UPDATE orders SET user_uuid = user_id::UUID
WHERE user_uuid IS NULL
LIMIT 1000;  -- Chạy theo batch, không block

-- Tại đây: cả user_id và user_uuid đều tồn tại
-- Application write vào cả hai, CDC emit cả hai
-- Downstream consumers migrate dần sang user_uuid
-- Phase 2 CONTRACT: Sau khi tất cả consumers đã migrate
-- Xóa column cũ
ALTER TABLE orders DROP COLUMN user_id;

Pattern: Versioned topics

Thay vì migrate in-place, emit sang topic mới:
  - dbserver.public.orders.v1 (consumers cũ vẫn đọc)
  - dbserver.public.orders.v2 (consumers mới đọc schema mới)

Debezium có thể emit đồng thời vào cả hai topic trong migration period.
Sau khi tất cả consumers migrate sang v2, xóa v1.

Xử lý DDL events trong consumer

Debezium emit DDL changes (schema changes) như một loại special event:

def handle_cdc_event(event: dict):
    # DDL event khi schema thay đổi
    if event.get("__op") == "schema_change":
        ddl = event.get("ddl")
        print(f"Schema changed: {ddl}")

        # Tự động apply schema change vào warehouse
        if "ADD COLUMN" in ddl.upper():
            apply_warehouse_migration(ddl)
        elif "DROP COLUMN" in ddl.upper():
            # Cẩn thận: không drop ngay, chờ verify consumers sẵn sàng
            flag_for_review(ddl)
        return

    # Normal data event
    process_data_event(event)

def apply_warehouse_migration(ddl: str):
    # Parse DDL và apply tương đương cho warehouse
    # Ví dụ: "ALTER TABLE orders ADD COLUMN discount_code VARCHAR(50)"
    # → BigQuery: bq query "ALTER TABLE orders ADD COLUMN discount_code STRING"
    warehouse.execute(translate_ddl(ddl))

7. Outbox Pattern — CDC + Guaranteed Delivery

Vấn đề: Đảm bảo event delivery mà không cần 2PC

Scenario phổ biến:

def complete_order(order_id: str):
    with db.transaction():
        db.execute("UPDATE orders SET status = 'completed' WHERE id = %s", order_id)
        # Transaction commit ở đây

    # Nếu crash xảy ra ở đây:
    kafka.produce("order.completed", {"order_id": order_id})  # Event bị mất!

Để fix bằng 2-phase commit (XA transaction spanning DB + Kafka) thì phức tạp, chậm, và hầu hết message brokers không support đầy đủ.

Outbox pattern giải quyết bằng CDC

Ý tưởng: Thay vì publish trực tiếp vào Kafka, write event vào một bảng trong cùng database transaction. CDC sẽ pick up và forward sang Kafka.

-- Tạo outbox table
CREATE TABLE outbox_events (
    id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type VARCHAR(100) NOT NULL,  -- 'order', 'user', 'payment'
    aggregate_id   VARCHAR(100) NOT NULL,
    event_type     VARCHAR(100) NOT NULL,
    payload        JSONB        NOT NULL,
    created_at     TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    processed_at   TIMESTAMPTZ            -- NULL = chưa processed (optional)
);

CREATE INDEX idx_outbox_unprocessed ON outbox_events(created_at)
    WHERE processed_at IS NULL;
def complete_order(order_id: str):
    with db.transaction():
        # Business logic
        db.execute(
            "UPDATE orders SET status = 'completed', updated_at = NOW() WHERE id = %s",
            order_id
        )

        # Write event vào outbox — trong cùng transaction!
        db.execute("""
            INSERT INTO outbox_events (aggregate_type, aggregate_id, event_type, payload)
            VALUES (%s, %s, %s, %s)
        """, (
            "order",
            order_id,
            "order.completed",
            json.dumps({
                "order_id": order_id,
                "completed_at": datetime.utcnow().isoformat(),
                "status": "completed"
            })
        ))
        # Nếu transaction rollback → cả order update lẫn outbox event đều bị rollback
        # Guaranteed atomicity!
PostgreSQL Transaction commits
    → WAL ghi: UPDATE orders + INSERT outbox_events
    → Debezium đọc outbox_events change
    → Route sang Kafka topic dựa trên aggregate_type/event_type
    → Consumer nhận "order.completed" event

Debezium Outbox Event Router

Debezium có sẵn OutboxEventRouter transform để tự động route từ outbox table sang đúng Kafka topic:

{
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.field.event.id": "id",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.type": "event_type",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.route.by.field": "aggregate_type",
    "transforms.outbox.route.topic.replacement": "outbox.${routedByValue}"
}
outbox_events row:
  aggregate_type = "order"
  aggregate_id   = "ord_123"
  event_type     = "order.completed"
  payload        = {...}

→ Kafka topic: "outbox.order"
   Key: "ord_123"
   Value: payload

Cleanup outbox table

Outbox table tăng kích thước nếu không cleanup:

-- Option 1: Soft delete — đánh dấu processed
-- (chạy trong CDC consumer sau khi forward sang Kafka)
UPDATE outbox_events SET processed_at = NOW()
WHERE id = %s;

-- Cleanup job: xóa events đã processed sau 7 ngày
DELETE FROM outbox_events
WHERE processed_at IS NOT NULL
  AND processed_at < NOW() - INTERVAL '7 days';
-- Option 2: Hard delete sau khi Debezium confirm
-- Debezium vẫn thấy DELETE event và forward tombstone sang Kafka
-- Consumer nhận tombstone → biết là processed, không cần action gì
DELETE FROM outbox_events
WHERE created_at < NOW() - INTERVAL '1 hour'
  AND processed_at IS NOT NULL;

Option 2 tốt hơn vì không giữ data lâu, nhưng cần đảm bảo Debezium đã replicate row trước khi xóa. Dùng TTL 1 giờ là safe với hầu hết CDC setups.

Outbox vs Direct CDC — Khi nào dùng cái nào

Direct CDC (Debezium đọc directly từ application tables):

  • Đơn giản hơn, không cần thêm outbox table
  • Events là reflection của DB state — downstream nhận raw DB row
  • Không control được payload structure

Outbox pattern:

  • Events được explicit designed: bạn control payload, naming, versioning
  • Transactional guarantee: event và business data in sync 100%
  • Dễ schema evolve: thay đổi payload không ảnh hưởng DB schema
  • Thêm overhead: thêm bảng, thêm write per transaction

Rule of thumb: Nếu consumer chỉ cần sync DB state (search index, cache, warehouse) → Direct CDC. Nếu consumer cần business events với curated payload (downstream microservices, external webhooks) → Outbox pattern.


8. CDC vào Data Warehouse — End-to-end pipeline

Full pipeline: PostgreSQL → Kafka → S3 → Iceberg

PostgreSQL orders table
    │
    │ (Debezium, logical replication)
    ▼
Kafka topic: dbserver.public.orders
    │
    │ (Kafka Connect S3 Sink, Parquet format)
    ▼
S3: s3://data-lake/raw/orders/year=.../month=.../day=.../
    │
    │ (Apache Spark / Trino)
    ▼
Iceberg table: raw.orders (append-only, full CDC history)
    │
    │ (dbt model)
    ▼
Iceberg table: marts.current_orders (latest state per order_id)

Merge CDC events thành current state

Raw CDC table chứa tất cả events (inserts, updates, deletes). Để analytics cần current state — chỉ giá trị mới nhất của mỗi row.

-- dbt model: marts/current_orders.sql
-- Merge CDC history thành current state

WITH cdc_events AS (
    SELECT
        id             AS order_id,
        status,
        amount,
        user_id,
        __op           AS operation,
        __ts_ms        AS event_ts,
        ROW_NUMBER() OVER (
            PARTITION BY id
            ORDER BY __ts_ms DESC, __lsn DESC  -- Lấy event mới nhất per order
        ) AS rn
    FROM {{ source('raw', 'orders') }}
)

SELECT
    order_id,
    status,
    amount,
    user_id,
    event_ts AS last_updated_at
FROM cdc_events
WHERE rn = 1
  AND operation != 'd'  -- Loại bỏ deleted rows

Iceberg MERGE — Efficient upsert

Với Iceberg, có thể dùng MERGE thay vì ROW_NUMBER trick:

-- Incremental merge với Iceberg MERGE INTO
MERGE INTO marts.current_orders AS target
USING (
    -- Source: CDC events trong batch mới nhất
    SELECT id, status, amount, user_id, __op, __ts_ms
    FROM raw.orders
    WHERE _ingested_at > (SELECT MAX(_ingested_at) FROM marts.current_orders)
) AS source
ON target.order_id = source.id

WHEN MATCHED AND source.__op = 'd' THEN DELETE

WHEN MATCHED AND source.__op IN ('u', 'r') THEN UPDATE SET
    status       = source.status,
    amount       = source.amount,
    last_updated = source.__ts_ms

WHEN NOT MATCHED AND source.__op != 'd' THEN INSERT
    (order_id, status, amount, user_id, last_updated)
    VALUES (source.id, source.status, source.amount, source.user_id, source.__ts_ms)
;

MERGE INTO Iceberg efficient hơn "DELETE + INSERT" hay "ROW_NUMBER scan" vì Iceberg chỉ rewrite các files chứa affected rows, không full table scan.

Monitoring CDC pipeline health

Các metrics cần alert:

# Prometheus metrics cho CDC pipeline health
from prometheus_client import Gauge, Counter

cdc_lag_seconds = Gauge(
    'cdc_pipeline_lag_seconds',
    'Lag between DB change and Kafka message',
    ['connector', 'table']
)

cdc_events_total = Counter(
    'cdc_events_processed_total',
    'Total CDC events processed',
    ['table', 'operation']
)

def update_cdc_metrics(event: dict):
    table = event.get("__source_table", "unknown")
    op = event.get("__op", "unknown")

    # Lag: thời gian từ khi DB change đến khi consumer nhận
    db_ts = event.get("__source_ts_ms", 0) / 1000
    now = time.time()
    lag = now - db_ts

    cdc_lag_seconds.labels(
        connector="debezium",
        table=table
    ).set(lag)

    cdc_events_total.labels(table=table, operation=op).inc()
# Alertmanager rules
groups:
  - name: cdc_alerts
    rules:
      - alert: CDCHighLag
        expr: cdc_pipeline_lag_seconds > 60
        for: 5m
        annotations:
          summary: "CDC lag > 60s for {{ $labels.table }}"

      - alert: CDCConnectorDown
        expr: absent(cdc_pipeline_lag_seconds) == 1
        for: 2m
        annotations:
          summary: "CDC connector appears to be down (no metrics)"

      - alert: ReplicationSlotGrowing
        expr: pg_replication_slots_pg_wal_lsn_diff_bytes > 500000000
        annotations:
          summary: "Replication slot lag > 500MB, risk of disk full"

Tổng kết: Mental Model cho CDC

CDC không phải là technology — đây là pattern giải quyết bài toán: "Làm sao giữ multiple systems consistent với source of truth mà không tạo tight coupling?"

Câu hỏi để chọn approach
─────────────────────────────────────────────────────

Cần sync database state hay emit business events?
  ├── DB state (search index, cache, warehouse)
  │       → Direct CDC với Debezium
  └── Business events (microservices, webhooks)
          → Outbox pattern + Debezium

Cần guarantee gì?
  ├── At-least-once là OK (idempotent consumer)
  │       → Default Debezium setup
  └── Exactly-once required
          → Kafka transactions + idempotent upsert

Schema thay đổi thường xuyên không?
  ├── Ít thay đổi
  │       → Schema Registry với BACKWARD compat
  └── Thay đổi liên tục
          → Expand-contract pattern + versioned topics

Database là gì?
  ├── PostgreSQL
  │       → Logical replication + pgoutput plugin
  │       → Monitor replication slot lag!
  └── MySQL
          → Binlog tailing
          → Enable GTID cho reliable failover

CDC đơn giản hóa kiến trúc đáng kể khi implement đúng — thay vì mỗi service biết về tất cả downstream consumers, database là single source of truth và CDC là broadcast mechanism. Nhưng giống mọi distributed system pattern, failure modes cần được design trước, không phải sau.