CDC & Event Streaming — Change Data Capture Như Là Integration Backbone
Change Data Capture (CDC) là pattern cho phép bạn stream database changes vào các hệ thống khác — search index, data warehouse, cache, microservices — mà không cần application code biết về downstream consumers. Thay vì dual-write (dễ inconsistent) hay polling (tốn I/O và lag cao), CDC tap vào replication log của database — nơi mọi thay đổi đều được ghi trước khi commit.
Bài này cover Debezium với PostgreSQL (logical replication slots) và MySQL (binlog tailing), exactly-once delivery trong CDC pipelines, schema evolution challenges khi upstream database đổi structure, và cách dùng CDC kết hợp outbox pattern để có guaranteed delivery mà không cần distributed transaction.
Mục lục
- Tại sao CDC thay vì Dual-Write hay Polling
- PostgreSQL Logical Replication — Cơ chế bên dưới
- MySQL Binlog Tailing
- Debezium — CDC Connector thực tế
- Exactly-Once Delivery trong CDC Pipelines
- Schema Evolution — Thay đổi database mà không vỡ pipeline
- Outbox Pattern — CDC + Guaranteed Delivery
- CDC vào Data Warehouse — End-to-end pipeline
1. Tại sao CDC thay vì Dual-Write hay Polling
Ba cách sync data giữa systems
Approach 1: Dual-Write — Viết đồng thời vào nhiều nơi
def complete_order(order_id: str):
# Write vào primary DB
db.execute("UPDATE orders SET status = 'completed' WHERE id = %s", order_id)
# Đồng thời sync sang Elasticsearch
es.update(index="orders", id=order_id, body={"doc": {"status": "completed"}})
# Và invalidate cache
redis.delete(f"order:{order_id}")
# Và update data warehouse
warehouse.upsert("orders", {"id": order_id, "status": "completed"})
Vấn đề: DB write thành công, ES write fail → inconsistent state. Không có transaction bao phủ cả hai. Retry thì idempotent không? ES đã partial update chưa? Failure scenario nhân với số lượng downstream systems.
Approach 2: Polling — Periodically check xem có gì thay đổi không
def sync_job():
while True:
# Lấy tất cả rows updated trong 5 phút qua
changed = db.query("""
SELECT * FROM orders
WHERE updated_at > NOW() - INTERVAL '5 minutes'
""")
for row in changed:
es.index(index="orders", id=row.id, body=row.to_dict())
time.sleep(300) # Chạy mỗi 5 phút
Vấn đề:
- Lag: Thay đổi lúc 10:00:01 không được sync đến 10:05
- Deletes không detect được:
WHERE updated_at > Xkhông thấy deleted rows - Missing changes: Nếu một row được update 3 lần trong 5 phút, bạn chỉ thấy trạng thái cuối — bỏ qua các trạng thái trung gian
- Index scan costly:
updated_atphải được index, nhưng scan liên tục vẫn tốn I/O
Approach 3: CDC — Tap vào replication log
Database writes → WAL (Write-Ahead Log) → CDC connector → Kafka → Consumers
(PostgreSQL) (Debezium)
binlog (MySQL)
CDC đọc log mà database vốn đã ghi để phục vụ replication. Không phải query thêm. Thấy mọi thay đổi theo thứ tự, kể cả deletes. Lag tính bằng milliseconds.
So sánh thực tế
| Dual-Write | Polling | CDC | |
|---|---|---|---|
| Consistency | Weak (partial failure) | Eventual (lag) | Strong (log-based) |
| Deletes | Explicit chỉ | Không detect | Tự động |
| Lag | ~0ms | Vài phút | <1 giây |
| DB load | Tăng theo consumers | Tăng theo frequency | Minimal (read log) |
| Implementation | Đơn giản | Đơn giản | Phức tạp hơn |
| Failure recovery | Manual | Tự động (re-poll) | Tự động (replay log) |
2. PostgreSQL Logical Replication — Cơ chế bên dưới
WAL là gì
PostgreSQL ghi mọi thay đổi vào Write-Ahead Log (WAL) trước khi apply vào actual data files. WAL được dùng cho crash recovery (replay lại WAL sau crash), streaming replication sang standby, và logical replication (CDC).
Có hai loại WAL decoding:
- Physical (streaming replication): Byte-level copy của disk pages. Nhanh nhưng không human-readable và standby phải dùng cùng PostgreSQL version.
- Logical replication: High-level changes — "INSERT row X vào bảng Y với values Z". Human-readable, version-independent. CDC dùng cái này.
Logical Replication Slots
Replication slot là bookmark trong WAL — PostgreSQL sẽ giữ WAL không bị xóa cho đến khi slot consumer đọc đến đó.
-- Tạo replication slot cho Debezium
SELECT pg_create_logical_replication_slot(
'debezium_slot', -- tên slot
'pgoutput' -- output plugin: pgoutput (built-in) hoặc wal2json
);
-- Xem slots hiện có
SELECT slot_name, plugin, active, restart_lsn, confirmed_flush_lsn
FROM pg_replication_slots;
/*
slot_name | plugin | active | restart_lsn | confirmed_flush_lsn
debezium_slot | pgoutput | true | 0/1A000000 | 0/1A000100
*/
-- confirmed_flush_lsn: Consumer đã đọc đến đây
-- restart_lsn: WAL phải được giữ từ đây về sau
Nguy hiểm quan trọng: Nếu slot consumer (Debezium) chết và không reconnect trong nhiều giờ, PostgreSQL sẽ giữ WAL tích lũy — disk có thể đầy và crash cả database.
-- Monitor replication lag — phải alert khi lag > threshold
SELECT
slot_name,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag_size,
active
FROM pg_replication_slots;
# Alert khi WAL lag > 1GB
def check_replication_slot_health():
result = db.query("""
SELECT slot_name,
pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS lag_bytes
FROM pg_replication_slots
WHERE NOT active
""")
for row in result:
if row.lag_bytes > 1_000_000_000: # 1GB
alert_slack(f"Replication slot {row.slot_name} is inactive with {row.lag_bytes/1e9:.1f}GB lag!")
Publication — Chọn bảng nào để replicate
-- Tạo publication cho các bảng cần CDC
CREATE PUBLICATION debezium_pub FOR TABLE
orders,
order_items,
users,
payments;
-- Hoặc toàn bộ database (cẩn thận với high-churn tables)
CREATE PUBLICATION debezium_pub FOR ALL TABLES;
-- Cấu hình PostgreSQL để enable logical replication
-- postgresql.conf:
wal_level = logical -- Bắt buộc
max_replication_slots = 10 -- Số slots tối đa
max_wal_senders = 10 -- Số concurrent replication connections
Xem WAL events thực tế
-- Test đọc WAL thủ công (dùng pg_logical_slot_get_changes)
SELECT lsn, xid, data
FROM pg_logical_slot_get_changes('debezium_slot', NULL, NULL,
'proto_version', '1', 'publication_names', 'debezium_pub');
-- Output khi INSERT một row:
-- data: {"action":"I","timestamp":"...","schema":"public","table":"orders",
-- "columns":[{"name":"id","value":123},{"name":"status","value":"pending"}]}
-- Output khi DELETE:
-- data: {"action":"D","timestamp":"...","schema":"public","table":"orders",
-- "identity":[{"name":"id","value":123}]}
3. MySQL Binlog Tailing
Binlog format
MySQL ghi changes vào binary log (binlog) — tương tự WAL của PostgreSQL nhưng format khác.
-- Enable binlog với ROW format (bắt buộc cho CDC)
-- my.cnf:
-- [mysqld]
-- log_bin = mysql-bin
-- binlog_format = ROW -- Ghi actual row values, không phải SQL statements
-- binlog_row_image = FULL -- Ghi cả before/after image của row
-- expire_logs_days = 7 -- Giữ binlog 7 ngày
-- Kiểm tra binlog đang bật
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
-- Xem danh sách binlog files
SHOW BINARY LOGS;
/*
Log_name | File_size
mysql-bin.000001 | 177
mysql-bin.000002 | 1073741952
mysql-bin.000003 | 54321
*/
-- Đọc binlog thủ công
-- mysqlbinlog --read-from-remote-server --host=localhost \
-- --start-datetime="2026-04-22 10:00:00" mysql-bin.000003
GTID — Global Transaction ID
MySQL 5.6+ có GTID (Global Transaction ID) — unique identifier cho mỗi transaction. Debezium dùng GTID để track position, không phải file:offset như trước.
-- Enable GTID (recommend cho production CDC)
-- my.cnf:
-- gtid_mode = ON
-- enforce_gtid_consistency = ON
-- Xem GTID đã executed
SHOW VARIABLES LIKE 'gtid_executed';
-- Value: 'a3a55ee2-1234-5678-abcd-ef0123456789:1-1000'
-- Format: server_uuid:transaction_id_range
GTID quan trọng khi failover: Nếu MySQL primary chết và Debezium reconnect vào replica, GTID giúp resume từ đúng position mà không cần restart full snapshot.
4. Debezium — CDC Connector thực tế
Architecture
PostgreSQL/MySQL
│
│ (replication protocol)
▼
┌─────────────────┐
│ Debezium │ ← chạy trong Kafka Connect worker
│ Connector │
└────────┬────────┘
│ (Kafka producer)
▼
Kafka Topics
├── dbserver.public.orders
├── dbserver.public.order_items
└── dbserver.public.users
│
▼
Consumers
(data warehouse, search, cache, other services)
Cấu hình PostgreSQL connector
{
"name": "orders-postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "debezium",
"database.password": "secret",
"database.dbname": "production",
"database.server.name": "dbserver",
"plugin.name": "pgoutput",
"publication.name": "debezium_pub",
"slot.name": "debezium_slot",
"table.include.list": "public.orders,public.order_items,public.users",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "op,ts_ms,source.ts_ms",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"heartbeat.interval.ms": "10000", // Giữ slot active khi không có changes
"snapshot.mode": "initial" // Full snapshot lần đầu, sau đó incremental
}
}
Event format — Trước và sau transform
Raw Debezium event (trước ExtractNewRecordState):
{
"before": {
"id": 123,
"status": "pending",
"amount": 150000,
"updated_at": 1745317800000
},
"after": {
"id": 123,
"status": "completed",
"amount": 150000,
"updated_at": 1745317900000
},
"source": {
"version": "2.4.0",
"connector": "postgresql",
"ts_ms": 1745317900100,
"db": "production",
"schema": "public",
"table": "orders",
"txId": 789012,
"lsn": 27262976
},
"op": "u", // c=create, u=update, d=delete, r=read(snapshot)
"ts_ms": 1745317900200
}
Sau ExtractNewRecordState (thường dùng hơn cho consumers):
{
"id": 123,
"status": "completed",
"amount": 150000,
"updated_at": 1745317900000,
"__op": "u",
"__ts_ms": 1745317900200,
"__source_ts_ms": 1745317900100
}
Giữ before hay không tùy use case:
- Sync sang search index: Chỉ cần
after— update document với state mới - Audit log: Cần cả
beforevàafter— ghi lại "changed from X to Y" - Delete handling:
afterlà null, dùngbefoređể biết cần xóa document nào
Consumer example — Sync vào Elasticsearch
from kafka import KafkaConsumer
import json
from elasticsearch import Elasticsearch, helpers
es = Elasticsearch(["http://elasticsearch:9200"])
consumer = KafkaConsumer(
"dbserver.public.orders",
bootstrap_servers=["kafka:9092"],
group_id="es-sync-consumer",
value_deserializer=lambda m: json.loads(m.decode()),
auto_offset_reset="earliest",
enable_auto_commit=False, # Manual commit sau khi sync thành công
)
def process_batch(messages: list):
actions = []
for msg in messages:
event = msg.value
op = event.get("__op")
if op in ("c", "u", "r"): # create, update, snapshot read
actions.append({
"_op_type": "index",
"_index": "orders",
"_id": event["id"],
"_source": {
"order_id": event["id"],
"status": event["status"],
"amount": event["amount"],
"updated_at": event["updated_at"],
}
})
elif op == "d": # delete
actions.append({
"_op_type": "delete",
"_index": "orders",
"_id": event["id"],
})
if actions:
helpers.bulk(es, actions)
BATCH_SIZE = 500
batch = []
for message in consumer:
batch.append(message)
if len(batch) >= BATCH_SIZE:
process_batch(batch)
# Commit sau khi ES confirm write thành công
consumer.commit()
batch = []
Snapshot mode — Initial load
Lần đầu chạy Debezium, cần load toàn bộ existing data trước khi switch sang streaming. Đây là initial snapshot.
Phase 1 — Snapshot:
Debezium đọc toàn bộ tables bằng SELECT
Emit tất cả rows với op="r" (read)
Mark snapshot complete
Phase 2 — Streaming:
Switch sang đọc WAL/binlog từ LSN tại thời điểm snapshot bắt đầu
Replay bất kỳ changes nào xảy ra trong lúc snapshot
Continue streaming
Snapshot của bảng hàng triệu rows có thể mất hàng giờ và tạo load lớn trên DB. Tune bằng:
{
"snapshot.fetch.size": "1000", // Rows per fetch
"snapshot.max.threads": "1", // Tránh parallel snapshot với DB nhỏ
"snapshot.select.statement.overrides": "public.orders:SELECT * FROM public.orders WHERE created_at > '2025-01-01'"
// Snapshot chỉ lấy data gần đây, không cần history
}
5. Exactly-Once Delivery trong CDC Pipelines
Ba delivery semantics
At-most-once: Message có thể bị mất, không bao giờ duplicate
At-least-once: Message không bị mất, nhưng có thể duplicate
Exactly-once: Message delivered đúng một lần, không mất, không duplicate
Kafka broker + Debezium mặc định là at-least-once. Connector crash và restart → có thể replay một số messages.
Tại sao exactly-once khó
Debezium đọc WAL event:
LSN 1000: INSERT order id=123
LSN 1001: UPDATE order id=123 status=completed
Debezium gửi cả hai sang Kafka.
Kafka confirm nhận LSN 1000.
Trước khi Debezium commit offset LSN 1001, connector crash.
Restart: Debezium đọc lại từ LSN 1000 (chưa được commit).
→ Kafka nhận duplicate: INSERT order id=123 lần thứ 2.
Approach 1: Idempotent consumers (phổ biến nhất)
Không cố gắng prevent duplicates ở producer — design consumer để tolerate duplicates.
def upsert_to_warehouse(event: dict):
order_id = event["id"]
lsn = event.get("__lsn") # Debezium expose LSN trong source field
# Upsert: nếu cùng order_id và LSN nhỏ hơn hoặc bằng đã có → skip
warehouse.execute("""
INSERT INTO orders (id, status, amount, _cdc_lsn, _cdc_ts)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (id) DO UPDATE SET
status = EXCLUDED.status,
amount = EXCLUDED.amount,
_cdc_lsn = EXCLUDED._cdc_lsn,
_cdc_ts = EXCLUDED._cdc_ts
WHERE orders._cdc_lsn < EXCLUDED._cdc_lsn -- Chỉ update nếu event mới hơn
""", (order_id, event["status"], event["amount"], lsn, event["__ts_ms"]))
ON CONFLICT ... WHERE _cdc_lsn < EXCLUDED._cdc_lsn là idempotency key — cùng LSN replay thì no-op.
Approach 2: Kafka Transactions + Exactly-Once Semantics
Kafka 0.11+ support transactions giữa producer và consumer. Debezium 1.9+ support exactly-once khi dùng với Kafka transactions.
{
"exactly.once.support": "required",
"producer.override.enable.idempotence": "true",
"producer.override.isolation.level": "read_committed",
"producer.override.acks": "all"
}
Cách hoạt động:
- Debezium mở Kafka transaction
- Gửi tất cả events trong một DB transaction vào Kafka
- Commit Kafka transaction và update Kafka offset atomically
- Nếu crash xảy ra, Kafka transaction bị rollback → không có partial write
Limitation: Chỉ guarantee exactly-once Kafka → Kafka. Downstream consumer (ES, warehouse) vẫn cần idempotency.
Approach 3: Deduplication với Redis
Khi consumer là external system không support upsert:
import hashlib
def deduplicated_consumer(event: dict, process_fn):
# Tạo unique ID cho event
event_id = f"{event['__source_ts_ms']}:{event['id']}:{event['__op']}"
dedup_key = f"cdc:seen:{hashlib.md5(event_id.encode()).hexdigest()}"
# Set với NX (only if not exists) và TTL 24h
is_new = redis.set(dedup_key, "1", nx=True, ex=86400)
if is_new:
process_fn(event)
# else: duplicate, skip
# Dùng:
for message in consumer:
deduplicated_consumer(message.value, lambda e: send_to_webhook(e))
6. Schema Evolution — Thay đổi database mà không vỡ pipeline
Vấn đề: Schema incompatibility
Tháng 1: Table orders có columns (id, status, amount)
→ Debezium emit Avro events với schema v1
→ Warehouse table có (id, status, amount)
Tháng 3: DBA thêm column discount_code VARCHAR(50)
→ Debezium emit events với schema v2 (thêm discount_code)
→ Warehouse INSERT fail: "column discount_code does not exist"
→ Pipeline vỡ lúc deploy migration
Safe schema changes với Avro + Schema Registry
Schema Registry enforce compatibility rules. Với BACKWARD compatibility (default):
Allowed changes (backward compatible):
-- Safe: Thêm column với DEFAULT
ALTER TABLE orders ADD COLUMN discount_code VARCHAR(50) DEFAULT NULL;
-- Safe: Tăng độ dài VARCHAR
ALTER TABLE orders ALTER COLUMN description TYPE TEXT;
Avro: Thêm field với default value → schema mới tương thích ngược. Consumer cũ không biết field mới, nhận NULL/default.
Breaking changes:
-- BREAKING: Đổi tên column
ALTER TABLE orders RENAME COLUMN amount TO amount_cents;
-- BREAKING: Đổi type
ALTER TABLE orders ALTER COLUMN user_id TYPE UUID USING user_id::UUID;
-- BREAKING: Xóa column
ALTER TABLE orders DROP COLUMN legacy_field;
Khi xóa hoặc đổi tên column, Avro schema không còn backward compatible → Schema Registry reject → Debezium connector fail.
Zero-downtime schema migration với CDC
Pattern: Expand-Contract (Blue-Green columns)
-- Phase 1 EXPAND: Thêm column mới, giữ column cũ
ALTER TABLE orders ADD COLUMN user_uuid UUID DEFAULT NULL;
-- Backfill background (không lock table)
UPDATE orders SET user_uuid = user_id::UUID
WHERE user_uuid IS NULL
LIMIT 1000; -- Chạy theo batch, không block
-- Tại đây: cả user_id và user_uuid đều tồn tại
-- Application write vào cả hai, CDC emit cả hai
-- Downstream consumers migrate dần sang user_uuid
-- Phase 2 CONTRACT: Sau khi tất cả consumers đã migrate
-- Xóa column cũ
ALTER TABLE orders DROP COLUMN user_id;
Pattern: Versioned topics
Thay vì migrate in-place, emit sang topic mới:
- dbserver.public.orders.v1 (consumers cũ vẫn đọc)
- dbserver.public.orders.v2 (consumers mới đọc schema mới)
Debezium có thể emit đồng thời vào cả hai topic trong migration period.
Sau khi tất cả consumers migrate sang v2, xóa v1.
Xử lý DDL events trong consumer
Debezium emit DDL changes (schema changes) như một loại special event:
def handle_cdc_event(event: dict):
# DDL event khi schema thay đổi
if event.get("__op") == "schema_change":
ddl = event.get("ddl")
print(f"Schema changed: {ddl}")
# Tự động apply schema change vào warehouse
if "ADD COLUMN" in ddl.upper():
apply_warehouse_migration(ddl)
elif "DROP COLUMN" in ddl.upper():
# Cẩn thận: không drop ngay, chờ verify consumers sẵn sàng
flag_for_review(ddl)
return
# Normal data event
process_data_event(event)
def apply_warehouse_migration(ddl: str):
# Parse DDL và apply tương đương cho warehouse
# Ví dụ: "ALTER TABLE orders ADD COLUMN discount_code VARCHAR(50)"
# → BigQuery: bq query "ALTER TABLE orders ADD COLUMN discount_code STRING"
warehouse.execute(translate_ddl(ddl))
7. Outbox Pattern — CDC + Guaranteed Delivery
Vấn đề: Đảm bảo event delivery mà không cần 2PC
Scenario phổ biến:
def complete_order(order_id: str):
with db.transaction():
db.execute("UPDATE orders SET status = 'completed' WHERE id = %s", order_id)
# Transaction commit ở đây
# Nếu crash xảy ra ở đây:
kafka.produce("order.completed", {"order_id": order_id}) # Event bị mất!
Để fix bằng 2-phase commit (XA transaction spanning DB + Kafka) thì phức tạp, chậm, và hầu hết message brokers không support đầy đủ.
Outbox pattern giải quyết bằng CDC
Ý tưởng: Thay vì publish trực tiếp vào Kafka, write event vào một bảng trong cùng database transaction. CDC sẽ pick up và forward sang Kafka.
-- Tạo outbox table
CREATE TABLE outbox_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_type VARCHAR(100) NOT NULL, -- 'order', 'user', 'payment'
aggregate_id VARCHAR(100) NOT NULL,
event_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
processed_at TIMESTAMPTZ -- NULL = chưa processed (optional)
);
CREATE INDEX idx_outbox_unprocessed ON outbox_events(created_at)
WHERE processed_at IS NULL;
def complete_order(order_id: str):
with db.transaction():
# Business logic
db.execute(
"UPDATE orders SET status = 'completed', updated_at = NOW() WHERE id = %s",
order_id
)
# Write event vào outbox — trong cùng transaction!
db.execute("""
INSERT INTO outbox_events (aggregate_type, aggregate_id, event_type, payload)
VALUES (%s, %s, %s, %s)
""", (
"order",
order_id,
"order.completed",
json.dumps({
"order_id": order_id,
"completed_at": datetime.utcnow().isoformat(),
"status": "completed"
})
))
# Nếu transaction rollback → cả order update lẫn outbox event đều bị rollback
# Guaranteed atomicity!
PostgreSQL Transaction commits
→ WAL ghi: UPDATE orders + INSERT outbox_events
→ Debezium đọc outbox_events change
→ Route sang Kafka topic dựa trên aggregate_type/event_type
→ Consumer nhận "order.completed" event
Debezium Outbox Event Router
Debezium có sẵn OutboxEventRouter transform để tự động route từ outbox table sang đúng Kafka topic:
{
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.field.event.id": "id",
"transforms.outbox.table.field.event.key": "aggregate_id",
"transforms.outbox.table.field.event.type": "event_type",
"transforms.outbox.table.field.event.payload": "payload",
"transforms.outbox.route.by.field": "aggregate_type",
"transforms.outbox.route.topic.replacement": "outbox.${routedByValue}"
}
outbox_events row:
aggregate_type = "order"
aggregate_id = "ord_123"
event_type = "order.completed"
payload = {...}
→ Kafka topic: "outbox.order"
Key: "ord_123"
Value: payload
Cleanup outbox table
Outbox table tăng kích thước nếu không cleanup:
-- Option 1: Soft delete — đánh dấu processed
-- (chạy trong CDC consumer sau khi forward sang Kafka)
UPDATE outbox_events SET processed_at = NOW()
WHERE id = %s;
-- Cleanup job: xóa events đã processed sau 7 ngày
DELETE FROM outbox_events
WHERE processed_at IS NOT NULL
AND processed_at < NOW() - INTERVAL '7 days';
-- Option 2: Hard delete sau khi Debezium confirm
-- Debezium vẫn thấy DELETE event và forward tombstone sang Kafka
-- Consumer nhận tombstone → biết là processed, không cần action gì
DELETE FROM outbox_events
WHERE created_at < NOW() - INTERVAL '1 hour'
AND processed_at IS NOT NULL;
Option 2 tốt hơn vì không giữ data lâu, nhưng cần đảm bảo Debezium đã replicate row trước khi xóa. Dùng TTL 1 giờ là safe với hầu hết CDC setups.
Outbox vs Direct CDC — Khi nào dùng cái nào
Direct CDC (Debezium đọc directly từ application tables):
- Đơn giản hơn, không cần thêm outbox table
- Events là reflection của DB state — downstream nhận raw DB row
- Không control được payload structure
Outbox pattern:
- Events được explicit designed: bạn control payload, naming, versioning
- Transactional guarantee: event và business data in sync 100%
- Dễ schema evolve: thay đổi payload không ảnh hưởng DB schema
- Thêm overhead: thêm bảng, thêm write per transaction
Rule of thumb: Nếu consumer chỉ cần sync DB state (search index, cache, warehouse) → Direct CDC. Nếu consumer cần business events với curated payload (downstream microservices, external webhooks) → Outbox pattern.
8. CDC vào Data Warehouse — End-to-end pipeline
Full pipeline: PostgreSQL → Kafka → S3 → Iceberg
PostgreSQL orders table
│
│ (Debezium, logical replication)
▼
Kafka topic: dbserver.public.orders
│
│ (Kafka Connect S3 Sink, Parquet format)
▼
S3: s3://data-lake/raw/orders/year=.../month=.../day=.../
│
│ (Apache Spark / Trino)
▼
Iceberg table: raw.orders (append-only, full CDC history)
│
│ (dbt model)
▼
Iceberg table: marts.current_orders (latest state per order_id)
Merge CDC events thành current state
Raw CDC table chứa tất cả events (inserts, updates, deletes). Để analytics cần current state — chỉ giá trị mới nhất của mỗi row.
-- dbt model: marts/current_orders.sql
-- Merge CDC history thành current state
WITH cdc_events AS (
SELECT
id AS order_id,
status,
amount,
user_id,
__op AS operation,
__ts_ms AS event_ts,
ROW_NUMBER() OVER (
PARTITION BY id
ORDER BY __ts_ms DESC, __lsn DESC -- Lấy event mới nhất per order
) AS rn
FROM {{ source('raw', 'orders') }}
)
SELECT
order_id,
status,
amount,
user_id,
event_ts AS last_updated_at
FROM cdc_events
WHERE rn = 1
AND operation != 'd' -- Loại bỏ deleted rows
Iceberg MERGE — Efficient upsert
Với Iceberg, có thể dùng MERGE thay vì ROW_NUMBER trick:
-- Incremental merge với Iceberg MERGE INTO
MERGE INTO marts.current_orders AS target
USING (
-- Source: CDC events trong batch mới nhất
SELECT id, status, amount, user_id, __op, __ts_ms
FROM raw.orders
WHERE _ingested_at > (SELECT MAX(_ingested_at) FROM marts.current_orders)
) AS source
ON target.order_id = source.id
WHEN MATCHED AND source.__op = 'd' THEN DELETE
WHEN MATCHED AND source.__op IN ('u', 'r') THEN UPDATE SET
status = source.status,
amount = source.amount,
last_updated = source.__ts_ms
WHEN NOT MATCHED AND source.__op != 'd' THEN INSERT
(order_id, status, amount, user_id, last_updated)
VALUES (source.id, source.status, source.amount, source.user_id, source.__ts_ms)
;
MERGE INTO Iceberg efficient hơn "DELETE + INSERT" hay "ROW_NUMBER scan" vì Iceberg chỉ rewrite các files chứa affected rows, không full table scan.
Monitoring CDC pipeline health
Các metrics cần alert:
# Prometheus metrics cho CDC pipeline health
from prometheus_client import Gauge, Counter
cdc_lag_seconds = Gauge(
'cdc_pipeline_lag_seconds',
'Lag between DB change and Kafka message',
['connector', 'table']
)
cdc_events_total = Counter(
'cdc_events_processed_total',
'Total CDC events processed',
['table', 'operation']
)
def update_cdc_metrics(event: dict):
table = event.get("__source_table", "unknown")
op = event.get("__op", "unknown")
# Lag: thời gian từ khi DB change đến khi consumer nhận
db_ts = event.get("__source_ts_ms", 0) / 1000
now = time.time()
lag = now - db_ts
cdc_lag_seconds.labels(
connector="debezium",
table=table
).set(lag)
cdc_events_total.labels(table=table, operation=op).inc()
# Alertmanager rules
groups:
- name: cdc_alerts
rules:
- alert: CDCHighLag
expr: cdc_pipeline_lag_seconds > 60
for: 5m
annotations:
summary: "CDC lag > 60s for {{ $labels.table }}"
- alert: CDCConnectorDown
expr: absent(cdc_pipeline_lag_seconds) == 1
for: 2m
annotations:
summary: "CDC connector appears to be down (no metrics)"
- alert: ReplicationSlotGrowing
expr: pg_replication_slots_pg_wal_lsn_diff_bytes > 500000000
annotations:
summary: "Replication slot lag > 500MB, risk of disk full"
Tổng kết: Mental Model cho CDC
CDC không phải là technology — đây là pattern giải quyết bài toán: "Làm sao giữ multiple systems consistent với source of truth mà không tạo tight coupling?"
Câu hỏi để chọn approach
─────────────────────────────────────────────────────
Cần sync database state hay emit business events?
├── DB state (search index, cache, warehouse)
│ → Direct CDC với Debezium
└── Business events (microservices, webhooks)
→ Outbox pattern + Debezium
Cần guarantee gì?
├── At-least-once là OK (idempotent consumer)
│ → Default Debezium setup
└── Exactly-once required
→ Kafka transactions + idempotent upsert
Schema thay đổi thường xuyên không?
├── Ít thay đổi
│ → Schema Registry với BACKWARD compat
└── Thay đổi liên tục
→ Expand-contract pattern + versioned topics
Database là gì?
├── PostgreSQL
│ → Logical replication + pgoutput plugin
│ → Monitor replication slot lag!
└── MySQL
→ Binlog tailing
→ Enable GTID cho reliable failover
CDC đơn giản hóa kiến trúc đáng kể khi implement đúng — thay vì mỗi service biết về tất cả downstream consumers, database là single source of truth và CDC là broadcast mechanism. Nhưng giống mọi distributed system pattern, failure modes cần được design trước, không phải sau.