✍️ Khoa📅 22/04/2026☕ 23 phút đọc

Data Platform Architecture — Modern Data Stack Cho Backend Engineer

Data platform là infrastructure mà mọi analytics, reporting, và ML feature của product đều depend vào. Backend engineer không cần build nó — nhưng cần hiểu đủ để: review kiến trúc, avoid những quyết định microservice làm data pipeline vỡ, và partner hiệu quả với data team.

Bài này map modern data stack: data lakehouse pattern (Delta Lake, Apache Iceberg — tại sao thay thế data warehouse truyền thống), dbt như transformation layer, orchestration (Airflow vs Prefect vs Dagster — trade-offs thực tế), data contract giữa producer và consumer, reverse ETL, và real-time OLAP engines (ClickHouse, Apache Druid, Apache Pinot) cho trường hợp cần analytics với latency milliseconds.


Mục lục

  1. Tại sao Data Lakehouse thay thế Data Warehouse truyền thống
  2. Apache Iceberg — Table Format hiện đại
  3. dbt — Transformation Layer
  4. Orchestration — Airflow vs Prefect vs Dagster
  5. Data Contract — Ranh giới giữa Producer và Consumer
  6. Reverse ETL — Đưa Data Trở Lại Operational Systems
  7. Real-time OLAP — ClickHouse, Druid, Pinot
  8. Kiến trúc tổng thể — Putting it all together

1. Tại sao Data Lakehouse thay thế Data Warehouse truyền thống

Ba thế hệ kiến trúc

Thế hệ 1 — Data Warehouse (Redshift, BigQuery, Snowflake):

Operational DBs → ETL jobs → Data Warehouse → BI Tools
(PostgreSQL, MySQL)              (Redshift)     (Tableau)

Warehouse lưu structured data trong proprietary format. Query fast. Nhưng:

  • Data phải được ETL vào trước khi dùng được — lag từ vài giờ đến vài ngày
  • Storing raw/unstructured data (logs, JSON, images) thì đắt hoặc không support
  • ML training cần raw data, không phải aggregated tables → phải maintain copy thứ hai
  • Vendor lock-in: format của Redshift không đọc được bởi Snowflake

Thế hệ 2 — Data Lake (S3 + Hive):

Operational DBs → Raw files on S3 → Spark jobs → Analytics
                  (Parquet, CSV, JSON)

Cheap storage, flexible format. Nhưng:

  • Không có ACID transactions → concurrent writes corrupt data
  • Không có schema enforcement → "data swamp": không ai biết data có format gì
  • Query performance kém: phải scan toàn bộ file để filter
  • Không có time travel, không có versioning

Thế hệ 3 — Data Lakehouse:

Operational DBs → Object Storage (S3/GCS) + Open Table Format → Query Engines
                  (Parquet files + Iceberg/Delta metadata)      (Spark, Trino, DuckDB)

Lấy điểm tốt của cả hai: cheap storage của data lake + ACID transactions và query performance của warehouse. Format mở, không bị lock vào vendor.

Tại sao điều này quan trọng với backend engineer

Khi bạn thiết kế microservice, quyết định schema của events bạn emit sẽ ảnh hưởng trực tiếp đến data platform:

# Event này dễ ingest vào lakehouse
{
    "event_type": "order.completed",
    "order_id": "ord_123",
    "user_id": "usr_456",
    "amount": 150000,
    "currency": "VND",
    "items": [
        {"product_id": "p1", "quantity": 2, "price": 50000},
        {"product_id": "p2", "quantity": 1, "price": 50000}
    ],
    "created_at": "2026-04-22T10:30:00Z"  # ← ISO 8601, không phải Unix timestamp tùy tiện
}

# Event này gây khó dễ cho data team
{
    "t": "oc",           # Abbreviation không ai hiểu
    "d": 1745317800,     # Unix timestamp — timezone nào?
    "meta": "{\"a\":1}", # JSON trong JSON string — không thể query
    "v": 2               # Version field nhưng không document
}

2. Apache Iceberg — Table Format hiện đại

Iceberg là gì và không phải là gì

Iceberg không phải query engine (không phải Spark, không phải Trino). Iceberg là table format — một spec cho cách organize metadata và data files để bất kỳ engine nào cũng đọc được.

Tương tự: Parquet là file format cho một file. Iceberg là table format cho một tập hợp nhiều Parquet files.

S3 bucket/
├── data/
│   ├── year=2026/month=04/day=22/
│   │   ├── 00001.parquet   (actual data)
│   │   └── 00002.parquet
│   └── year=2026/month=04/day=21/
│       └── 00001.parquet
└── metadata/
    ├── v1.metadata.json    (table schema, partitioning spec)
    ├── v2.metadata.json    (sau khi schema evolve)
    ├── snap-001.avro       (snapshot: danh sách files tại một thời điểm)
    └── snap-002.avro       (snapshot mới sau khi write)

ACID Transactions trong object storage

Object storage (S3) không có row-level locking. Iceberg giải quyết bằng optimistic concurrency control:

Writer A muốn thêm data:
1. Đọc current metadata pointer: metadata/v2.metadata.json
2. Write new data files: data/.../00003.parquet
3. Write new snapshot: metadata/snap-003.avro
4. Atomic swap: metadata pointer từ v2 → v3

Writer B (concurrent) cũng muốn thêm data:
1. Đọc current metadata pointer: metadata/v2.metadata.json (cùng lúc với A)
2. Write new data files: data/.../00004.parquet
3. Write new snapshot: metadata/snap-004.avro (dựa trên v2)
4. Atomic swap: metadata/v2 → v3 ← FAIL! v3 đã tồn tại (Writer A đã swap)
5. Retry: đọc lại v3, rebase snapshot, thử lại

Atomic swap implement qua S3's conditional writes hoặc external catalog (AWS Glue, Hive Metastore, Nessie).

Time Travel — Feature backend engineer hay cần

-- Query data tại một thời điểm trong quá khứ
SELECT * FROM orders
FOR SYSTEM_TIME AS OF '2026-04-01 00:00:00'
WHERE user_id = 'usr_456';

-- Query theo snapshot ID cụ thể
SELECT * FROM orders VERSION AS OF 42;

-- Xem lịch sử snapshot
SELECT * FROM orders.history;
/*
snapshot_id | committed_at            | operation
42          | 2026-04-01 00:00:00 UTC | append
43          | 2026-04-02 00:00:00 UTC | overwrite
44          | 2026-04-03 00:00:00 UTC | delete
*/

Dùng cho:

  • Debug: "Tại sao report ngày hôm qua khác hôm nay?" → query lại snapshot cũ
  • Audit: "Data lúc 9am trông như thế nào trước khi pipeline chạy lúc 10am?"
  • Rollback: Revert về snapshot trước khi bad pipeline write corrupt data

Schema Evolution — Thêm column mà không cần rewrite

# Thêm column mới → chỉ update metadata, không touch Parquet files
spark.sql("""
    ALTER TABLE orders 
    ADD COLUMN discount_amount DECIMAL(10,2)
""")

# Đổi tên column (Iceberg track column by ID, không phải name)
spark.sql("""
    ALTER TABLE orders 
    RENAME COLUMN amount TO gross_amount
""")

# Old files: không có discount_amount → Iceberg trả về NULL tự động
# Old files: column tên "amount" → Iceberg map sang "gross_amount" qua column ID

Delta Lake vs Iceberg:

Feature Delta Lake Apache Iceberg
Tác giả Databricks Netflix, Apple
Vendor Databricks ecosystem Vendor neutral
Query engines Spark native, tốt nhất Spark, Trino, Flink, DuckDB...
Catalog Delta Log Pluggable (Glue, Hive, Nessie, REST)
Hidden partitioning Không Có (tránh partition hell)
Streaming Structured Streaming Flink, Spark Streaming

Nếu stack của bạn heavy Databricks → Delta Lake. Nếu multi-engine hoặc cloud-agnostic → Iceberg thường là lựa chọn tốt hơn về long-term.


3. dbt — Transformation Layer

dbt làm gì

dbt (data build tool) là framework để transform data bên trong warehouse/lakehouse bằng SQL, với version control, testing, và documentation built-in. Nó không move data — chỉ transform data đã có trong warehouse.

Raw data (S3/warehouse) → [dbt models] → Transformed tables → BI/Analytics

Trước dbt, transformation thường là stored procedures hoặc custom Python scripts — không testable, không version-controlled, không có lineage.

Model hierarchy — Cách tổ chức logic

models/
├── staging/          # 1-1 với source tables, minimal transform
│   ├── stg_orders.sql
│   ├── stg_users.sql
│   └── stg_products.sql
├── intermediate/     # Business logic, joins
│   ├── int_order_items_enriched.sql
│   └── int_user_lifetime_value.sql
└── marts/            # Final tables cho BI/product teams
    ├── finance/
    │   └── fct_daily_revenue.sql
    └── product/
        └── dim_user_segments.sql

Staging model — raw → clean:

-- models/staging/stg_orders.sql
-- Rename columns, cast types, no business logic

SELECT
    id                                          AS order_id,
    user_id,
    CAST(created_at AS TIMESTAMP)               AS created_at,
    UPPER(status)                               AS status,
    CAST(amount AS DECIMAL(15,2)) / 100         AS amount_vnd,  -- cents → VND
    currency
FROM {{ source('raw', 'orders') }}
WHERE created_at >= '2024-01-01'  -- Partition pruning

Fact model — business logic:

-- models/marts/finance/fct_daily_revenue.sql
-- Join, aggregate, business rules

WITH orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
    WHERE status = 'COMPLETED'
),

refunds AS (
    SELECT * FROM {{ ref('stg_refunds') }}
),

daily_metrics AS (
    SELECT
        DATE_TRUNC('day', o.created_at)     AS date,
        COUNT(DISTINCT o.order_id)          AS total_orders,
        SUM(o.amount_vnd)                   AS gross_revenue,
        COALESCE(SUM(r.amount_vnd), 0)      AS total_refunds,
        SUM(o.amount_vnd) - COALESCE(SUM(r.amount_vnd), 0) AS net_revenue
    FROM orders o
    LEFT JOIN refunds r ON o.order_id = r.order_id
    GROUP BY 1
)

SELECT * FROM daily_metrics

{{ ref('stg_orders') }} là dbt macro — tự resolve thành table name đúng environment và track dependency để build DAG đúng thứ tự.

Testing — Đây là phần backend engineer sẽ thấy familiar

# models/staging/schema.yml
models:
  - name: stg_orders
    columns:
      - name: order_id
        tests:
          - unique           # Không có duplicate order_id
          - not_null         # order_id phải luôn có giá trị
      - name: status
        tests:
          - accepted_values:
              values: ['PENDING', 'COMPLETED', 'CANCELLED', 'REFUNDED']
      - name: amount_vnd
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= 0"  # Không có negative amount
dbt test --select stg_orders
# Running 4 tests...
# PASS unique_stg_orders_order_id
# PASS not_null_stg_orders_order_id
# FAIL accepted_values_stg_orders_status  ← tìm thấy status = 'FAILED' không expect
# PASS not_null_stg_orders_amount_vnd

Incremental models — Không reprocess toàn bộ history mỗi lần

-- models/marts/fct_events.sql
{{
    config(
        materialized='incremental',
        unique_key='event_id',
        on_schema_change='append_new_columns'
    )
}}

SELECT
    event_id,
    user_id,
    event_type,
    created_at,
    properties
FROM {{ source('raw', 'events') }}

{% if is_incremental() %}
    -- Chỉ process data mới hơn max timestamp hiện có trong table
    WHERE created_at > (SELECT MAX(created_at) FROM {{ this }})
{% endif %}

Lần đầu chạy: full scan toàn bộ bảng. Các lần sau: chỉ scan data mới. Với bảng events hàng tỷ rows, đây là difference giữa 10 phút và 2 giờ per run.

dbt trong CI/CD pipeline

# .github/workflows/dbt.yml
on:
  pull_request:

jobs:
  dbt-test:
    steps:
      - name: Run dbt on staging schema
        run: |
          dbt run --target staging --select state:modified+  # Chỉ chạy models bị thay đổi và downstream
          dbt test --target staging --select state:modified+

      - name: Generate docs
        run: dbt docs generate

state:modified+ là killer feature: chỉ rebuild models bị thay đổi và tất cả models downstream — tránh phải chạy lại toàn bộ DAG cho mỗi PR.


4. Orchestration — Airflow vs Prefect vs Dagster

Vấn đề orchestration giải quyết

Data pipelines có dependencies phức tạp:

Raw events (Kafka) 
    → CDC from orders DB
    → Hourly: ingest vào S3
    → Sau ingest xong: chạy dbt staging models
    → Sau staging: chạy dbt fact models
    → Sau facts: refresh dashboard cache
    → Sau dashboard: gửi email daily report lúc 8am
    → Nếu bất kỳ step nào fail: alert Slack + retry 3 lần

Orchestrator là tool manage DAG này — scheduling, retry, monitoring, alerting.

Apache Airflow

Tool phổ biến nhất. Mature, huge community, nhiều operators có sẵn.

# dags/daily_pipeline.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
    'email': ['data-alerts@company.com'],
}

with DAG(
    dag_id='daily_revenue_pipeline',
    default_args=default_args,
    schedule_interval='0 1 * * *',  # 1am daily
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag:

    ingest_orders = PythonOperator(
        task_id='ingest_orders_to_s3',
        python_callable=ingest_orders_from_db,
    )

    run_dbt_staging = DbtCloudRunJobOperator(
        task_id='dbt_staging',
        job_id=12345,
        wait_for_termination=True,
    )

    run_dbt_marts = DbtCloudRunJobOperator(
        task_id='dbt_marts',
        job_id=12346,
        wait_for_termination=True,
    )

    refresh_dashboard = PythonOperator(
        task_id='refresh_dashboard_cache',
        python_callable=refresh_metabase_cache,
    )

    # Dependencies
    ingest_orders >> run_dbt_staging >> run_dbt_marts >> refresh_dashboard

Airflow pain points:

  • DAG definitions là Python code nhưng chạy như config — không test được locally dễ dàng
  • Scheduler bottleneck khi có hàng nghìn DAGs
  • Dynamic DAGs (DAG tạo tasks dựa trên runtime data) rất verbose
  • UI tốt nhưng debugging pipeline failures đòi hỏi navigate nhiều clicks
  • Local development awkward: phải chạy full Airflow stack

Prefect

Prefect viết lại từ đầu để fix Airflow pain points. Code-first approach.

# flows/daily_pipeline.py
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta

@task(
    retries=3,
    retry_delay_seconds=300,
    cache_key_fn=task_input_hash,  # Cache result, skip nếu input không đổi
    cache_expiration=timedelta(hours=24),
)
def ingest_orders(date: str) -> int:
    rows = fetch_orders_from_db(date)
    upload_to_s3(rows, date)
    return len(rows)

@task(log_prints=True)
def run_dbt_models(models: list[str]):
    result = subprocess.run(
        ["dbt", "run", "--select"] + models,
        capture_output=True
    )
    if result.returncode != 0:
        raise Exception(f"dbt failed: {result.stderr}")

@flow(name="Daily Revenue Pipeline")
def daily_pipeline(date: str = None):
    if not date:
        date = datetime.today().strftime('%Y-%m-%d')

    rows_ingested = ingest_orders(date)
    print(f"Ingested {rows_ingested} rows")

    run_dbt_models(["stg_orders", "stg_users"])
    run_dbt_models(["fct_daily_revenue"])  # Runs after staging completes

if __name__ == "__main__":
    daily_pipeline()  # ← Chạy được trực tiếp locally, không cần Airflow server

Prefect advantages over Airflow:

  • Local development trivial: python flows/daily_pipeline.py
  • Dynamic workflows tự nhiên hơn vì đây chỉ là Python code
  • Prefect Cloud UI tốt hơn Airflow UI cho debugging
  • Deployment model đơn giản hơn

Prefect drawbacks:

  • Ecosystem operators nhỏ hơn Airflow
  • Prefect Cloud có cost; self-hosted Prefect Server thiếu một số features

Dagster

Dagster tiếp cận khác: asset-centric thay vì task-centric. Thay vì nghĩ "chạy task X rồi task Y", Dagster nghĩ "produce asset A, asset A depends on asset B".

# assets/pipeline.py
from dagster import asset, AssetIn, define_asset_job, ScheduleDefinition
import pandas as pd

@asset(
    description="Raw orders ingested from PostgreSQL",
    group_name="ingestion"
)
def raw_orders() -> pd.DataFrame:
    return fetch_orders_from_db()

@asset(
    ins={"raw_orders": AssetIn()},
    description="Cleaned and typed orders",
    group_name="staging"
)
def stg_orders(raw_orders: pd.DataFrame) -> pd.DataFrame:
    return (
        raw_orders
        .rename(columns={"id": "order_id"})
        .assign(amount_vnd=lambda df: df["amount"] / 100)
        .query("status.notna()")
    )

@asset(
    ins={"stg_orders": AssetIn()},
    description="Daily revenue aggregated",
    group_name="marts"
)
def fct_daily_revenue(stg_orders: pd.DataFrame) -> pd.DataFrame:
    return (
        stg_orders
        .query("status == 'COMPLETED'")
        .groupby(stg_orders["created_at"].dt.date)
        .agg(net_revenue=("amount_vnd", "sum"), total_orders=("order_id", "count"))
        .reset_index()
    )

# Schedule
daily_job = define_asset_job("daily_pipeline", selection=["fct_daily_revenue"])
daily_schedule = ScheduleDefinition(job=daily_job, cron_schedule="0 1 * * *")

Dagster killer feature: Asset lineage visualization tự động. Bạn click vào fct_daily_revenue, thấy ngay nó depends vào gì, ai produce nó, khi nào nó last materialized, và freshness policy.

Trade-off summary:

Airflow Prefect Dagster
Paradigm Task-centric Task-centric Asset-centric
Learning curve Cao Thấp Trung bình
Local dev Khó Dễ Trung bình
Dynamic pipelines Verbose Tự nhiên Tự nhiên
Observability Cơ bản Tốt Xuất sắc
Ecosystem Rất lớn Lớn Trung bình
Khi nào chọn Đã có, team biết Airflow Greenfield, muốn đơn giản Asset lineage quan trọng, ML pipelines

5. Data Contract — Ranh giới giữa Producer và Consumer

Vấn đề không có data contract

Microservice team (Producer)        Data team (Consumer)
─────────────────────────────       ─────────────────────
Tháng 1: emit event với              Build pipeline dựa trên
  field "amount" (integer, cents)    field "amount"

Tháng 3: Đổi "amount" sang          
  "amount_vnd" + "amount_usd"        Pipeline vỡ lúc 2am
  (Ticket không mention data team)   On-call bị wake up

Tháng 5: "user_id" từ integer        Report sai 3 ngày
  đổi sang UUID string               trước khi ai phát hiện

Data contract là explicit agreement giữa producer và consumer về: schema của data, semantics của fields, SLA về freshness và completeness, và ai chịu trách nhiệm khi contract bị vi phạm.

Schema Registry — Enforcement layer

Kafka + Confluent Schema Registry là cách phổ biến nhất để enforce schema contract:

# Producer side — phải đăng ký schema trước khi publish
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka import Producer

schema_registry_client = SchemaRegistryClient({"url": "http://schema-registry:8081"})

ORDER_SCHEMA = """
{
    "type": "record",
    "name": "OrderCompleted",
    "namespace": "com.company.orders",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "user_id", "type": "string"},
        {"name": "amount_vnd", "type": "long"},
        {"name": "currency", "type": "string", "default": "VND"},
        {"name": "created_at", "type": "long", "logicalType": "timestamp-millis"},
        {"name": "items", "type": {
            "type": "array",
            "items": {
                "type": "record",
                "name": "OrderItem",
                "fields": [
                    {"name": "product_id", "type": "string"},
                    {"name": "quantity", "type": "int"},
                    {"name": "unit_price_vnd", "type": "long"}
                ]
            }
        }}
    ]
}
"""

avro_serializer = AvroSerializer(
    schema_registry_client,
    ORDER_SCHEMA,
    lambda obj, ctx: obj  # to_dict function
)

producer = Producer({"bootstrap.servers": "kafka:9092"})

# Publish — schema được validate và versioned tự động
def emit_order_completed(order: dict):
    producer.produce(
        topic="orders.completed",
        key=order["order_id"],
        value=avro_serializer(order, SerializationContext("orders.completed", MessageField.VALUE))
    )

Schema Registry enforce compatibility rules. Ví dụ với BACKWARD compatibility:

Allowed (backward compatible):
✓ Thêm field mới với default value
✓ Xóa field có default value

Not allowed (breaking change):
✗ Đổi type của field (integer → string)
✗ Đổi tên field (amount → amount_vnd)
✗ Xóa field không có default
✗ Thêm field bắt buộc không có default
# Khi developer push schema change không compatible
curl -X POST http://schema-registry/compatibility/subjects/orders.completed-value/versions/latest \
  -d '{"schema": "..."}'

# Response:
{
    "is_compatible": false,
    "messages": ["Removing field amount without default is not backward compatible"]
}

Data Contract as Code — Beyond schema

Schema chỉ là một phần. Data contract đầy đủ còn cover:

# contracts/orders-completed.yaml
name: orders.completed
version: "2.1.0"
owner: payments-team
consumers:
  - data-platform-team
  - fraud-detection-team

schema:
  type: avro
  registry: http://schema-registry:8081
  subject: orders.completed-value

sla:
  freshness_minutes: 5      # Data phải available trong 5 phút sau khi order complete
  completeness_pct: 99.9    # 99.9% events phải arrive (không bị drop)
  throughput_per_second: 1000

semantics:
  amount_vnd:
    description: "Gross order amount in Vietnamese Dong. Always positive."
    not_null: true
    minimum: 1000
    maximum: 100000000
  status:
    allowed_values: [PENDING, COMPLETED, CANCELLED, REFUNDED]
    note: "FAILED status was deprecated in v2.0, use CANCELLED instead"

breaking_change_process:
  notice_period_days: 14
  approval_required: [data-platform-team, fraud-detection-team]
  migration_guide_required: true
# Validate contract tự động trong CI
import yaml
from dataclasses import dataclass

def validate_contract(event: dict, contract_path: str) -> list[str]:
    with open(contract_path) as f:
        contract = yaml.safe_load(f)

    violations = []
    semantics = contract.get("semantics", {})

    for field, rules in semantics.items():
        value = event.get(field)

        if rules.get("not_null") and value is None:
            violations.append(f"{field}: must not be null")
            continue

        if "minimum" in rules and value < rules["minimum"]:
            violations.append(f"{field}: {value} < minimum {rules['minimum']}")

        if "allowed_values" in rules and value not in rules["allowed_values"]:
            violations.append(f"{field}: '{value}' not in allowed values {rules['allowed_values']}")

    return violations

Consumer-driven contract testing

Pattern từ microservices testing áp dụng cho data contracts:

# test_order_contract.py — chạy trong data team's CI
import pytest

def test_orders_schema_backward_compatible():
    """Data team verify producer chưa break contract."""
    registry = SchemaRegistryClient({"url": SCHEMA_REGISTRY_URL})

    # Fetch schema hiện tại từ registry
    schema = registry.get_latest_version("orders.completed-value")

    # Assert các field quan trọng với data team vẫn còn
    schema_dict = json.loads(schema.schema.schema_str)
    field_names = {f["name"] for f in schema_dict["fields"]}

    assert "order_id" in field_names, "order_id is required by revenue pipeline"
    assert "amount_vnd" in field_names, "amount_vnd is required by revenue pipeline"
    assert "created_at" in field_names, "created_at is required for time-series analysis"

def test_freshness_sla():
    """Verify producer đang meet freshness SLA."""
    latest_event_time = get_latest_event_timestamp("orders.completed")
    lag_minutes = (datetime.now() - latest_event_time).total_seconds() / 60

    assert lag_minutes < 5, f"Orders topic lag is {lag_minutes:.1f}m, exceeds 5m SLA"

6. Reverse ETL — Đưa Data Trở Lại Operational Systems

ETL vs Reverse ETL

ETL (Extract-Transform-Load):
Operational DB → [Extract] → [Transform] → [Load] → Data Warehouse
(Nguồn sự thật)                                      (Analytics)

Reverse ETL:
Data Warehouse → [Extract] → [Transform] → [Load] → Operational Systems
(Analytical insights)                                (CRM, Marketing tool, App DB)

Reverse ETL sync kết quả phân tích từ warehouse trở lại vào tools mà business team dùng hàng ngày.

Use cases thực tế

Use case 1: Personalization

-- Trong warehouse: tính user segment mỗi ngày
-- models/marts/dim_user_segments.sql
SELECT
    user_id,
    CASE
        WHEN total_spend_30d > 5000000 THEN 'high_value'
        WHEN total_orders_30d >= 3 THEN 'regular'
        WHEN last_order_days_ago > 90 THEN 'churned'
        ELSE 'occasional'
    END AS segment,
    total_spend_30d,
    total_orders_30d,
    predicted_ltv
FROM {{ ref('int_user_metrics') }}
# Reverse ETL: sync segment về app database để API đọc
def sync_user_segments():
    segments = warehouse.query("""
        SELECT user_id, segment, predicted_ltv 
        FROM dim_user_segments
        WHERE updated_at > NOW() - INTERVAL '25 hours'  -- daily job
    """)

    # Bulk upsert vào PostgreSQL
    with app_db.cursor() as cur:
        execute_values(cur, """
            INSERT INTO user_analytics (user_id, segment, predicted_ltv, synced_at)
            VALUES %s
            ON CONFLICT (user_id) DO UPDATE SET
                segment = EXCLUDED.segment,
                predicted_ltv = EXCLUDED.predicted_ltv,
                synced_at = EXCLUDED.synced_at
        """, [(r.user_id, r.segment, r.predicted_ltv, datetime.now()) for r in segments])

Use case 2: Sales team enrichment

# Sync product performance data vào Salesforce
def sync_product_metrics_to_crm():
    metrics = warehouse.query("""
        SELECT 
            product_id,
            total_orders_7d,
            revenue_7d,
            avg_rating,
            stock_alert  -- Tính trong warehouse
        FROM fct_product_performance
        WHERE updated_at > NOW() - INTERVAL '1 hour'
    """)

    salesforce = Salesforce(username=..., password=..., security_token=...)

    for batch in chunks(metrics, 200):  # Salesforce API limit
        salesforce.bulk.Product__c.upsert([{
            "Product_ID__c": r.product_id,
            "Orders_7D__c": r.total_orders_7d,
            "Revenue_7D__c": r.revenue_7d,
            "Avg_Rating__c": r.avg_rating,
        } for r in batch], "Product_ID__c")

Tools: Census, Hightouch, vs custom

Hightouch/Census (managed reverse ETL):

  • Define sync trong UI: "Mỗi giờ, sync bảng dim_user_segments vào Segment/Salesforce/HubSpot"
  • Manage incremental sync, retry, alerting tự động
  • Cost: ~$500-2000/tháng tùy volume
  • Dùng khi: team không muốn maintain custom code, nhiều destinations

Custom reverse ETL:

  • Dagster/Prefect asset: warehouse query → Python transform → API push
  • Cost: engineering time
  • Dùng khi: destinations không phổ biến, cần custom logic phức tạp, cost matters

Rule of thumb: Nếu data team đã dùng dbt + Dagster, custom reverse ETL asset là tự nhiên nhất. Nếu không có data engineer và business team cần self-serve, Hightouch.


7. Real-time OLAP — ClickHouse, Druid, Pinot

OLTP vs OLAP vs Real-time OLAP

OLTP (PostgreSQL, MySQL):
- Optimized for: transactional writes, point reads
- Latency: <10ms cho single row
- Scale: hàng triệu rows per table
- Query: "Get order #123", "Update user profile"

OLAP (BigQuery, Redshift, Snowflake):
- Optimized for: complex aggregations over large datasets
- Latency: seconds đến minutes
- Scale: hàng tỷ rows
- Query: "Total revenue by day for last year"

Real-time OLAP (ClickHouse, Druid, Pinot):
- Optimized for: fast aggregations + fresh data (seconds lag)
- Latency: milliseconds đến low seconds
- Scale: hàng tỷ rows
- Query: "Active users in last 5 minutes by country"

ClickHouse — Khi nào và tại sao

ClickHouse là columnar database tối ưu cho analytics queries. Không phải distributed-native (có thể cluster nhưng complex), nhưng single-node performance đáng kinh ngạc: một server có thể aggregate hàng tỷ rows/giây.

Use cases phù hợp:

  • Application analytics dashboard (user activity, funnel, retention)
  • Log analytics (gắn vào Kafka, query real-time)
  • Product metrics dashboards cần sub-second response
-- ClickHouse: Tạo bảng với ReplicatedMergeTree engine
CREATE TABLE user_events
(
    event_id     UUID,
    user_id      UInt64,
    event_type   LowCardinality(String),  -- Tối ưu cho low-cardinality strings
    country      LowCardinality(String),
    created_at   DateTime64(3),           -- milliseconds precision
    properties   String                   -- JSON blob
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/user_events', '{replica}')
PARTITION BY toYYYYMM(created_at)         -- Partition by month
ORDER BY (event_type, user_id, created_at) -- Sort key = primary index
;

-- Ingest từ Kafka — ClickHouse consume trực tiếp từ Kafka topic
CREATE TABLE user_events_kafka_queue
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092',
         kafka_topic_list = 'user-events',
         kafka_group_name = 'clickhouse-consumer',
         kafka_format = 'JSONEachRow';

-- Materialized view: auto-populate từ Kafka sang MergeTree
CREATE MATERIALIZED VIEW user_events_mv TO user_events
AS SELECT * FROM user_events_kafka_queue;
-- Query: Active users trong 5 phút qua, theo quốc gia
-- Trả về trong ~50ms với billions of rows
SELECT
    country,
    uniqExact(user_id)    AS active_users,
    countIf(event_type = 'purchase') AS purchases
FROM user_events
WHERE created_at >= now() - INTERVAL 5 MINUTE
GROUP BY country
ORDER BY active_users DESC
LIMIT 20;

ClickHouse limitations:

  • Updates/deletes rất chậm — không phải OLTP
  • Single table queries mạnh; complex joins với nhiều tables kém hơn BigQuery
  • Cluster setup phức tạp hơn Druid/Pinot (dùng ZooKeeper hoặc ClickHouse Keeper)

Apache Druid — Khi cần sub-second với high concurrency

Druid là distributed OLAP engine, thiết kế cho real-time ingestion + sub-second queries với hàng nghìn concurrent users. Dùng nhiều ở Lyft, Netflix, Airbnb cho product analytics.

Kafka → [Druid Ingestion] → [Real-time segments] + [Historical segments on S3]
                                    ↓
                            Query Router → Broker → Historical + Real-time nodes

Druid tự động tier data: real-time segments trên memory/SSD, historical segments trên S3/HDFS. Query merge cả hai trong milliseconds.

// Druid ingestion spec — consume từ Kafka
{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "user_events",
    "timestampSpec": {"column": "created_at", "format": "iso"},
    "dimensionsSpec": {
      "dimensions": ["user_id", "event_type", "country", "device"]
    },
    "metricsSpec": [
      {"type": "count", "name": "event_count"},
      {"type": "longSum", "name": "revenue_sum", "fieldName": "amount_vnd"},
      {"type": "thetaSketch", "name": "unique_users", "fieldName": "user_id"}
    ],
    "granularitySpec": {
      "segmentGranularity": "HOUR",  // Một segment per giờ
      "queryGranularity": "MINUTE"   // Aggregate xuống từng phút
    }
  },
  "ioConfig": {
    "topic": "user-events",
    "bootstrap.servers": "kafka:9092"
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsInMemory": 500000
  }
}

Druid vs ClickHouse:

ClickHouse Druid
Architecture Simple (single node mạnh) Complex distributed
Real-time ingestion Tốt (qua Kafka table engine) Xuất sắc (native)
Query concurrency Trung bình Cao (100s concurrent queries)
Joins Hạn chế Native lookup tables
Operational complexity Thấp Cao (nhiều node types)
Khi nào chọn Analytics tập trung một nhóm, simpler ops Product analytics platform, high QPS

Apache Pinot — LinkedIn/Uber scale

Pinot tương tự Druid về architecture nhưng tối ưu cho user-facing real-time analytics — queries triggered by end users, không phải internal analysts.

Pinot có StarTree index — pre-aggregate trên tất cả dimension combinations, trả về sub-10ms ngay cả với hàng tỷ rows. LinkedIn dùng cho "Who viewed your profile" analytics.

Khi nào backend engineer encounter Pinot:

  • Bạn đang build feature cần real-time counters (view counts, like counts, impressions) hiển thị cho người dùng cuối
  • Dashboard analytics phải response < 100ms cho hàng triệu concurrent users
  • "How many users saw this content in the last hour?" được hỏi từ application code, không từ BI tool

8. Kiến trúc tổng thể — Putting it all together

Modern data stack của một product company tầm trung

                        SOURCES
    ┌──────────────────────────────────────────────┐
    │  App Events    PostgreSQL    External APIs    │
    │  (Kafka)       (CDC/Debezium)  (Webhooks)    │
    └──────────┬──────────┬────────────┬───────────┘
               │          │            │
               ▼          ▼            ▼
        ┌──────────────────────────────────┐
        │          INGESTION LAYER         │
        │  Kafka Connect / Fivetran / Airbyte│
        │  (Schema Registry enforces contracts)│
        └─────────────────┬────────────────┘
                          │
                          ▼
        ┌──────────────────────────────────┐
        │         STORAGE LAYER            │
        │  S3/GCS + Apache Iceberg         │
        │  ├── raw/           (unmodified) │
        │  ├── staging/       (cleaned)    │
        │  └── marts/         (aggregated) │
        └──────────┬───────────────────────┘
                   │
         ┌─────────┼─────────────────────┐
         ▼         ▼                     ▼
  ┌────────────┐ ┌──────────────┐  ┌─────────────────┐
  │    dbt     │ │  ClickHouse  │  │  Query Engines  │
  │(transform) │ │(real-time    │  │  Spark / Trino  │
  │            │ │ OLAP)        │  │  DuckDB         │
  └─────┬──────┘ └──────┬───────┘  └────────┬────────┘
        │               │                   │
        ▼               ▼                   ▼
  ┌─────────────────────────────────────────────┐
  │              CONSUMPTION LAYER              │
  │  BI Tools    ML Features   Reverse ETL      │
  │  (Metabase)  (Feature Store) (→ App DB)    │
  └─────────────────────────────────────────────┘

Data flow cụ thể: Order completed đến revenue dashboard

1. User checkout → App emit Kafka event "order.completed"
   (Avro schema enforced by Schema Registry)

2. [~1 giây] Kafka Connect consumer:
   → Append event đến S3: s3://data/raw/orders/year=2026/month=04/...parquet

3. [Đồng thời] ClickHouse Kafka consumer:
   → Ingest vào ClickHouse table "order_events"
   → Real-time dashboard đọc được ngay

4. [Mỗi giờ] Dagster job trigger dbt:
   dbt run --select stg_orders fct_daily_revenue
   → S3 raw → dbt staging (clean, type cast)
   → S3 staging → dbt facts (aggregate)

5. [8am daily] Dagster job:
   → dbt model "fct_daily_report" → email report

6. [Mỗi ngày] Reverse ETL job:
   → fct_user_segments → App PostgreSQL "user_analytics" table
   → API đọc segment cho personalization

Câu hỏi backend engineer nên hỏi khi review kiến trúc data

1. Schema evolution:

"Khi tôi đổi field trong event, ai cần được notify? Bao nhiêu ngày notice?"

2. Data contract:

"Team nào là consumer của events tôi emit? Contract được version control ở đâu?"

3. Freshness requirement:

"Pipeline chạy batch hay stream? Nếu batch, data trên dashboard lag bao lâu là acceptable?"

4. Debugging:

"Khi pipeline fail lúc 2am, ai là on-call? Làm sao backfill data sau khi fix?"

5. OLAP vs warehouse:

"Feature analytics dashboard này cần sub-second latency không, hay vài giây là OK? Câu trả lời quyết định dùng ClickHouse hay BigQuery."


Tổng kết: Mental Model

Data platform modern không phải monolith — đây là composition của các specialized tools, mỗi tool giải quyết một vấn đề cụ thể:

Vấn đề                    Tool
──────────────────────    ──────────────────────────────────
Ingest data               Kafka Connect / Fivetran / Airbyte
Store raw data rẻ         S3 + Apache Iceberg / Delta Lake
Transform SQL-based       dbt
Schedule và orchestrate   Airflow / Dagster / Prefect
Enforce schema contract   Confluent Schema Registry
Real-time analytics       ClickHouse / Druid / Pinot
Batch analytics           BigQuery / Redshift / Trino on Iceberg
Sync insight → ops        Reverse ETL (Hightouch / custom)

Không có "one size fits all". Trade-off cơ bản luôn là: freshness vs cost vs query complexity.

  • Sub-second, simple queries → ClickHouse/Druid
  • Minutes lag, complex SQL → dbt + Trino on Iceberg
  • Hours lag, batch ML → Spark on Iceberg

Backend engineer giỏi không cần biết cách vận hành Druid cluster. Nhưng cần biết: events mình emit sẽ chạy qua pipeline nào, contract là gì, và khi schema thay đổi — ai cần được báo trước.