Traditional CRUD:
Same model cho read và write
→ Cùng DB, cùng schema, cùng service
┌─────────────────────────────────┐
│ Application │
│ Read: SELECT * FROM orders │
│ Write: INSERT INTO orders... │
└──────────────┬──────────────────┘
│
┌───────▼───────┐
│ Single DB │
└───────────────┘
CQRS:
Tách Command (write) và Query (read) thành models riêng biệt
Commands (write): Queries (read):
┌──────────────────┐ ┌──────────────────┐
│ Command Model │ │ Query Model │
│ - Validation │ │ - Denormalized │
│ - Business logic│ │ - Optimized │
│ - Domain events │ │ - Read-friendly │
└────────┬─────────┘ └────────▲─────────┘
│ │
┌─────▼──────┐ sync/async ┌──────┴──────┐
│ Write DB │ ─────────────► │ Read DB │
│ (normalized│ │(denormalized│
│ OLTP) │ │ views) │
└────────────┘ └─────────────┘
5.2 Tại sao cần CQRS?
Vấn đề với single model:
Read model: cần JOINs phức tạp, aggregations, denormalized views
Write model: cần normalization, constraints, business rules
→ Một model không thể tối ưu cả hai
Scale khác nhau:
Reads : Writes = 100:1 (thường) → cần scale độc lập
Different performance requirements:
Writes: cần consistency, validation
Reads: cần low latency, high throughput
5.3 CQRS Implementation Levels
Level 1 — Same DB, Separate Models (đơn giản nhất):
Command side → write to normalized tables
Query side → read từ same DB nhưng dùng views/stored procs
Pro: không cần sync, đơn giản
Con: không scale reads/writes độc lập
Level 2 — Separate Read Store (phổ biến nhất):
Command side → write to PostgreSQL (source of truth)
Event/sync → update read stores
Query side → read từ Elasticsearch (full-text), Redis (cache),
MongoDB (flexible queries), read replicas
┌─────────────┐ events ┌─────────────┐
│ Command DB │ ───────────► │ Redis │ (hot data)
│ (PostgreSQL)│ ├─────────────┤
└─────────────┘ │ Elasticsearch│ (search)
├─────────────┤
│ PostgreSQL │ (read replica)
│ read replica│
└─────────────┘
Level 3 — Event-Driven CQRS (kết hợp Event Sourcing):
Command side → append events to Event Store
Projectors → build read models từ events
Query side → read từ materialized views
(xem section 7)
5.4 Command Model Chi tiết
Command = ý định thay đổi state
- Có thể bị reject (validation fail)
- Imperative: "PlaceOrder", "CancelOrder", "TransferMoney"
- Không return data (chỉ success/failure)
Command Handler:
1. Validate command (input validation)
2. Load aggregate từ DB
3. Execute business logic trên aggregate
4. Check business rules / invariants
5. Persist changes (+ emit domain events)
6. Return result (thường chỉ OK/Error)
// Ví dụ Go
type PlaceOrderCommand struct {
UserID string
Items []OrderItem
AddressID string
}
type PlaceOrderHandler struct {
repo OrderRepository
eventBus EventBus
}
func (h *PlaceOrderHandler) Handle(cmd PlaceOrderCommand) error {
// 1. Validate
if len(cmd.Items) == 0 {
return ErrEmptyOrder
}
// 2. Load aggregate
user, err := h.repo.GetUser(cmd.UserID)
if err != nil { return err }
// 3. Business logic
order, err := user.PlaceOrder(cmd.Items, cmd.AddressID)
if err != nil { return err } // business rule violation
// 4. Persist + emit events
h.repo.Save(order)
h.eventBus.Publish(order.DomainEvents())
return nil
}
5.5 Query Model Chi tiết
Query = request data, KHÔNG thay đổi state
- Luôn succeed (không có business logic)
- Có thể đọc từ nhiều read stores
- Optimized cho specific use cases
// Denormalized read model (không normalized!)
type OrderSummaryView struct {
OrderID string
UserName string // denormalized từ users table
UserEmail string // denormalized
TotalAmount float64
Status string
ItemCount int
CreatedAt time.Time
// Tất cả trong 1 document/row → không cần JOIN!
}
Query Handler:
1. Nhận query params
2. Read từ read store (đã pre-computed)
3. Return DTO (Data Transfer Object)
→ Không có business logic
→ Không có side effects
6. Event Sourcing
6.1 Ý tưởng cốt lõi
Traditional State Storage:
Lưu TRẠNG THÁI HIỆN TẠI của entity
UPDATE orders SET status='shipped' WHERE id=123
→ Không biết tại sao status thay đổi
→ Không biết status trước đó là gì
→ Không thể "time travel"
Event Sourcing:
Lưu CHUỖI SỰ KIỆN xảy ra với entity
→ State = replay tất cả events từ đầu
OrderCreated { id:123, items:[...], total:99 } at t=1
PaymentReceived { id:123, amount:99 } at t=2
OrderShipped { id:123, trackingNo:"ABC123" } at t=3
DeliveryConfirmed { id:123, deliveredAt:"2024-..." } at t=4
Current state = apply tất cả events theo thứ tự
6.2 Event Store
Event Store = append-only log of events
Schema:
┌────────────────────────────────────────────────────────────┐
│ events │
│ id BIGINT PRIMARY KEY, auto-increment │
│ stream_id VARCHAR (aggregate ID, e.g., order-123) │
│ type VARCHAR (event type, e.g., OrderCreated) │
│ version INT (sequence per stream, for OCC) │
│ data JSONB (event payload) │
│ metadata JSONB (correlation_id, causation_id) │
│ occurred_at TIMESTAMPTZ (when event happened) │
└────────────────────────────────────────────────────────────┘
Append-only = NEVER update or delete events (immutable log)
Optimistic concurrency:
INSERT event WHERE version = expected_version
→ Nếu version đã tồn tại → conflict → retry
6.3 Aggregate Reconstruction
Load order-123:
SELECT * FROM events
WHERE stream_id = 'order-123'
ORDER BY version ASC
Apply từng event:
state = {}
state = apply(OrderCreated, state)
= { id:123, status:'pending', items:[...] }
state = apply(PaymentReceived, state)
= { id:123, status:'paid', ... }
state = apply(OrderShipped, state)
= { id:123, status:'shipped', trackingNo:'ABC' }
→ Current state của order-123
Vấn đề performance khi events nhiều:
Order có 10,000 events → replay tốn kém
Snapshots:
Định kỳ save snapshot của current state
Khi load: đọc snapshot gần nhất + events sau snapshot
events: [e1][e2]...[e500] [snapshot@500] [e501][e502]...[e523]
Load = snapshot@500 + apply e501..e523 (chỉ 23 events!)
6.4 Lợi ích của Event Sourcing
1. Complete Audit Log:
Biết CHÍNH XÁC chuyện gì xảy ra và khi nào
→ Compliance, debugging, forensics
2. Temporal Queries (Time Travel):
"Order-123 trông như thế nào vào ngày 1/1/2024?"
→ Replay events đến timestamp đó
3. Event Replay & Projections:
Rebuild read models bất cứ lúc nào
Thêm business insight mới → replay toàn bộ events → new projection
4. Debugging:
Reproduce bug: replay events đến thời điểm bug
→ Exact same state như production
5. Decoupling:
Events = integration points cho other services
New service subscribe to events → không cần migration
Nhược điểm:
Learning curve cao
Eventual consistency cho read models
Schema evolution phức tạp (event schema thay đổi)
Querying current state phức tạp hơn (cần read models)
Snapshot management
6.5 Event Schema Evolution
Vấn đề: Sau 2 năm, OrderCreated event thêm field mới
Strategy 1 — Weak Schema:
{ "version": 1, "items": [...] } (old)
{ "version": 2, "items": [...], "discount": 0.1 } (new)
→ Code xử lý cả 2 versions khi replay
Strategy 2 — Upcasting:
Khi load event cũ → transform lên version mới trước khi apply
V1 → V2 upcaster: thêm "discount": 0.0 vào V1 events
→ Code chỉ cần xử lý latest version
Strategy 3 — Copy-and-Transform:
Migrate event store: rewrite old events sang new schema
Nguy hiểm (mất immutability) — thường không khuyến khích
Best practice: thiết kế events để backward compatible
Thêm optional fields thay vì xóa/đổi tên
Semantic versioning cho event schemas
Projection = function(events) → read model
Ví dụ: OrderListProjection
ON OrderCreated:
INSERT INTO order_list_view (id, user_name, status, total, created_at)
VALUES (event.id, lookup_user_name(event.user_id), 'pending', event.total, event.time)
ON OrderShipped:
UPDATE order_list_view
SET status = 'shipped', tracking_no = event.tracking_no
WHERE id = event.order_id
ON OrderCancelled:
UPDATE order_list_view SET status = 'cancelled' WHERE id = event.order_id
Rebuild projection (khi cần):
DROP TABLE order_list_view
CREATE TABLE order_list_view (...)
Replay ALL events → project lại từ đầu
→ Read model luôn có thể được rebuild!
Checkpoint:
Projector lưu position (event ID) đã xử lý
→ Restart an toàn: tiếp tục từ checkpoint