⚙️ Software✍️ Khoa📅 20/04/2026☕ 8 phút đọc

System Design: CQRS và Event Sourcing

5. CQRS — Command Query Responsibility Segregation

5.1 Ý tưởng cơ bản

Traditional CRUD:
  Same model cho read và write
  → Cùng DB, cùng schema, cùng service

  ┌─────────────────────────────────┐
  │  Application                    │
  │  Read:  SELECT * FROM orders    │
  │  Write: INSERT INTO orders...   │
  └──────────────┬──────────────────┘
                 │
         ┌───────▼───────┐
         │  Single DB    │
         └───────────────┘

CQRS:
  Tách Command (write) và Query (read) thành models riêng biệt

  Commands (write):               Queries (read):
  ┌──────────────────┐            ┌──────────────────┐
  │  Command Model   │            │   Query Model    │
  │  - Validation    │            │  - Denormalized  │
  │  - Business logic│            │  - Optimized     │
  │  - Domain events │            │  - Read-friendly │
  └────────┬─────────┘            └────────▲─────────┘
           │                               │
     ┌─────▼──────┐   sync/async   ┌──────┴──────┐
     │  Write DB  │ ─────────────► │   Read DB   │
     │ (normalized│                │(denormalized│
     │  OLTP)     │                │  views)     │
     └────────────┘                └─────────────┘

5.2 Tại sao cần CQRS?

Vấn đề với single model:
  Read model: cần JOINs phức tạp, aggregations, denormalized views
  Write model: cần normalization, constraints, business rules
  → Một model không thể tối ưu cả hai

  Scale khác nhau:
  Reads : Writes = 100:1 (thường) → cần scale độc lập

  Different performance requirements:
  Writes: cần consistency, validation
  Reads: cần low latency, high throughput

5.3 CQRS Implementation Levels

Level 1 — Same DB, Separate Models (đơn giản nhất):
  Command side → write to normalized tables
  Query side → read từ same DB nhưng dùng views/stored procs
  Pro: không cần sync, đơn giản
  Con: không scale reads/writes độc lập

Level 2 — Separate Read Store (phổ biến nhất):
  Command side → write to PostgreSQL (source of truth)
  Event/sync → update read stores
  Query side → read từ Elasticsearch (full-text), Redis (cache),
               MongoDB (flexible queries), read replicas

  ┌─────────────┐    events    ┌─────────────┐
  │  Command DB │ ───────────► │   Redis     │ (hot data)
  │ (PostgreSQL)│              ├─────────────┤
  └─────────────┘              │ Elasticsearch│ (search)
                               ├─────────────┤
                               │  PostgreSQL  │ (read replica)
                               │  read replica│
                               └─────────────┘

Level 3 — Event-Driven CQRS (kết hợp Event Sourcing):
  Command side → append events to Event Store
  Projectors → build read models từ events
  Query side → read từ materialized views
  (xem section 7)

5.4 Command Model Chi tiết

Command = ý định thay đổi state
  - Có thể bị reject (validation fail)
  - Imperative: "PlaceOrder", "CancelOrder", "TransferMoney"
  - Không return data (chỉ success/failure)

Command Handler:
  1. Validate command (input validation)
  2. Load aggregate từ DB
  3. Execute business logic trên aggregate
  4. Check business rules / invariants
  5. Persist changes (+ emit domain events)
  6. Return result (thường chỉ OK/Error)

// Ví dụ Go
type PlaceOrderCommand struct {
    UserID    string
    Items     []OrderItem
    AddressID string
}

type PlaceOrderHandler struct {
    repo      OrderRepository
    eventBus  EventBus
}

func (h *PlaceOrderHandler) Handle(cmd PlaceOrderCommand) error {
    // 1. Validate
    if len(cmd.Items) == 0 {
        return ErrEmptyOrder
    }

    // 2. Load aggregate
    user, err := h.repo.GetUser(cmd.UserID)
    if err != nil { return err }

    // 3. Business logic
    order, err := user.PlaceOrder(cmd.Items, cmd.AddressID)
    if err != nil { return err } // business rule violation

    // 4. Persist + emit events
    h.repo.Save(order)
    h.eventBus.Publish(order.DomainEvents())
    return nil
}

5.5 Query Model Chi tiết

Query = request data, KHÔNG thay đổi state
  - Luôn succeed (không có business logic)
  - Có thể đọc từ nhiều read stores
  - Optimized cho specific use cases

// Denormalized read model (không normalized!)
type OrderSummaryView struct {
    OrderID     string
    UserName    string    // denormalized từ users table
    UserEmail   string    // denormalized
    TotalAmount float64
    Status      string
    ItemCount   int
    CreatedAt   time.Time
    // Tất cả trong 1 document/row → không cần JOIN!
}

Query Handler:
  1. Nhận query params
  2. Read từ read store (đã pre-computed)
  3. Return DTO (Data Transfer Object)
  → Không có business logic
  → Không có side effects

6. Event Sourcing

6.1 Ý tưởng cốt lõi

Traditional State Storage:
  Lưu TRẠNG THÁI HIỆN TẠI của entity
  UPDATE orders SET status='shipped' WHERE id=123
  → Không biết tại sao status thay đổi
  → Không biết status trước đó là gì
  → Không thể "time travel"

Event Sourcing:
  Lưu CHUỖI SỰ KIỆN xảy ra với entity
  → State = replay tất cả events từ đầu

  OrderCreated      { id:123, items:[...], total:99  } at t=1
  PaymentReceived   { id:123, amount:99              } at t=2
  OrderShipped      { id:123, trackingNo:"ABC123"    } at t=3
  DeliveryConfirmed { id:123, deliveredAt:"2024-..."  } at t=4

  Current state = apply tất cả events theo thứ tự

6.2 Event Store

Event Store = append-only log of events

Schema:
  ┌────────────────────────────────────────────────────────────┐
  │  events                                                    │
  │  id          BIGINT       PRIMARY KEY, auto-increment      │
  │  stream_id   VARCHAR      (aggregate ID, e.g., order-123)  │
  │  type        VARCHAR      (event type, e.g., OrderCreated) │
  │  version     INT          (sequence per stream, for OCC)   │
  │  data        JSONB        (event payload)                  │
  │  metadata    JSONB        (correlation_id, causation_id)   │
  │  occurred_at TIMESTAMPTZ  (when event happened)            │
  └────────────────────────────────────────────────────────────┘

Append-only = NEVER update or delete events (immutable log)
Optimistic concurrency:
  INSERT event WHERE version = expected_version
  → Nếu version đã tồn tại → conflict → retry

6.3 Aggregate Reconstruction

Load order-123:
  SELECT * FROM events
  WHERE stream_id = 'order-123'
  ORDER BY version ASC

  Apply từng event:
  state = {}
  state = apply(OrderCreated, state)
         = { id:123, status:'pending', items:[...] }
  state = apply(PaymentReceived, state)
         = { id:123, status:'paid', ... }
  state = apply(OrderShipped, state)
         = { id:123, status:'shipped', trackingNo:'ABC' }

  → Current state của order-123

Vấn đề performance khi events nhiều:
  Order có 10,000 events → replay tốn kém

Snapshots:
  Định kỳ save snapshot của current state
  Khi load: đọc snapshot gần nhất + events sau snapshot

  events: [e1][e2]...[e500] [snapshot@500] [e501][e502]...[e523]
  Load = snapshot@500 + apply e501..e523  (chỉ 23 events!)

6.4 Lợi ích của Event Sourcing

1. Complete Audit Log:
   Biết CHÍNH XÁC chuyện gì xảy ra và khi nào
   → Compliance, debugging, forensics

2. Temporal Queries (Time Travel):
   "Order-123 trông như thế nào vào ngày 1/1/2024?"
   → Replay events đến timestamp đó

3. Event Replay & Projections:
   Rebuild read models bất cứ lúc nào
   Thêm business insight mới → replay toàn bộ events → new projection

4. Debugging:
   Reproduce bug: replay events đến thời điểm bug
   → Exact same state như production

5. Decoupling:
   Events = integration points cho other services
   New service subscribe to events → không cần migration

Nhược điểm:
  Learning curve cao
  Eventual consistency cho read models
  Schema evolution phức tạp (event schema thay đổi)
  Querying current state phức tạp hơn (cần read models)
  Snapshot management

6.5 Event Schema Evolution

Vấn đề: Sau 2 năm, OrderCreated event thêm field mới

Strategy 1 — Weak Schema:
  { "version": 1, "items": [...] }          (old)
  { "version": 2, "items": [...], "discount": 0.1 }  (new)
  → Code xử lý cả 2 versions khi replay

Strategy 2 — Upcasting:
  Khi load event cũ → transform lên version mới trước khi apply
  V1 → V2 upcaster: thêm "discount": 0.0 vào V1 events
  → Code chỉ cần xử lý latest version

Strategy 3 — Copy-and-Transform:
  Migrate event store: rewrite old events sang new schema
  Nguy hiểm (mất immutability) — thường không khuyến khích

Best practice: thiết kế events để backward compatible
  Thêm optional fields thay vì xóa/đổi tên
  Semantic versioning cho event schemas

7. CQRS + Event Sourcing kết hợp

7.1 Full Architecture

                        COMMAND SIDE
  ┌─────────────┐    ┌──────────────┐    ┌─────────────────┐
  │   Client    │───►│   Command    │───►│   Event Store   │
  │             │    │   Handler    │    │ (append-only)   │
  └─────────────┘    └──────────────┘    └────────┬────────┘
                                                   │
                                            Domain Events
                                                   │
                        QUERY SIDE                 │
  ┌─────────────┐    ┌──────────────┐    ┌─────────▼────────┐
  │   Client    │───►│   Query      │◄───│   Projectors     │
  │             │    │   Handler    │    │ (event handlers) │
  └─────────────┘    └──────┬───────┘    └──────────────────┘
                            │                      │
                   ┌────────▼────────┐  ┌──────────▼──────────┐
                   │  Read Store 1   │  │   Read Store 2       │
                   │  (PostgreSQL    │  │   (Elasticsearch)    │
                   │   materialized) │  │   (full-text search) │
                   └─────────────────┘  └─────────────────────┘

Flow:
  1. Client gửi Command → Command Handler
  2. Handler load Aggregate từ Event Store (replay events)
  3. Handler execute business logic → emit Domain Events
  4. Events được appended vào Event Store
  5. Projectors consume events → update Read Stores
  6. Client query → Query Handler đọc từ Read Store

7.2 Projections

Projection = function(events) → read model

Ví dụ: OrderListProjection
  ON OrderCreated:
    INSERT INTO order_list_view (id, user_name, status, total, created_at)
    VALUES (event.id, lookup_user_name(event.user_id), 'pending', event.total, event.time)

  ON OrderShipped:
    UPDATE order_list_view
    SET status = 'shipped', tracking_no = event.tracking_no
    WHERE id = event.order_id

  ON OrderCancelled:
    UPDATE order_list_view SET status = 'cancelled' WHERE id = event.order_id

Rebuild projection (khi cần):
  DROP TABLE order_list_view
  CREATE TABLE order_list_view (...)
  Replay ALL events → project lại từ đầu
  → Read model luôn có thể được rebuild!

Checkpoint:
  Projector lưu position (event ID) đã xử lý
  → Restart an toàn: tiếp tục từ checkpoint