IoT Platform - System Design for Senior Engineers
Introduction
If you are designing an IoT platform, you are facing one of the hardest distributed-systems problems: managing millions (or billions) of devices with constrained hardware, unreliable connectivity, and real-time, low-latency processing requirements.
IoT-Specific Challenges
| Traditional Web API | IoT Platform |
|---|---|
| 1M users/day | 100M devices always-on |
| Rich clients (browser/mobile) | Constrained devices (256KB RAM) |
| Stable connectivity | Intermittent network (2G/3G) |
| Request-response model | Push + bidirectional |
| Stateless architecture | Stateful (device shadows) |
| Human-generated data | High-frequency telemetry |
Key challenges:
- Scale: Billions of devices, millions of concurrent connections
- Hardware constraints: 256KB RAM, 32KB flash, 8-bit CPU
- Connectivity: Intermittent, high latency (satellite), low bandwidth
- Power: Battery-powered devices (years of lifetime)
- Security: Physical access to devices, firmware vulnerabilities
- Heterogeneity: Protocols (MQTT, CoAP, HTTP), data formats, vendors
IoT Platform Architecture (High-Level)
┌─────────────────────────────────────────────────────────────────┐
│ IoT Devices Layer │
│ [Sensors] [Actuators] [Edge Gateways] [Industrial Equipment] │
└────────────┬────────────────────────────┬───────────────────────┘
│ MQTT/CoAP/HTTP │ TLS/DTLS
▼ ▼
┌────────────────────────┐ ┌──────────────────────┐
│ Protocol Gateway │ │ Device Gateway │
│ (MQTT Broker Cluster) │ │ (Load Balancer) │
└────────┬───────────────┘ └──────────┬───────────┘
│ │
▼ ▼
┌─────────────────────────────────────────────────────┐
│ Message Router / Event Hub │
│ (Kafka / Kinesis / Azure Event Hub) │
└─────┬──────────────┬──────────────┬─────────────────┘
│ │ │
▼ ▼ ▼
┌──────────┐ ┌─────────────┐ ┌─────────────┐
│ Telemetry│ │ Command │ │ Device │
│ Ingestion│ │ Service │ │ Registry │
└────┬─────┘ └──────┬──────┘ └──────┬──────┘
│ │ │
▼ ▼ ▼
┌────────────┐ ┌──────────┐ ┌─────────────┐
│ Time-Series│ │ Device │ │ Postgres │
│ DB (Influx)│ │ Shadows │ │ (metadata) │
└────────────┘ └──────────┘ └─────────────┘
Device Management
Device Provisioning at Scale
Challenge: How do you provision one million devices without manually configuring each one?
Strategies:
- Zero-Touch Provisioning (ZTP)
Device Boot → DHCP → Download config from server → Authenticate → Activate
- Certificate-based Auto-enrollment (như AWS IoT Just-in-Time Registration)
# Device-side: Generate CSR and request certificate
import json
import ssl
import paho.mqtt.client as mqtt
def provision_device():
# Step 1: Generate key pair (done once in factory)
device_id = get_hardware_id() # MAC address, serial number
# Step 2: Connect with claim certificate (embedded in firmware)
client = mqtt.Client(client_id=device_id)
client.tls_set(
ca_certs="root-ca.pem",
certfile="claim-cert.pem", # Factory-issued
keyfile="claim-key.pem"
)
# Step 3: Publish provisioning request
client.connect("provisioning.iot.example.com", 8883)
client.publish(
"$aws/certificates/create-from-csr/json",
payload=json.dumps({
"csr": generate_csr(device_id),
"template": "production-template"
})
)
# Step 4: Receive unique certificate
# Server validates claim cert → issues device cert → stores in registry
# Platform-side: Auto-registration Lambda
def on_certificate_created(event):
cert_id = event['certificateId']
device_id = extract_cn(event['csr'])
# Validate against whitelist (factory manifest)
if not is_authorized(device_id):
revoke_certificate(cert_id)
return
# Create device in registry
create_device(device_id, cert_id)
# Attach policy (publish/subscribe permissions)
attach_policy(cert_id, "device-policy")
# Initialize device shadow
create_shadow(device_id, {"state": {"desired": {}}})
Trade-offs:
- Claim cert: Simpler, but if it leaks an attacker can provision rogue devices
- Pre-registration: More secure (whitelists serial numbers), but requires manually uploading the factory manifest
- Public Key Infrastructure (PKI): Most scalable, but complex (CA hierarchy, CRL/OCSP)
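The platform-side `is_authorized` check above validates the device against a factory manifest. One way to make that manifest tamper-evident is to have the factory HMAC-sign each serial number with a shared manufacturing key; the key handling and manifest shape below are illustrative, not a specific vendor API:

```python
import hashlib
import hmac

# Illustrative only: in practice this key lives in the factory's HSM/KMS
FACTORY_KEY = b"manufacturing-line-secret"

def sign_serial(serial: str) -> str:
    """Factory side: compute the per-device manifest entry."""
    return hmac.new(FACTORY_KEY, serial.encode(), hashlib.sha256).hexdigest()

def is_authorized(serial: str, manifest: dict) -> bool:
    """Platform side: the serial must appear in the uploaded manifest AND its
    signature must verify, so a leaked claim cert alone cannot enroll a device."""
    expected = manifest.get(serial)
    return expected is not None and hmac.compare_digest(expected, sign_serial(serial))
```

With this shape, a rogue device presenting a valid claim cert but an unmanifested serial is rejected and its certificate can be revoked immediately.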
Firmware Over-the-Air (OTA) Updates
Scenario: You need to patch a security vulnerability on 10 million devices running in production.
Architecture:
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Build CI/CD │──────▶│ FW Registry │──────▶│ CDN/S3 │
│ (signed fw) │ │ (metadata DB)│ │ (binary blob)│
└──────────────┘ └──────┬───────┘ └──────────────┘
│
▼
┌──────────────┐
│ OTA Scheduler│
│ (canary/ │
│ rollout) │
└──────┬───────┘
│
┌─────────────┼─────────────┐
▼ ▼ ▼
[Device A] [Device B] [Device C]
(health check) (download) (apply + verify)
Production-grade OTA Service (Go):
package ota
import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
)
type FirmwareMetadata struct {
Version string
MinVersion string // Devices below this can't upgrade
URL string // CDN URL
Size int64
SHA256 string
Signature string // RSA signature from build server
RolloutPhase string // "canary", "staged", "general"
}
type OTAService struct {
registry DeviceRegistry
storage BlobStorage
metrics MetricsCollector
}
// Device polls for updates (pull model)
func (s *OTAService) CheckUpdate(ctx context.Context, deviceID string) (*FirmwareMetadata, error) {
device, err := s.registry.GetDevice(ctx, deviceID)
if err != nil {
return nil, err
}
// Get latest firmware for device type
latest, err := s.registry.GetLatestFirmware(device.Type, device.HardwareVersion)
if err != nil {
return nil, err
}
	// Skip if device already up-to-date
	// (use a real semver comparison in production; a plain string >= misorders "1.10.0" vs "1.9.0")
	if device.FirmwareVersion >= latest.Version {
return nil, nil
}
// Canary strategy: only 1% of devices get latest during canary
if latest.RolloutPhase == "canary" && !s.isInCanaryGroup(deviceID, 0.01) {
return nil, nil // Skip this device for now
}
// Staged rollout: gradual percentage increase
if latest.RolloutPhase == "staged" {
percentage := s.getStagedPercentage(latest.CreatedAt)
if !s.isInCanaryGroup(deviceID, percentage) {
return nil, nil
}
}
return latest, nil
}
// Report update status (for monitoring)
func (s *OTAService) ReportUpdateStatus(ctx context.Context, deviceID, version, status string) error {
s.metrics.Increment("ota.status", map[string]string{
"version": version,
"status": status, // downloading, verifying, applying, success, failed
})
// If failure rate > 5%, auto-rollback
failureRate := s.metrics.GetRate("ota.status", map[string]string{
"version": version,
"status": "failed",
})
if failureRate > 0.05 {
s.registry.PauseFirmwareRollout(version)
s.alertOncall("High OTA failure rate for version %s", version)
}
return s.registry.UpdateDeviceFirmwareStatus(ctx, deviceID, version, status)
}
// Generate presigned URL for download (avoid hotlinking)
func (s *OTAService) GetDownloadURL(ctx context.Context, deviceID, version string) (string, error) {
// Verify device is authorized to download this version
fw, err := s.CheckUpdate(ctx, deviceID)
if err != nil || fw == nil || fw.Version != version {
return "", fmt.Errorf("unauthorized")
}
// Generate presigned URL (valid for 1 hour)
return s.storage.PresignedURL(fw.URL, 3600)
}
Device-side implementation (C for constrained devices):
#include <stdio.h>
#include <mbedtls/sha256.h>
#include <esp_https_ota.h>
// Atomic update with rollback
typedef enum {
PARTITION_A,
PARTITION_B
} partition_t;
partition_t current_partition;
partition_t next_partition;
extern const char server_cert_pem_start[]; // embedded server CA cert (e.g. via EMBED_TXTFILES)
int perform_ota_update(const char* url, const char* expected_sha256) {
esp_http_client_config_t config = {
.url = url,
.cert_pem = server_cert_pem_start,
.timeout_ms = 10000,
};
esp_https_ota_config_t ota_config = {
.http_config = &config,
};
esp_https_ota_handle_t ota_handle = NULL;
// Step 1: Begin OTA (write to inactive partition)
esp_err_t err = esp_https_ota_begin(&ota_config, &ota_handle);
if (err != ESP_OK) {
report_status("failed", "ota_begin_failed");
return -1;
}
// Step 2: Download and verify in chunks
report_status("downloading", NULL);
while (1) {
err = esp_https_ota_perform(ota_handle);
if (err != ESP_ERR_HTTPS_OTA_IN_PROGRESS) {
break;
}
// Print progress (optional for debugging via serial)
}
if (err != ESP_OK) {
esp_https_ota_abort(ota_handle);
report_status("failed", "download_failed");
return -1;
}
// Step 3: Verify signature and hash
esp_app_desc_t new_app_info;
esp_https_ota_get_img_desc(ota_handle, &new_app_info);
// Compare SHA256
if (verify_sha256(ota_handle, expected_sha256) != 0) {
esp_https_ota_abort(ota_handle);
report_status("failed", "checksum_mismatch");
return -1;
}
// Step 4: Finalize (mark partition as bootable)
err = esp_https_ota_finish(ota_handle);
if (err != ESP_OK) {
report_status("failed", "ota_finish_failed");
return -1;
}
report_status("success", NULL);
// Step 5: Reboot to new partition
// Bootloader will try new partition, fallback to old if boot fails
esp_restart();
return 0;
}
// Bootloader marks partition as valid after successful boot
void app_main() {
// Verify this partition boots correctly
const esp_partition_t *running = esp_ota_get_running_partition();
esp_ota_img_states_t ota_state;
esp_ota_get_state_partition(running, &ota_state);
if (ota_state == ESP_OTA_IMG_PENDING_VERIFY) {
// First boot after OTA, mark as valid
esp_ota_mark_app_valid_cancel_rollback();
}
// Continue app initialization...
}
Key decisions:
- Pull vs Push: Pull (device polls) scales better than push (server tracks all devices)
- Delta updates: Reduce bandwidth (binary diff), but complex implementation
- Dual-bank: Atomic updates with rollback, requires 2x flash size
- Canary rollout: Critical for catching bugs early (1% → 10% → 100%)
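The `isInCanaryGroup` call in the OTA service above needs to be deterministic: a device must stay in the same bucket as the rollout percentage grows, otherwise devices flip in and out of the cohort between polls. A common sketch is to hash the device ID into [0, 1) and compare against the phase percentage (the function name here is illustrative):

```python
import hashlib

def in_rollout_group(device_id: str, fraction: float) -> bool:
    """Deterministically map a device ID to a point in [0, 1).

    The same device always hashes to the same bucket, so a device in the
    1% canary cohort is also in the later 10% and 100% phases."""
    digest = hashlib.sha256(device_id.encode()).digest()
    bucket = int.from_bytes(digest[:8], "big") / 2**64
    return bucket < fraction
```

Because SHA-256 output is effectively uniform, roughly `fraction` of the fleet lands below the threshold, and raising the threshold only ever adds devices to the cohort.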
MQTT Protocol Deep Dive
MQTT is the most popular protocol for IoT because it is lightweight (2-byte minimum header) and its publish/subscribe model decouples producers from consumers.
MQTT Architecture
┌─────────────────────┐
│ MQTT Broker │
│ (VerneMQ/EMQ X) │
└──────────┬──────────┘
│
┌────────────────────┼────────────────────┐
│ │ │
[Publisher A] [Publisher B] [Subscriber C]
topic: sensors/ topic: sensors/ subscribe:
temp/room1 humidity/room1 sensors/+/room1
│ │ │
└────────────────────┴────────────────────┘
Shared bus
QoS Levels
QoS 0 (At most once):
  Publisher ──PUBLISH──▶ Broker        (no ack; fire and forget)

QoS 1 (At least once):
  Publisher ──PUBLISH──▶ Broker        (carries packet ID)
  Publisher ◀──PUBACK─── Broker        (may duplicate if PUBACK is lost)

QoS 2 (Exactly once):
  Publisher ──PUBLISH──▶ Broker
  Publisher ◀──PUBREC─── Broker
  Publisher ──PUBREL───▶ Broker
  Publisher ◀──PUBCOMP── Broker        (four-step handshake; no duplicates)
When to use:
- QoS 0: Telemetry (temperature every 10s) - a lost sample is not critical
- QoS 1: Commands (turn on light) - duplicate OK (idempotent)
- QoS 2: Billing events (charge customer) - no duplicates allowed
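Because QoS 1 can redeliver, the "duplicate OK" claim only holds if the command handler is idempotent. A minimal sketch of deduplicating redeliveries by packet/message ID (class and field names are illustrative; a production version would bound or expire the seen-set):

```python
class CommandHandler:
    """Deduplicate QoS 1 redeliveries so command handling stays idempotent.

    Illustrative: a production handler would bound the seen-set (TTL/LRU)
    and persist it across restarts."""

    def __init__(self):
        self.seen = set()      # message IDs already processed
        self.executed = []     # commands actually applied

    def handle(self, message_id, command):
        if message_id in self.seen:
            return False       # redelivery of an already-applied command; drop it
        self.seen.add(message_id)
        self.executed.append(command)  # apply the command exactly once
        return True
```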
Performance impact:
import paho.mqtt.client as mqtt
import time
def benchmark_qos():
client = mqtt.Client()
client.connect("localhost", 1883)
payload = b"x" * 100 # 100 bytes
count = 10000
# QoS 0
start = time.time()
for i in range(count):
client.publish("benchmark/qos0", payload, qos=0)
qos0_time = time.time() - start
print(f"QoS 0: {count/qos0_time:.0f} msg/sec")
# QoS 1
start = time.time()
for i in range(count):
info = client.publish("benchmark/qos1", payload, qos=1)
info.wait_for_publish() # Wait for PUBACK
qos1_time = time.time() - start
print(f"QoS 1: {count/qos1_time:.0f} msg/sec")
# QoS 2
start = time.time()
for i in range(count):
info = client.publish("benchmark/qos2", payload, qos=2)
info.wait_for_publish() # Wait for PUBCOMP
qos2_time = time.time() - start
print(f"QoS 2: {count/qos2_time:.0f} msg/sec")
# Results (typical):
# QoS 0: ~100,000 msg/sec
# QoS 1: ~30,000 msg/sec
# QoS 2: ~10,000 msg/sec
Retained Messages & Last Will
Retained messages: Broker stores last value, new subscribers get it immediately.
# Publisher: Report device status
client.publish("devices/sensor123/status",
payload='{"online": true, "battery": 85}',
retain=True, # Store this value
qos=1)
# New subscriber connects 10 minutes later
client.subscribe("devices/+/status")
# Immediately receives retained status for all devices
Last Will and Testament (LWT): The broker auto-publishes a preset message when a client disconnects unexpectedly.
client = mqtt.Client()
# Set LWT before connect
client.will_set(
topic="devices/sensor123/status",
payload='{"online": false}',
qos=1,
retain=True
)
client.connect("broker.example.com")
# If network drops or device crashes, broker auto-publishes LWT
Use case: Device heartbeat monitoring
type DeviceMonitor struct {
redis *redis.Client
}
// MQTT subscriber for LWT messages
func (m *DeviceMonitor) OnMessage(topic string, payload []byte) {
	ctx := context.Background() // snippet assumes no caller-provided context
	deviceID := extractDeviceID(topic) // devices/sensor123/status
var status struct {
Online bool `json:"online"`
Battery int `json:"battery"`
}
json.Unmarshal(payload, &status)
if !status.Online {
// Device went offline
m.redis.Set(ctx, fmt.Sprintf("device:%s:online", deviceID), "0", 0)
// Trigger alert if offline > 5 minutes
m.scheduleOfflineAlert(deviceID, 5*time.Minute)
} else {
// Device back online
m.redis.Set(ctx, fmt.Sprintf("device:%s:online", deviceID), "1", 0)
m.redis.Set(ctx, fmt.Sprintf("device:%s:last_seen", deviceID), time.Now().Unix(), 0)
}
}
MQTT Broker Clustering
Challenge: Scale MQTT broker to handle 1M concurrent connections.
VerneMQ clustering:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ VerneMQ 1 │────▶│ VerneMQ 2 │────▶│ VerneMQ 3 │
│ (100k conn) │ │ (100k conn) │ │ (100k conn) │
└──────┬──────┘ └──────┬──────┘ └──────┬──────┘
│ │ │
└───────────────────┴───────────────────┘
Plumtree gossip (CRDT)
Config (vernemq.conf):
# Clustering
nodename = VerneMQ1@192.168.1.101
distributed_cookie = secret_cluster_cookie
# Join cluster
cluster.join = VerneMQ2@192.168.1.102
# Performance tuning
max_connections = 100000
max_inflight_messages = 100
max_message_size = 65536
# Persistent session in database (distributed)
metadata_plugin = vmq_swc
vmq_swc.db_backend = leveldb
Subscription routing: When a device subscribes on node A but the message is published on node B:
Device A ──subscribe("sensors/#")──▶ Node A
Device B ──publish("sensors/temp")─▶ Node B
│
Node A ◀────gossip──────┘
│
└────deliver──▶ Device A
Trade-offs:
- Shared subscriptions ($share/group/topic): Load balancing across consumers
- Routing overhead: Inter-node traffic (minimize with local_only topics)
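To make the shared-subscription semantics concrete, here is a toy dispatcher: a normal subscription fans out to every subscriber, while a `$share/<group>/<topic>` subscription delivers each message to exactly one group member (round-robin here; real brokers may use other policies). This is an illustrative model, not broker code:

```python
from collections import defaultdict

class SharedSubRouter:
    """Toy model of broker dispatch for normal vs shared subscriptions."""

    def __init__(self):
        self.fanout = defaultdict(list)  # topic -> callbacks (every one gets a copy)
        self.groups = defaultdict(list)  # (group, topic) -> callbacks (one gets each message)
        self._next = defaultdict(int)    # (group, topic) -> round-robin counter

    def subscribe(self, topic_filter, callback):
        if topic_filter.startswith("$share/"):
            # $share/<group>/<topic>
            _, group, topic = topic_filter.split("/", 2)
            self.groups[(group, topic)].append(callback)
        else:
            self.fanout[topic_filter].append(callback)

    def publish(self, topic, payload):
        # Normal subscriptions: fan out to all subscribers
        for cb in self.fanout[topic]:
            cb(topic, payload)
        # Shared subscriptions: exactly one member of each matching group
        for (group, t), members in self.groups.items():
            if t == topic and members:
                i = self._next[(group, t)]
                members[i % len(members)](topic, payload)
                self._next[(group, t)] = i + 1
```

Two workers subscribed via `$share/workers/jobs` split the message stream between them, while a plain `jobs` subscriber still sees every message.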
Telemetry Ingestion
High-Throughput Message Pipeline
Scenario: 100,000 devices x 1 message/sec = 100k msg/sec sustained load.
Architecture:
MQTT Broker (cluster)
│
▼
┌─────────────────┐
│ Message Router │ (Kafka topic: iot.telemetry)
│ Kafka/Kinesis │ - Partitions: 50
│ │ - Replication: 3
└────────┬────────┘ - Retention: 7 days
│
├──────────────────┬──────────────────┬─────────────────┐
▼ ▼ ▼ ▼
┌────────┐ ┌─────────┐ ┌──────────┐ ┌──────────┐
│ Stream │ │ Time- │ │ Analytics│ │ Archival │
│ Process│ │ Series │ │ Engine │ │ (S3) │
│ (Flink)│ │ Ingest │ │ (Druid) │ │ │
└────────┘ └─────────┘ └──────────┘ └──────────┘
MQTT → Kafka bridge:
package bridge
import (
	"context"
	"encoding/json"
	"strings"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
	"github.com/segmentio/kafka-go"
)
type MQTTKafkaBridge struct {
mqttClient mqtt.Client
kafkaWriter *kafka.Writer
}
func NewBridge(mqttBroker, kafkaBroker string) *MQTTKafkaBridge {
// Kafka writer with batching
kafkaWriter := &kafka.Writer{
Addr: kafka.TCP(kafkaBroker),
Topic: "iot.telemetry",
Balancer: &kafka.Hash{}, // Partition by device ID
BatchSize: 100, // Batch 100 messages
BatchTimeout: 10 * time.Millisecond,
Compression: kafka.Snappy,
Async: true, // Don't block MQTT
}
// MQTT client
opts := mqtt.NewClientOptions()
opts.AddBroker(mqttBroker)
opts.SetClientID("kafka-bridge")
opts.SetCleanSession(false)
opts.SetAutoReconnect(true)
opts.SetMaxReconnectInterval(60 * time.Second)
mqttClient := mqtt.NewClient(opts)
bridge := &MQTTKafkaBridge{
mqttClient: mqttClient,
kafkaWriter: kafkaWriter,
}
// Subscribe to all telemetry
mqttClient.Subscribe("telemetry/#", 0, bridge.onMessage)
return bridge
}
func (b *MQTTKafkaBridge) onMessage(client mqtt.Client, msg mqtt.Message) {
// Extract device ID from topic (telemetry/{device_id})
parts := strings.Split(msg.Topic(), "/")
if len(parts) != 2 {
return
}
deviceID := parts[1]
// Enrich with metadata
envelope := TelemetryEnvelope{
DeviceID: deviceID,
Timestamp: time.Now().UnixNano(),
Payload: msg.Payload(),
Topic: msg.Topic(),
}
bytes, _ := json.Marshal(envelope)
// Write to Kafka (async, batched)
b.kafkaWriter.WriteMessages(context.Background(),
kafka.Message{
Key: []byte(deviceID), // Partition by device ID
Value: bytes,
},
)
}
Backpressure Handling
Problem: Kafka cluster hiccup → messages buffer → OOM.
Solution: Circuit breaker + adaptive rate limiting.
type AdaptiveRateLimiter struct {
	successCount int64
	errorCount   int64
	rate         int64 // Current allowed rate (msg/sec)
	maxRate      int64
	bucket       *rate.Limiter // token bucket (golang.org/x/time/rate), refill kept in sync with rate
	lastAdjust   time.Time
}

func (r *AdaptiveRateLimiter) RecordSuccess() { atomic.AddInt64(&r.successCount, 1) }
func (r *AdaptiveRateLimiter) RecordError()   { atomic.AddInt64(&r.errorCount, 1) }

func (r *AdaptiveRateLimiter) Allow() bool {
	// Token bucket gate at the current rate
	if !r.bucket.Allow() {
		return false
	}
	// Adjust rate every 10 seconds
	if time.Since(r.lastAdjust) > 10*time.Second {
		r.adjustRate()
	}
	return true
}
func (r *AdaptiveRateLimiter) adjustRate() {
success := atomic.LoadInt64(&r.successCount)
errors := atomic.LoadInt64(&r.errorCount)
total := success + errors
if total == 0 {
return
}
errorRate := float64(errors) / float64(total)
currentRate := atomic.LoadInt64(&r.rate)
var newRate int64
if errorRate > 0.05 {
// Error rate > 5%, reduce by 20%
newRate = int64(float64(currentRate) * 0.8)
} else if errorRate < 0.01 {
// Error rate < 1%, increase by 10%
newRate = int64(float64(currentRate) * 1.1)
if newRate > r.maxRate {
newRate = r.maxRate
}
} else {
newRate = currentRate
}
atomic.StoreInt64(&r.rate, newRate)
atomic.StoreInt64(&r.successCount, 0)
atomic.StoreInt64(&r.errorCount, 0)
r.lastAdjust = time.Now()
log.Printf("Adjusted rate: %d → %d msg/sec (error rate: %.2f%%)",
currentRate, newRate, errorRate*100)
}
// Integration with bridge
func (b *MQTTKafkaBridge) onMessage(client mqtt.Client, msg mqtt.Message) {
if !b.rateLimiter.Allow() {
// Drop message (or buffer to disk)
metrics.Increment("telemetry.dropped")
return
}
err := b.kafkaWriter.WriteMessages(...)
if err != nil {
b.rateLimiter.RecordError()
} else {
b.rateLimiter.RecordSuccess()
}
}
Protocol Comparison
| Feature | MQTT | HTTP | CoAP | AMQP |
|---|---|---|---|---|
| Transport | TCP | TCP | UDP | TCP |
| Overhead | 2 bytes | ~200 bytes | 4 bytes | ~8 bytes |
| QoS | 0/1/2 | None (app-level) | NON/CON (≈ QoS 0/1) | At-most/at-least-once |
| Pub/Sub | Native | Webhooks | Observe | Native |
| NAT/Firewall | Good (MQTT-SN for UDP) | Good | Poor (UDP) | Good |
| Battery | Excellent | Poor (HTTP/1.1) | Excellent | Good |
| Use Case | General IoT | Web-friendly | Constrained devices | Enterprise |
Code example - CoAP client:
from coapthon.client.helperclient import HelperClient
# CoAP uses UDP, very lightweight
client = HelperClient(server=('coap.example.com', 5683))
# Publish telemetry (CON = confirmable, like QoS 1)
response = client.post('sensors/temp',
payload='{"value": 23.5}',
timeout=5)
print(response.pretty_print())
# Observe (like MQTT subscribe)
client.observe('actuators/fan', callback=lambda r: print(r.payload))
Time-Series Data Storage
Storage Engine Comparison
InfluxDB (popular choice):
┌──────────────────────────────────────┐
│ InfluxDB Architecture │
├──────────────────────────────────────┤
│ WAL (Write-Ahead Log) │ ← Durability
├──────────────────────────────────────┤
│ Cache (in-memory writes) │ ← Fast writes
├──────────────────────────────────────┤
│ TSM Files (Time-Structured Merge) │ ← Columnar storage
│ - data/ │
│ - 00001.tsm (1h block) │
│ - 00002.tsm (1h block) │
│ - index/ │
│ - tag index (inverted) │
└──────────────────────────────────────┘
Schema design:
-- Measurement = table name
-- Tags = indexed metadata (low cardinality)
-- Fields = actual values (not indexed)
-- Timestamp = automatic
-- Example: Store temperature readings
-- DON'T: use device_id as field (can't filter efficiently)
-- DO: use device_id as tag
INSERT temperature,device_id=sensor123,room=bedroom value=23.5
-- Query by tag (fast, uses index)
SELECT value FROM temperature
WHERE device_id='sensor123'
AND time > now() - 1h
-- Aggregate
SELECT MEAN(value) FROM temperature
WHERE room='bedroom'
AND time > now() - 24h
GROUP BY time(10m), device_id
Cardinality explosion (common pitfall):
# BAD: High cardinality tag
# 1M devices × 1000 rooms = 1B tag combinations → slow
influx.write_points([{
"measurement": "temperature",
"tags": {
"device_id": "sensor12345", # 1M unique values
"room": "bedroom_floor2_apt501" # 1000 unique values
},
"fields": {"value": 23.5}
}])
# GOOD: Keep tags low cardinality, move high-cardinality to fields
influx.write_points([{
"measurement": "temperature",
"tags": {
"building_id": "building_42", # 100 buildings
"floor": "2" # 20 floors
},
"fields": {
"device_id": "sensor12345", # Field, not indexed
"value": 23.5
}
}])
Write optimization (batching):
package ingestion
import (
	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
	"github.com/influxdata/influxdb-client-go/v2/api"
	"github.com/influxdata/influxdb-client-go/v2/api/write"
)
type TelemetryWriter struct {
client influxdb2.Client
writeAPI api.WriteAPI
buffer []*write.Point
mu sync.Mutex
}
func NewTelemetryWriter(url, token, org, bucket string) *TelemetryWriter {
client := influxdb2.NewClient(url, token)
writeAPI := client.WriteAPI(org, bucket)
// Handle async errors
errorsCh := writeAPI.Errors()
go func() {
for err := range errorsCh {
log.Printf("InfluxDB write error: %v", err)
}
}()
return &TelemetryWriter{
client: client,
writeAPI: writeAPI,
buffer: make([]*write.Point, 0, 1000),
}
}
func (w *TelemetryWriter) WriteTelemetry(deviceID string, measurement string, value float64) {
p := influxdb2.NewPointWithMeasurement(measurement).
AddTag("device_id", deviceID).
AddField("value", value).
SetTime(time.Now())
// Async write (batched automatically by client)
w.writeAPI.WritePoint(p)
}
// For very high throughput, manual batching gives more control
func (w *TelemetryWriter) WriteBatch(points []*write.Point) error {
	// Write in chunks of 5000 (InfluxDB recommended batch size)
	const batchSize = 5000
	for i := 0; i < len(points); i += batchSize {
		end := i + batchSize
		if end > len(points) {
			end = len(points)
		}
		// WriteAPI.WritePoint enqueues a single point; feed the chunk point by point
		for _, p := range points[i:end] {
			w.writeAPI.WritePoint(p)
		}
	}
	// Flush to ensure all writes complete
	w.writeAPI.Flush()
	return nil
}
Retention & Downsampling
Problem: Raw data (1 sample/sec) = 86,400 points/day/device. 1M devices = 86.4B points/day → $$
Solution: Continuous queries for downsampling + retention policies.
-- Create retention policies
CREATE RETENTION POLICY "raw" ON "iot_db" DURATION 7d REPLICATION 1 DEFAULT
CREATE RETENTION POLICY "downsampled_1h" ON "iot_db" DURATION 90d REPLICATION 1
CREATE RETENTION POLICY "downsampled_1d" ON "iot_db" DURATION 5y REPLICATION 1
-- Continuous query: auto-downsample raw data to 1h averages
CREATE CONTINUOUS QUERY "cq_downsample_1h" ON "iot_db"
BEGIN
SELECT MEAN(value) AS value
INTO "downsampled_1h"."temperature"
FROM "raw"."temperature"
GROUP BY time(1h), device_id
END
-- Continuous query: 1d averages
CREATE CONTINUOUS QUERY "cq_downsample_1d" ON "iot_db"
BEGIN
SELECT MEAN(value) AS value
INTO "downsampled_1d"."temperature"
FROM "downsampled_1h"."temperature"
GROUP BY time(1d), device_id
END
Storage savings:
Raw (7 days): 1M devices × 86,400 points/day × 7d = 604.8B points
1h downsampled (90d): 1M devices × 24 points/day × 90d = 2.16B points
1d downsampled (5y): 1M devices × 1 point/day × 1825d = 1.83B points
──────────────────────────────────────────────────────────────────────
Total: ≈609B points (vs ≈158T points if storing raw 1/sec data for 5 years)
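The arithmetic above is easy to verify (the 5-year raw figure is the striking one):

```python
DEVICES = 1_000_000

raw = DEVICES * 86_400 * 7      # 1 sample/sec, kept 7 days
hourly = DEVICES * 24 * 90      # 1h means, kept 90 days
daily = DEVICES * 1 * 1825      # 1d means, kept 5 years (~1825 days)

print(f"raw 7d:     {raw / 1e9:.1f}B points")
print(f"hourly 90d: {hourly / 1e9:.2f}B points")
print(f"daily 5y:   {daily / 1e9:.2f}B points")
print(f"total:      {(raw + hourly + daily) / 1e9:.0f}B points")
print(f"raw 5y:     {DEVICES * 86_400 * 1825 / 1e12:.0f}T points")
```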
TimescaleDB Alternative
Postgres extension (better for relational queries):
-- Create hypertable (auto-partitioning by time)
CREATE TABLE telemetry (
time TIMESTAMPTZ NOT NULL,
device_id TEXT NOT NULL,
metric TEXT NOT NULL,
value DOUBLE PRECISION
);
SELECT create_hypertable('telemetry', 'time');
-- Create index on device_id (for device-specific queries)
CREATE INDEX ON telemetry (device_id, time DESC);
-- Compression (10x reduction typical)
ALTER TABLE telemetry SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'device_id',
timescaledb.compress_orderby = 'time DESC'
);
-- Auto-compress chunks older than 7 days
SELECT add_compression_policy('telemetry', INTERVAL '7 days');
-- Continuous aggregate (like InfluxDB CQ)
CREATE MATERIALIZED VIEW telemetry_1h
WITH (timescaledb.continuous) AS
SELECT device_id,
time_bucket('1 hour', time) AS bucket,
AVG(value) as avg_value,
MAX(value) as max_value,
MIN(value) as min_value
FROM telemetry
GROUP BY device_id, bucket;
-- Refresh policy
SELECT add_continuous_aggregate_policy('telemetry_1h',
start_offset => INTERVAL '3 hours',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '1 hour');
Query performance:
-- Query raw data (last hour)
EXPLAIN ANALYZE
SELECT time, value
FROM telemetry
WHERE device_id = 'sensor123'
AND time > NOW() - INTERVAL '1 hour';
-- Planning Time: 0.5ms
-- Execution Time: 12ms (3600 points)
-- Query aggregated data (last month)
EXPLAIN ANALYZE
SELECT bucket, avg_value
FROM telemetry_1h
WHERE device_id = 'sensor123'
AND bucket > NOW() - INTERVAL '30 days';
-- Planning Time: 0.3ms
-- Execution Time: 3ms (720 points, pre-aggregated)
Edge Computing
Edge Gateway Architecture
Why edge: Reduce bandwidth, lower latency, work offline.
Cloud
▲
│ (aggregated data)
┌──────────┴──────────┐
│ │
Edge Gateway Edge Gateway
(Raspberry Pi) (Industrial PC)
│ │
┌────┴────┐ ┌────┴────┐
│ │ │ │
[Sensor] [Sensor] [PLC] [Camera]
(BLE) (Zigbee) (Modbus) (RTSP)
Edge processing pipeline:
# Running on edge gateway (e.g., Raspberry Pi 4)
import asyncio
from azure.iot.device.aio import IoTHubModuleClient
from collections import deque
import numpy as np
class EdgeTelemetryProcessor:
def __init__(self):
self.client = IoTHubModuleClient.create_from_edge_environment()
self.buffer = deque(maxlen=1000) # Local buffer
self.anomaly_detector = AnomalyDetector()
async def process_sensor_reading(self, device_id, value):
# Step 1: Local buffering (survive network outage)
self.buffer.append({
'device_id': device_id,
'value': value,
'timestamp': time.time()
})
# Step 2: Local anomaly detection
is_anomaly = self.anomaly_detector.detect(device_id, value)
if is_anomaly:
# Critical: send to cloud immediately
await self.send_to_cloud({
'device_id': device_id,
'value': value,
'alert': 'anomaly_detected',
'priority': 'high'
})
else:
# Normal: aggregate and batch
# Only send summary every 5 minutes
pass
async def flush_aggregates(self):
"""Send aggregated data to cloud every 5 minutes"""
while True:
await asyncio.sleep(300) # 5 minutes
# Aggregate by device
aggregates = {}
for reading in list(self.buffer):
device_id = reading['device_id']
if device_id not in aggregates:
aggregates[device_id] = []
aggregates[device_id].append(reading['value'])
# Send summary
for device_id, values in aggregates.items():
summary = {
'device_id': device_id,
'count': len(values),
'mean': np.mean(values),
'min': np.min(values),
'max': np.max(values),
'std': np.std(values)
}
await self.send_to_cloud(summary)
self.buffer.clear()
async def send_to_cloud(self, data):
try:
msg = Message(json.dumps(data))
await self.client.send_message_to_output(msg, "telemetryOutput")
except Exception as e:
# Network error, keep in buffer
log.error(f"Failed to send to cloud: {e}")
Bandwidth savings:
Without edge aggregation:
100 sensors × 1 sample/sec × 60 sec × 100 bytes = 600 KB/min → 864 MB/day
With edge aggregation:
100 sensors × 1 summary/5min × 200 bytes = 20 KB/5min → 5.76 MB/day
Savings: ~99.3% bandwidth reduction
Edge-Cloud Sync
Azure IoT Edge (deployment.json):
{
"modulesContent": {
"$edgeAgent": {
"properties.desired": {
"modules": {
"telemetryProcessor": {
"version": "1.0",
"type": "docker",
"status": "running",
"restartPolicy": "always",
"settings": {
"image": "acr.io/telemetry-processor:latest",
"createOptions": {
"HostConfig": {
"Binds": ["/data:/data"]
}
}
}
},
"localDatabase": {
"version": "1.0",
"type": "docker",
"status": "running",
"settings": {
"image": "timescale/timescaledb:latest"
}
}
}
}
},
"$edgeHub": {
"properties.desired": {
"routes": {
"sensorToProcessor": "FROM /messages/modules/sensorModule/* INTO BrokeredEndpoint(\"/modules/telemetryProcessor/inputs/sensorInput\")",
"processorToCloud": "FROM /messages/modules/telemetryProcessor/outputs/telemetryOutput INTO $upstream"
},
"storeAndForwardConfiguration": {
"timeToLiveSecs": 604800
}
}
}
}
}
Offline-first design:
package edge
import (
"database/sql"
_ "github.com/mattn/go-sqlite3"
)
type OfflineBuffer struct {
db *sql.DB
}
func NewOfflineBuffer() *OfflineBuffer {
db, _ := sql.Open("sqlite3", "/data/offline_buffer.db")
// Create table for buffered messages
db.Exec(`CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp INTEGER,
payload BLOB,
retry_count INTEGER DEFAULT 0
)`)
return &OfflineBuffer{db: db}
}
func (b *OfflineBuffer) Store(payload []byte) error {
_, err := b.db.Exec(
"INSERT INTO messages (timestamp, payload) VALUES (?, ?)",
time.Now().Unix(), payload,
)
return err
}
func (b *OfflineBuffer) Flush(cloudClient CloudClient) error {
rows, err := b.db.Query(
"SELECT id, payload FROM messages ORDER BY timestamp LIMIT 100",
)
if err != nil {
return err
}
defer rows.Close()
var successIDs []int64
for rows.Next() {
var id int64
var payload []byte
rows.Scan(&id, &payload)
err := cloudClient.Send(payload)
if err == nil {
successIDs = append(successIDs, id)
} else {
// Increment retry count
b.db.Exec("UPDATE messages SET retry_count = retry_count + 1 WHERE id = ?", id)
}
}
// Delete successfully sent messages
if len(successIDs) > 0 {
placeholders := strings.Repeat("?,", len(successIDs)-1) + "?"
query := fmt.Sprintf("DELETE FROM messages WHERE id IN (%s)", placeholders)
args := make([]interface{}, len(successIDs))
for i, id := range successIDs {
args[i] = id
}
b.db.Exec(query, args...)
}
return nil
}
Device Twins / Shadows
Concept: Server-side representation of device state, accessible even when device offline.
Device (actual state) Cloud (desired state)
───────────────────── ─────────────────────
{ {
"firmware": "1.2.0", "firmware": "1.3.0", ← Cloud wants upgrade
"config": { "config": {
"interval": 60 "interval": 30 ← Cloud wants faster reports
} }
} }
Device syncs:
1. Receives desired state from cloud
2. Applies changes (upgrade firmware, update config)
3. Reports new actual state
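The delta the cloud sends in step 1 is just the fields where desired and reported disagree. A minimal sketch of computing it, mirroring the firmware/config example above:

```python
def compute_delta(desired: dict, reported: dict) -> dict:
    """Return the fields where desired differs from reported, recursing into
    nested dicts. This is the 'delta' document a shadow service publishes."""
    delta = {}
    for key, want in desired.items():
        have = reported.get(key)
        if isinstance(want, dict) and isinstance(have, dict):
            sub = compute_delta(want, have)
            if sub:
                delta[key] = sub
        elif have != want:
            delta[key] = want
    return delta
```

When the device applies the delta and reports the new state, the next computed delta is empty and the shadow is in sync.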
AWS IoT Device Shadow
Shadow document structure:
{
"state": {
"desired": {
"temperature_threshold": 25,
"reporting_interval": 30,
"led_brightness": 80
},
"reported": {
"temperature_threshold": 20,
"reporting_interval": 60,
"led_brightness": 50,
"firmware_version": "1.2.0",
"battery": 75
}
},
"metadata": {
"desired": {
"temperature_threshold": {
"timestamp": 1678901234
}
},
"reported": {
"battery": {
"timestamp": 1678901200
}
}
},
"version": 42,
"timestamp": 1678901234
}
Device-side implementation (Python):
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTShadowClient
class DeviceController:
def __init__(self, thing_name):
self.shadow_client = AWSIoTMQTTShadowClient("device123")
self.shadow_client.configureEndpoint("abc123.iot.us-west-2.amazonaws.com", 8883)
self.shadow_client.configureCredentials(
"root-ca.pem", "private.key", "certificate.pem"
)
self.shadow_client.connect()
self.device_shadow = self.shadow_client.createShadowHandlerWithName(thing_name, True)
# Listen for desired state changes
self.device_shadow.shadowRegisterDeltaCallback(self.on_delta)
# Initial sync
self.device_shadow.shadowGet(self.on_shadow, 5)
def on_delta(self, payload, responseStatus, token):
"""Called when cloud updates desired state"""
delta = json.loads(payload)
print(f"Received delta: {delta}")
        # Apply changes and report back only what was actually applied
        applied = {}
        if "temperature_threshold" in delta["state"]:
            applied["temperature_threshold"] = delta["state"]["temperature_threshold"]
            self.update_config("temp_threshold", applied["temperature_threshold"])
        if "reporting_interval" in delta["state"]:
            applied["reporting_interval"] = delta["state"]["reporting_interval"]
            self.update_config("interval", applied["reporting_interval"])
        # Report back new state (guard against a delta containing neither key)
        if applied:
            self.report_state(applied)
def report_state(self, state):
"""Report current device state to cloud"""
payload = {
"state": {
"reported": state
}
}
self.device_shadow.shadowUpdate(json.dumps(payload), self.on_shadow_update, 5)
def on_shadow_update(self, payload, responseStatus, token):
if responseStatus == "accepted":
print("Shadow updated successfully")
else:
print(f"Shadow update failed: {responseStatus}")
Cloud-side: Update desired state (Go):
package shadow

import (
    "encoding/json"
    "fmt"
    "sync"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/iotdataplane"
)

type ShadowService struct {
    iot      *iotdataplane.IoTDataPlane
    registry DeviceRegistry // device metadata lookup (interface defined elsewhere)
}

func (s *ShadowService) UpdateDesiredState(thingName string, desired map[string]interface{}) error {
    payload := map[string]interface{}{
        "state": map[string]interface{}{
            "desired": desired,
        },
    }
    jsonPayload, err := json.Marshal(payload)
    if err != nil {
        return err
    }
    _, err = s.iot.UpdateThingShadow(&iotdataplane.UpdateThingShadowInput{
        ThingName: aws.String(thingName),
        Payload:   jsonPayload,
    })
    return err
}

// Example: Remotely configure all devices in a building
func (s *ShadowService) ConfigureBuilding(buildingID string, config map[string]interface{}) error {
    // Get all devices in building
    devices, err := s.registry.GetDevicesByBuilding(buildingID)
    if err != nil {
        return err
    }
    // Update shadows in parallel
    var wg sync.WaitGroup
    errCh := make(chan error, len(devices))
    for _, device := range devices {
        wg.Add(1)
        go func(deviceID string) {
            defer wg.Done()
            if err := s.UpdateDesiredState(deviceID, config); err != nil {
                errCh <- err
            }
        }(device.ID)
    }
    wg.Wait()
    close(errCh)
    // Count failures
    failed := 0
    for range errCh {
        failed++
    }
    if failed > 0 {
        return fmt.Errorf("failed to update %d devices", failed)
    }
    return nil
}
Conflict Resolution
Problem: Device offline for 1 hour, during which cloud updates desired state 10 times. Device comes back online.
Strategy 1: Last-write-wins (LWW)
def resolve_conflict(reported, desired, metadata):
    # Compare timestamps (simplified: real shadows track timestamps per key)
    if metadata["reported"]["timestamp"] > metadata["desired"]["timestamp"]:
        # Device state is newer, keep reported
        return reported
    else:
        # Cloud state is newer, apply desired
        return desired
Strategy 2: Merge (for non-conflicting keys)
def merge_states(reported, desired):
    merged = reported.copy()
    for key, value in desired.items():
        if key not in reported:
            # New key from cloud, apply it
            merged[key] = value
        elif reported[key] != value:
            # Conflict, cloud wins
            merged[key] = value
    return merged
Strategy 3: Vector clocks (advanced)
type VectorClock map[string]int

func (vc VectorClock) HappensBefore(other VectorClock) bool {
    strictlyLess := false
    for node, timestamp := range vc {
        if timestamp > other[node] {
            return false // Not happened-before
        }
        if timestamp < other[node] {
            strictlyLess = true
        }
    }
    // Keys present only in `other` also make vc strictly earlier
    for node, timestamp := range other {
        if _, seen := vc[node]; !seen && timestamp > 0 {
            strictlyLess = true
        }
    }
    return strictlyLess
}

// Usage in shadow
type DeviceShadow struct {
    State       map[string]interface{}
    VectorClock VectorClock
}

func (s *DeviceShadow) Merge(other *DeviceShadow) {
    if s.VectorClock.HappensBefore(other.VectorClock) {
        // Other is newer, replace
        s.State = other.State
        s.VectorClock = other.VectorClock
    } else if other.VectorClock.HappensBefore(s.VectorClock) {
        // Self is newer, keep current
        return
    } else {
        // Concurrent updates, manual merge required
        s.State = mergeConflict(s.State, other.State)
        s.VectorClock = mergeVectorClocks(s.VectorClock, other.VectorClock)
    }
}
Security
Device Authentication
Certificate-based (X.509) - industry standard:
┌──────────────┐
│ Root CA │ (offline, stored in HSM)
└──────┬───────┘
│
┌──────▼───────┐
│ Intermediate │ (signs device certs)
│ CA │
└──────┬───────┘
│
├──────────┬──────────┬──────────┐
▼ ▼ ▼ ▼
[Device A] [Device B] [Device C] [Device D]
(unique (unique (unique (unique
cert) cert) cert) cert)
Certificate generation (offline, during manufacturing):
#!/bin/bash
# Generate device certificate
DEVICE_ID=$1
# Generate private key (stays on device, never transmitted)
openssl genrsa -out ${DEVICE_ID}.key 2048
# Generate CSR (Certificate Signing Request)
openssl req -new -key ${DEVICE_ID}.key -out ${DEVICE_ID}.csr \
-subj "/CN=${DEVICE_ID}/O=MyCompany/C=US"
# Sign with intermediate CA
openssl x509 -req -in ${DEVICE_ID}.csr \
-CA intermediate-ca.crt -CAkey intermediate-ca.key \
-CAcreateserial -out ${DEVICE_ID}.crt \
-days 3650 -sha256
# Embed cert + root CA in device firmware
cat ${DEVICE_ID}.crt intermediate-ca.crt root-ca.crt > ${DEVICE_ID}-chain.pem
Device-side TLS connection (C):
#include <stdio.h>
#include <stdint.h>
#include <mbedtls/ssl.h>
#include <mbedtls/net_sockets.h>
#include <mbedtls/entropy.h>
#include <mbedtls/ctr_drbg.h>

int connect_to_broker(void) {
    mbedtls_ssl_context ssl;
    mbedtls_ssl_config conf;
    mbedtls_x509_crt cacert, clicert;
    mbedtls_pk_context pkey;
    mbedtls_entropy_context entropy;
    mbedtls_ctr_drbg_context ctr_drbg;
    mbedtls_net_context server_fd;

    // Initialize all contexts before use
    mbedtls_ssl_init(&ssl);
    mbedtls_ssl_config_init(&conf);
    mbedtls_x509_crt_init(&cacert);
    mbedtls_x509_crt_init(&clicert);
    mbedtls_pk_init(&pkey);
    mbedtls_entropy_init(&entropy);
    mbedtls_ctr_drbg_init(&ctr_drbg);
    mbedtls_net_init(&server_fd);

    // Seed the RNG (required by the TLS handshake)
    mbedtls_ctr_drbg_seed(&ctr_drbg, mbedtls_entropy_func, &entropy, NULL, 0);

    // Load device certificate and private key
    mbedtls_x509_crt_parse_file(&clicert, "/certs/device.crt");
    mbedtls_pk_parse_keyfile(&pkey, "/certs/device.key", NULL);
    // Load root CA
    mbedtls_x509_crt_parse_file(&cacert, "/certs/root-ca.crt");

    // Configure SSL
    mbedtls_ssl_config_defaults(&conf,
        MBEDTLS_SSL_IS_CLIENT,
        MBEDTLS_SSL_TRANSPORT_STREAM,
        MBEDTLS_SSL_PRESET_DEFAULT);
    mbedtls_ssl_conf_rng(&conf, mbedtls_ctr_drbg_random, &ctr_drbg);
    mbedtls_ssl_conf_ca_chain(&conf, &cacert, NULL);
    mbedtls_ssl_conf_own_cert(&conf, &clicert, &pkey);
    // Verify server certificate
    mbedtls_ssl_conf_authmode(&conf, MBEDTLS_SSL_VERIFY_REQUIRED);

    // Connect TCP, then wire TLS over the socket
    mbedtls_net_connect(&server_fd, "mqtt.example.com", "8883", MBEDTLS_NET_PROTO_TCP);
    mbedtls_ssl_setup(&ssl, &conf);
    mbedtls_ssl_set_hostname(&ssl, "mqtt.example.com"); // enables SNI + hostname check
    mbedtls_ssl_set_bio(&ssl, &server_fd, mbedtls_net_send, mbedtls_net_recv, NULL);

    // TLS handshake
    int ret = mbedtls_ssl_handshake(&ssl);
    if (ret != 0) {
        printf("TLS handshake failed: -0x%x\n", (unsigned int)-ret);
        return -1;
    }
    // Verify peer certificate
    uint32_t flags = mbedtls_ssl_get_verify_result(&ssl);
    if (flags != 0) {
        printf("Certificate verification failed\n");
        return -1;
    }
    printf("TLS connection established\n");
    return 0;
}
Secure Boot
Purpose: Prevent malicious firmware from running.
┌────────────────────────────────────────┐
│ Secure Boot Chain │
├────────────────────────────────────────┤
│ 1. ROM Bootloader (immutable) │
│ - Verify Stage 2 bootloader sig │
│ - Public key burned in OTP fuses │
├────────────────────────────────────────┤
│ 2. Stage 2 Bootloader │
│ - Verify app firmware signature │
│ - Check rollback protection │
├────────────────────────────────────────┤
│ 3. Application Firmware │
│ - Run only if signature valid │
└────────────────────────────────────────┘
ESP32 secure boot (example):
// Bootloader verification (in Stage 2, simplified illustration)
#include "esp_secure_boot.h"
#include "esp_image_format.h"
#include "esp_partition.h"
#include "esp_log.h"

esp_err_t verify_and_boot(void) {
    const esp_partition_t *partition = esp_partition_find_first(
        ESP_PARTITION_TYPE_APP, ESP_PARTITION_SUBTYPE_ANY, NULL);
    // Verify app image signature (esp_image_verify takes a partition position)
    esp_partition_pos_t pos = { .offset = partition->address, .size = partition->size };
    esp_image_metadata_t metadata;
    esp_err_t err = esp_image_verify(ESP_IMAGE_VERIFY, &pos, &metadata);
    if (err != ESP_OK) {
        // Signature verification failed
        ESP_LOGE("boot", "Invalid firmware signature");
        // Try backup partition
        partition = esp_partition_find_first(
            ESP_PARTITION_TYPE_APP, ESP_PARTITION_SUBTYPE_APP_OTA_1, NULL);
        pos.offset = partition->address;
        pos.size = partition->size;
        err = esp_image_verify(ESP_IMAGE_VERIFY, &pos, &metadata);
        if (err != ESP_OK) {
            // Both partitions invalid: reboot and let the ROM bootloader retry
            esp_restart();
        }
    }
    // Check rollback version (prevent downgrade attacks)
    if (metadata.secure_version < get_secure_version()) {
        ESP_LOGE("boot", "Firmware version too old (rollback protection)");
        esp_restart();
    }
    // Hand off to the verified app (bootloader-internal step, shown schematically)
    esp_image_load(partition->address);
    return ESP_OK;
}
Encryption at Rest
Problem: Device stolen, attacker extracts flash memory.
Solution: Flash encryption (hardware-accelerated):
// Enable flash encryption (ESP32, schematic: real enablement is driven by
// eFuse configuration and the bootloader rather than a single runtime call)
esp_err_t enable_flash_encryption(void) {
    // Generate 256-bit AES key in eFuse (one-time programmable)
    esp_flash_encryption_init();
    // Encrypt all partitions
    const esp_partition_t *partition = esp_partition_find_first(
        ESP_PARTITION_TYPE_APP, ESP_PARTITION_SUBTYPE_ANY, NULL);
    esp_flash_encrypt_region(partition->address, partition->size);
    // Lock eFuse (key cannot be read out)
    esp_efuse_write_key(EFUSE_BLK_KEY0, ESP_EFUSE_KEY_PURPOSE_FLASH_ENCRYPTION);
    return ESP_OK;
}
// Data is automatically decrypted on read, encrypted on write
// Attacker reading raw flash gets only encrypted data
Cloud-side: Encryption at rest (AWS IoT):
// Store device credentials encrypted
package secrets

import (
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/dynamodb"
    "github.com/aws/aws-sdk-go/service/kms"
)

func storeDeviceSecret(deviceID, secret string) error {
    sess := session.Must(session.NewSession())
    // Encrypt with KMS
    kmsClient := kms.New(sess)
    encrypted, err := kmsClient.Encrypt(&kms.EncryptInput{
        KeyId:     aws.String("arn:aws:kms:us-west-2:123456789:key/abcd-1234"),
        Plaintext: []byte(secret),
        EncryptionContext: map[string]*string{
            "device_id": aws.String(deviceID),
        },
    })
    if err != nil {
        return err
    }
    // Store encrypted blob in DynamoDB
    dynamoDB := dynamodb.New(sess)
    _, err = dynamoDB.PutItem(&dynamodb.PutItemInput{
        TableName: aws.String("device_secrets"),
        Item: map[string]*dynamodb.AttributeValue{
            "device_id":        {S: aws.String(deviceID)},
            "encrypted_secret": {B: encrypted.CiphertextBlob},
        },
    })
    return err
}
Analytics & Anomaly Detection
Stream Processing for Real-time Alerts
Apache Flink pipeline:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Read from Kafka
FlinkKafkaConsumer<TelemetryEvent> consumer = new FlinkKafkaConsumer<>(
    "iot.telemetry",
    new TelemetrySchema(),
    properties
);
DataStream<TelemetryEvent> telemetry = env.addSource(consumer);

// Detect temperature anomalies
DataStream<Alert> alerts = telemetry
    .keyBy(event -> event.getDeviceId())
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .process(new AnomalyDetector());

// Send alerts
alerts.addSink(new AlertSink());
env.execute("IoT Anomaly Detection");
Anomaly detector (statistical approach):
public class AnomalyDetector extends ProcessWindowFunction<TelemetryEvent, Alert, String, TimeWindow> {
    @Override
    public void process(String deviceId, Context context, Iterable<TelemetryEvent> events, Collector<Alert> out) {
        List<Double> values = new ArrayList<>();
        for (TelemetryEvent event : events) {
            values.add(event.getValue());
        }
        // Calculate statistics
        double mean = calculateMean(values);
        double stdDev = calculateStdDev(values, mean);
        if (stdDev == 0) {
            return; // constant signal in this window, no outliers to flag
        }
        // Check for outliers (Z-score > 3)
        for (TelemetryEvent event : events) {
            double zScore = Math.abs((event.getValue() - mean) / stdDev);
            if (zScore > 3.0) {
                out.collect(new Alert(
                    deviceId,
                    event.getTimestamp(),
                    event.getValue(),
                    "Statistical anomaly detected (Z-score: " + zScore + ")"
                ));
            }
        }
    }
}
Machine Learning for Predictive Maintenance
Use case: Predict motor failure based on vibration patterns.
Training pipeline (Python + TensorFlow):
import tensorflow as tf
from tensorflow import keras
import pandas as pd
import numpy as np

# Load historical data (labeled: normal vs failure)
df = pd.read_csv('motor_vibration_data.csv')
# Columns: device_id, timestamp, vibration_x, vibration_y, vibration_z, temperature, label

# Feature engineering
def extract_features(window):
    """Extract features from a 1-minute window"""
    return {
        'mean_vibration': np.mean(window[['vibration_x', 'vibration_y', 'vibration_z']].values),
        'std_vibration': np.std(window[['vibration_x', 'vibration_y', 'vibration_z']].values),
        'max_vibration': np.max(window[['vibration_x', 'vibration_y', 'vibration_z']].values),
        'fft_peak': get_fft_peak_frequency(window['vibration_x'].values),
        'temperature': np.mean(window['temperature'].values),
    }

# Build LSTM model
model = keras.Sequential([
    keras.layers.LSTM(64, input_shape=(60, 5), return_sequences=True),  # 60 timesteps, 5 features
    keras.layers.Dropout(0.2),
    keras.layers.LSTM(32),
    keras.layers.Dropout(0.2),
    keras.layers.Dense(16, activation='relu'),
    keras.layers.Dense(1, activation='sigmoid')  # Failure probability
])
model.compile(optimizer='adam',
              loss='binary_crossentropy',
              metrics=['accuracy', keras.metrics.AUC()])

# Train
history = model.fit(X_train, y_train,
                    epochs=50,
                    batch_size=32,
                    validation_data=(X_val, y_val))

# Export model
model.save('motor_failure_predictor.h5')
Inference at edge (TensorFlow Lite for microcontrollers):
# Convert to TFLite
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_model = converter.convert()
# Deploy to edge device (C++)
# ...
Or inference in cloud (real-time):
import json

import numpy as np
import tensorflow as tf
from kafka import KafkaConsumer

model = tf.keras.models.load_model('motor_failure_predictor.h5')
consumer = KafkaConsumer('iot.telemetry', bootstrap_servers='localhost:9092')

# The LSTM expects 60 timesteps × 5 features; field names here are
# illustrative, adjust to your telemetry schema
FIELDS = ['vibration_x', 'vibration_y', 'vibration_z', 'temperature', 'rpm']
buffer = []

for message in consumer:
    telemetry = json.loads(message.value)
    # Buffering: wait for 60 samples (1 minute @ 1 Hz)
    buffer.append(telemetry)
    if len(buffer) < 60:
        continue
    # Shape the window to match the model input: (1, 60 timesteps, 5 features)
    X = np.array([[sample[f] for f in FIELDS] for sample in buffer]).reshape(1, 60, 5)
    # Predict
    failure_probability = model.predict(X)[0][0]
    if failure_probability > 0.8:
        # High risk, send alert
        send_alert(telemetry['device_id'], f"Predicted failure risk: {failure_probability:.2%}")
    buffer = buffer[1:]  # Sliding window
Scalability & Cost Optimization
Sharding Strategies
Problem: 10M devices; a single MQTT broker cannot handle that many connections.
Solution: shard by device ID.
┌─────────────────────────────────────────────────────┐
│ Load Balancer (HAProxy) │
│ - Read device_id from MQTT CONNECT packet │
│ - Hash(device_id) % num_shards → shard │
└────────────┬──────────────┬─────────────┬──────────┘
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Shard 1 │ │ Shard 2 │ │ Shard 3 │
│ (VerneMQ)│ │ (VerneMQ)│ │ (VerneMQ)│
└──────────┘ └──────────┘ └──────────┘
device_id device_id device_id
% 3 == 0 % 3 == 1 % 3 == 2
HAProxy config:
global
    log /dev/log local0
    maxconn 1000000

frontend mqtt_front
    bind *:1883
    mode tcp
    option tcplog
    # Wait for the MQTT CONNECT packet, then validate it
    tcp-request inspect-delay 5s
    tcp-request content accept if { req.payload(0,0),mqtt_is_valid }
    # Hash the client_id and pick a shard (HAProxy 2.4+ MQTT converters)
    use_backend shard_%[req.payload(0,0),mqtt_field_value(connect,client_identifier),djb2,mod(3)]

backend shard_0
    mode tcp
    balance leastconn
    server mqtt1 10.0.1.10:1883 check

backend shard_1
    mode tcp
    balance leastconn
    server mqtt2 10.0.1.11:1883 check

backend shard_2
    mode tcp
    balance leastconn
    server mqtt3 10.0.1.12:1883 check
Resharding (inevitable as you scale):
# Gradual resharding to minimize disruption
class ReshardController:
    def __init__(self, old_shards, new_shards):
        self.old_shards = old_shards
        self.new_shards = new_shards
        self.migration_percentage = 0

    def get_shard(self, device_id):
        # Deterministic cutover: a device migrates once and stays on the new
        # layout (random per-call routing would bounce sessions between shards).
        # hash() is illustrative; use a stable hash (e.g. crc32) in production.
        if (hash(device_id) % 100) < self.migration_percentage * 100:
            return hash(device_id) % self.new_shards
        return hash(device_id) % self.old_shards

    def increase_migration(self, percentage):
        """Gradually increase the share of devices on the new shards"""
        self.migration_percentage = min(1.0, percentage)
        # Update load balancer config
        self.update_haproxy_config()

# Migration plan:
# Day 1: 0% → new shards (deploy new shards)
# Day 2: 10% → new shards
# Day 3: 30% → new shards
# Day 4: 50% → new shards
# Day 5: 100% → new shards (decommission old shards)
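An alternative that avoids most of this churn is consistent hashing: place shards on a hash ring so that adding a shard remaps only ~1/N of devices instead of nearly all of them. A sketch (shard names are illustrative):

```python
import bisect
import hashlib

class ConsistentHashRing:
    """Hash ring: adding a shard remaps only ~1/N of the devices."""

    def __init__(self, shards, vnodes=100):
        # Each shard gets `vnodes` points on the ring for even distribution
        self.ring = sorted(
            (self._hash(f"{shard}:{v}"), shard)
            for shard in shards
            for v in range(vnodes)
        )

    @staticmethod
    def _hash(key):
        # md5 is deterministic across processes, unlike Python's hash()
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def get_shard(self, device_id):
        # Walk clockwise to the first ring point at or after the device's hash
        idx = bisect.bisect(self.ring, (self._hash(device_id),)) % len(self.ring)
        return self.ring[idx][1]
```

Going from 3 shards to 4 remaps roughly 25% of devices, versus ~75% for the `hash % N` scheme above.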
Cold Storage & Data Lifecycle
Tiered storage:
Hot Tier (0-7 days): InfluxDB (SSD) - Full-resolution
Warm Tier (7-90 days): S3 + Athena - 1h aggregates
Cold Tier (90d-5y): S3 Glacier - 1d aggregates
Archive (>5y): Delete or tape backup - Regulatory only
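The lifecycle above maps directly to a lookup by sample age — a small sketch of the rule an archival job would apply when deciding where a window of data belongs:

```python
def storage_tier(age_days):
    """Map a sample's age to its storage tier per the lifecycle above."""
    if age_days <= 7:
        return "hot"      # InfluxDB (SSD), full resolution
    if age_days <= 90:
        return "warm"     # S3 + Athena, 1h aggregates
    if age_days <= 5 * 365:
        return "cold"     # S3 Glacier, 1d aggregates
    return "archive"      # delete or tape backup, regulatory only
```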
Auto-archival job:
package archival

import (
    "context"
    "fmt"
    "os"
    "time"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/s3"
    influxdb2 "github.com/influxdata/influxdb-client-go/v2"
)

type ArchivalJob struct {
    influx influxdb2.Client
    s3     *s3.S3
}

func (j *ArchivalJob) ArchiveOldData() error {
    // Query data older than 7 days, downsampled to 1h means
    query := `
        from(bucket: "iot_telemetry")
          |> range(start: -14d, stop: -7d)
          |> aggregateWindow(every: 1h, fn: mean)
    `
    result, err := j.influx.QueryAPI("myorg").Query(context.Background(), query)
    if err != nil {
        return err
    }
    // Write to a Parquet file
    parquetFile := createParquetWriter("telemetry_archive.parquet")
    for result.Next() {
        parquetFile.Write(result.Record())
    }
    parquetFile.Close()
    // Upload to S3
    file, err := os.Open("telemetry_archive.parquet")
    if err != nil {
        return err
    }
    defer file.Close()
    _, err = j.s3.PutObject(&s3.PutObjectInput{
        Bucket: aws.String("iot-archive"),
        Key: aws.String(fmt.Sprintf("year=%d/month=%d/telemetry.parquet",
            time.Now().Year(), time.Now().Month())),
        Body:         file,
        StorageClass: aws.String("GLACIER"), // Cheap long-term storage
    })
    // Delete from InfluxDB (Flux has no delete(); use the Delete API)
    if err == nil {
        err = j.influx.DeleteAPI().DeleteWithName(
            context.Background(), "myorg", "iot_telemetry",
            time.Now().AddDate(0, 0, -14), time.Now().AddDate(0, 0, -7), "")
    }
    return err
}
Query archived data (Athena):
-- Create external table on S3
CREATE EXTERNAL TABLE telemetry_archive (
    device_id STRING,
    `timestamp` BIGINT,  -- reserved word, must be backquoted in DDL
    value DOUBLE
)
PARTITIONED BY (year INT, month INT)
STORED AS PARQUET
LOCATION 's3://iot-archive/';

-- Query (billed by data scanned, cheap if partitioned well)
SELECT device_id, AVG(value) AS avg_value
FROM telemetry_archive
WHERE year = 2025 AND month = 3
  AND device_id = 'sensor123'
GROUP BY device_id;
Cost Optimization Techniques
1. Protocol efficiency:
HTTP POST: ~500 bytes overhead (headers)
MQTT Publish: ~2 bytes overhead (minimal header)
1M devices × 1 msg/min × 500 bytes × 60 min × 24h × 30d = 21.6 TB/month (HTTP)
1M devices × 1 msg/min × 2 bytes × 60 min × 24h × 30d = 86.4 GB/month (MQTT)
Data transfer cost (AWS): $0.09/GB
HTTP: $1,944/month
MQTT: $7.78/month
Savings: 99.6%
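The arithmetic above, reproduced as a quick sanity check (rates are the figures assumed in this section):

```python
def monthly_transfer_gb(devices, bytes_per_msg, msgs_per_min=1, days=30):
    """Monthly egress volume in GB for periodic telemetry."""
    messages = devices * msgs_per_min * 60 * 24 * days
    return messages * bytes_per_msg / 1e9

http_gb = monthly_transfer_gb(1_000_000, 500)  # 21,600 GB = 21.6 TB
mqtt_gb = monthly_transfer_gb(1_000_000, 2)    # 86.4 GB

RATE = 0.09  # $/GB, the AWS egress figure used above
print(f"HTTP: ${http_gb * RATE:,.0f}/month")   # → HTTP: $1,944/month
print(f"MQTT: ${mqtt_gb * RATE:,.2f}/month")   # → MQTT: $7.78/month
```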
2. Compression:
import zlib
import json
import struct

# Raw telemetry
data = {
    "device_id": "sensor123",
    "timestamp": 1678901234,
    "temperature": 23.5,
    "humidity": 65.2,
    "pressure": 1013.25
}
raw = json.dumps(data).encode()
print(f"Raw: {len(raw)} bytes")  # ~120 bytes

# Compressed
compressed = zlib.compress(raw)
print(f"Compressed: {len(compressed)} bytes")  # ~80 bytes
print(f"Ratio: {len(compressed)/len(raw):.1%}")  # ~66.7%

# For time-series, binary formats are even better
binary = struct.pack('>Qfff',
                     data['timestamp'],
                     data['temperature'],
                     data['humidity'],
                     data['pressure'])
print(f"Binary: {len(binary)} bytes")  # 20 bytes → 83% reduction
3. Batching:
# Instead of sending 60 individual messages (1/min for 1 hour)
# Send 1 batch message every hour
# Individual messages
for i in range(60):
    client.publish(topic, json.dumps({"value": values[i]}))
# Cost: 60 × $0.000001/msg = $0.00006

# Batched
batch = {"values": values, "interval": 60}
client.publish(topic, json.dumps(batch))
# Cost: 1 × $0.000001/msg = $0.000001
# Savings: 98.3%
4. Sampling:
# Adaptive sampling: send more data when changing, less when stable
class AdaptiveSampler:
    def __init__(self, threshold=0.5):
        self.last_value = None
        self.threshold = threshold

    def should_send(self, value):
        if self.last_value is None:
            self.last_value = value
            return True
        # Send if change > threshold
        if abs(value - self.last_value) > self.threshold:
            self.last_value = value
            return True
        return False

# Example: Temperature stable at 23°C for 1 hour
sampler = AdaptiveSampler(threshold=0.5)
for temp in [23.0, 23.1, 23.0, 23.2, ..., 23.1]:  # 60 readings
    if sampler.should_send(temp):
        client.publish("temp", str(temp))
# Result: Only 2-3 messages sent (when temp changes > 0.5°C)
# Instead of 60 messages
# Savings: 95%
Interview Questions
Q1: Design an IoT platform for 10 million smart thermostats
Requirements:
- 10M devices, each sends temperature every 60 seconds
- Users can view current temp and historical graphs
- Users can set target temperature remotely
- 99.9% uptime
Calculation:
Traffic:
- Telemetry: 10M devices × 1 msg/60s = 166k msg/sec
- Commands: ~1k/sec (users adjusting temp)
- Queries: ~10k/sec (users viewing dashboards)
Data:
- Per message: 50 bytes (device_id, timestamp, temp)
- Per day: 166,667 msg/sec × 50 bytes × 86,400 sec ≈ 720 GB/day
- Per year: ≈ 263 TB/year
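A quick script to double-check the envelope math:

```python
devices = 10_000_000
msg_rate = devices / 60             # one message per device every 60 s
bytes_per_msg = 50

per_day_gb = msg_rate * bytes_per_msg * 86_400 / 1e9
per_year_tb = per_day_gb * 365 / 1000

print(f"{msg_rate:,.0f} msg/sec")    # → 166,667 msg/sec
print(f"{per_day_gb:,.0f} GB/day")   # → 720 GB/day
print(f"{per_year_tb:,.0f} TB/year") # → 263 TB/year
```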
Architecture:
┌────────────────────────────────────────────────────────────┐
│ Devices (10M) │
└─────────────────────┬──────────────────────────────────────┘
│ MQTT/TLS
▼
┌─────────────────────────────────────────────────────────────┐
│ MQTT Broker Cluster (VerneMQ, 100 nodes, sharded) │
│ - 100k connections/node │
│ - QoS 0 for telemetry (fire-and-forget) │
└─────────────────────┬───────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Kafka (50 partitions, 3x replication) │
│ - Topic: telemetry (retention: 7 days) │
│ - Topic: commands (retention: 30 days) │
└──────┬──────────────────────┬───────────────────────────────┘
│ │
▼ ▼
┌─────────────┐ ┌────────────────────────┐
│ Stream │ │ Device Shadow Service│
│ Processor │ │ (Redis, device state)│
│ (Flink) │ └───────────┬────────────┘
│ - Anomaly │ │
│ - Alerts │ │
└──────┬──────┘ │
│ │
▼ ▼
┌────────────────┐ ┌─────────────────┐
│ InfluxDB │ │ PostgreSQL │
│ (time-series) │ │ (metadata, │
│ - Raw: 7d │ │ users, auth) │
│ - 1h: 90d │ └─────────────────┘
│ - 1d: 5y │
└────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ API Gateway (GraphQL/REST) │
└─────────────────────────────────────────────────────────────┘
│
▼
[Web/Mobile App]
Key decisions:
- MQTT QoS 0: temperature readings; one lost sample is not critical
- Sharding: 100 MQTT brokers × 100k connections = 10M
- Kafka: decouples ingestion from processing, replayable
- Redis for shadows: fast reads (current temp), TTL for offline devices
- InfluxDB downsampling: raw data is expensive, downsample after 7 days
Bottlenecks & scaling:
- MQTT broker: horizontal scaling via sharding
- Kafka: increase partitions (50 → 100)
- InfluxDB: clustering (Enterprise) or migrate to TimescaleDB
- API: stateless, scale horizontally
Q2: How do you handle firmware OTA for 1M devices without bricking them?
Strategy:
1. Canary rollout (1% → 10% → 100%)
2. Health checks (post-update boot verification)
3. Automatic rollback (bootloader dual-partition)
4. Gradual rollout with kill switch
Implementation:
import time
from random import sample

class OTAController:
    def rollout_firmware(self, version, target_devices):
        # Phase 1: Canary (1%, 10,000 devices)
        canary_devices = sample(target_devices, 10000)
        self.update_devices(canary_devices, version)
        # Monitor for 24 hours
        time.sleep(86400)
        # Check failure rate
        failures = self.get_failure_count(version)
        failure_rate = failures / 10000
        if failure_rate > 0.05:  # 5% failure threshold
            # Abort rollout
            self.pause_rollout(version)
            self.alert_oncall(f"High failure rate: {failure_rate:.1%}")
            return

        # Phase 2: Staged (10%, 100k devices, excluding the canary)
        staged_devices = sample(list(set(target_devices) - set(canary_devices)), 100000)
        self.update_devices(staged_devices, version)
        time.sleep(43200)  # Monitor for 12 hours
        failures = self.get_failure_count(version)
        failure_rate = failures / 100000
        if failure_rate > 0.02:
            self.pause_rollout(version)
            return

        # Phase 3: General availability (remaining 890k devices)
        # Gradual rollout over 7 days to avoid a thundering herd
        remaining = list(set(target_devices) - set(canary_devices) - set(staged_devices))
        for day in range(7):
            batch = remaining[day * 127143 : (day + 1) * 127143]
            self.update_devices(batch, version)
            time.sleep(86400)
Device-side health check:
#include "esp_ota_ops.h"
#include "esp_log.h"

void app_main(void) {
    // Check if this is the first boot after an OTA update
    esp_ota_img_states_t state;
    const esp_partition_t *running = esp_ota_get_running_partition();
    esp_ota_get_state_partition(running, &state);
    if (state == ESP_OTA_IMG_PENDING_VERIFY) {
        // Perform health checks
        bool wifi_ok = test_wifi_connection();
        bool mqtt_ok = test_mqtt_connection();
        bool sensors_ok = test_all_sensors();
        if (wifi_ok && mqtt_ok && sensors_ok) {
            // Mark firmware as valid
            esp_ota_mark_app_valid_cancel_rollback();
            report_ota_success();
        } else {
            // Health check failed, reboot to roll back
            ESP_LOGE("OTA", "Health check failed, rolling back");
            esp_restart(); // Bootloader will boot the previous partition
        }
    }
    // Normal operation...
}
Q3: Optimize cost: 1M devices sending 1 msg/min costs $50k/month. Reduce to $5k.
Current cost breakdown:
Assumptions:
- 1M devices × 60 msg/hour × 24h = 1.44B msg/day
- MQTT broker: AWS IoT Core = $1 per 1M messages
- Data transfer: $0.09/GB
- Storage: InfluxDB Cloud = $0.50/GB-month
Costs:
- Messages: 1.44B msg/day × 30d × $1/1M = $43,200/month
- Data transfer: 1.44B msg/day × 100 bytes × 30d = 4.32 TB × $0.09/GB ≈ $389/month
- Storage: 4.32 TB × $0.50/GB = $2,160/month
Total: ≈ $45,750/month (≈ $50k)
Optimization 1: Edge aggregation (reduce messages by 60x)
# Instead of 60 individual messages/hour, send 1 aggregate
import json

class EdgeAggregator:
    def __init__(self):
        self.buffer = []

    def add_reading(self, value):
        self.buffer.append(value)
        if len(self.buffer) >= 60:  # 1 hour of data
            self.send_aggregate()

    def send_aggregate(self):
        msg = {
            "count": len(self.buffer),
            "mean": sum(self.buffer) / len(self.buffer),
            "min": min(self.buffer),
            "max": max(self.buffer)
        }
        client.publish("telemetry", json.dumps(msg))
        self.buffer = []

# New cost:
# Messages: 1.44B / 60 = 24M msg/day × 30d × $1/1M = $720/month
# Savings: $42,480/month
Optimization 2: Compression (reduce data transfer by 70%)
import zlib
import json

compressed_msg = zlib.compress(json.dumps(msg).encode())
# New cost:
# Data transfer: 4.32 TB × 0.3 × $0.09/GB ≈ $117/month
# Savings: ≈ $272/month
Optimization 3: Self-hosted MQTT broker (eliminate per-message cost)
Deploy VerneMQ on EC2:
- 10× c5.2xlarge instances ($0.34/hour × 10 × 730h = $2,482/month)
- Handle 1M concurrent connections
- Zero per-message cost
Savings: $43,200 - $2,482 = $40,718/month
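The breakeven between managed per-message pricing and self-hosting can be computed directly (instance count and prices as assumed above):

```python
ec2_monthly = 0.34 * 10 * 730   # 10× c5.2xlarge, $/month ≈ $2,482
per_million = 1.0               # managed broker price, $ per 1M messages

# Above this monthly volume, the flat EC2 bill is cheaper than per-message fees
breakeven_msgs = ec2_monthly / per_million * 1_000_000
print(f"Breakeven: {breakeven_msgs / 1e9:.1f}B msg/month")  # ≈ 2.5B; we send 43.2B
```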
Optimization 4: Downsampling & cold storage
-- Keep raw data for 7 days only
CREATE RETENTION POLICY "raw" ON "iot_db" DURATION 7d REPLICATION 1 DEFAULT
-- 1h aggregates for 90 days (60x size reduction)
-- 1d aggregates for 5 years (1440x size reduction)
-- New storage:
Raw (7d): 4.32 TB / 30 × 7 = 1.01 TB
1h (90d): 1.01 TB / 60 × 90/7 = 0.22 TB
1d (5y): 0.22 TB / 24 × 1825/90 = 0.19 TB
Total: 1.42 TB
Cost: 1.42 TB × $0.50 = $710/month
Savings: $1,450/month
Total optimized cost:
Messages: $720/month (edge aggregation)
Data transfer: ≈$117/month (compression)
Broker: $2,482/month (self-hosted)
Storage: $710/month (downsampling)
────────────────────────────────────
Total: ≈ $4,030/month
Original: ≈ $45,750/month
Optimized: ≈ $4,030/month
Savings: ≈ 91% (≈ $41,700/month)
Summary
Key points to remember:
- Scale = sharding: you cannot scale vertically; shard MQTT brokers and databases
- Constrained devices: battery, bandwidth, memory → optimize protocols (MQTT > HTTP), compression, edge processing
- Reliability: QoS levels, device shadows, offline buffering, dual-partition OTA
- Security: certificate-based auth, TLS, secure boot, flash encryption
- Cost: edge aggregation, compression, batching, downsampling, cold storage
- Time-series DB: InfluxDB/TimescaleDB are specialized for IoT workloads
- Real-time: stream processing (Flink/Kafka) for alerts and analytics
- Edge computing: reduces bandwidth (99%), lowers latency, offline-capable
Common pitfalls:
- Cardinality explosion: using high-cardinality values as tags in InfluxDB
- Thundering herd: all devices reconnecting at once after a network outage
- No rollback: OTA updates without a fallback partition brick devices
- Stateful brokers: connection state is lost when a broker restarts
- Ignoring offline: devices will go offline; offline buffering is mandatory
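The thundering-herd pitfall above is typically mitigated on the device side with jittered exponential backoff, so a million devices spread their reconnects over minutes instead of hitting the brokers in the same second. A minimal "full jitter" sketch:

```python
import random

def reconnect_delay(attempt, base=1.0, cap=300.0):
    """Full-jitter backoff: uniform in [0, min(cap, base * 2^attempt)] seconds."""
    return random.uniform(0, min(cap, base * (2 ** attempt)))

# attempt 0 → up to 1 s, attempt 5 → up to 32 s, attempt 9+ → capped at 300 s
delays = [reconnect_delay(a) for a in range(10)]
```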
Trade-offs to discuss in interviews:
| Decision | Option A | Option B | When to use A | When to use B |
|---|---|---|---|---|
| Protocol | MQTT | HTTP | Constrained devices, constant connection | Web-friendly, firewall traversal |
| QoS | 0 (at most once) | 1 (at least once) | Telemetry (lossy OK) | Commands (must arrive) |
| Storage | InfluxDB | TimescaleDB | Pure time-series | Need relational joins |
| Processing | Edge | Cloud | Low latency, bandwidth limited | Complex analytics, ML |
| Sharding | By device ID | By geography | Even distribution | Data locality |
References
Books:
- "Designing Data-Intensive Applications" - Martin Kleppmann (Chapter 11: Stream Processing)
- "Building the Internet of Things" - Maciej Kranz
- "IoT Inc" - Bruce Sinclair
Open-source projects:
- VerneMQ - Scalable MQTT broker
- Eclipse Mosquitto - Lightweight MQTT broker
- Apache IoTDB - Time-series database for IoT
Blogs:
- HiveMQ Blog - MQTT deep dives
- InfluxData Blog - Time-series best practices
- AWS IoT Blog
Communities:
- MQTT Community
- r/IOT - Reddit IoT community
- IoT Stack Exchange