Architecting a High-Throughput Telemetry Pipeline in Go
When managing tens of thousands of distributed edge devices—such as retail self-checkout kiosks or industrial sensors—visibility into hardware health is mission-critical. However, standard REST/JSON architectures quickly become a bottleneck. The HTTP header overhead alone can exceed the actual telemetry payload by 10x, leading to bandwidth saturation and TCP connection exhaustion.
To solve this, I architected a distributed Telemetry Aggregator in Go. The system implements a Command Query Responsibility Segregation (CQRS) pattern, splitting the high-frequency write path from the low-latency read path.
Here is a technical breakdown of the ingestion, caching, and persistence layers.
1. The Ingestion Layer: WebSockets & Protobuf
To eliminate the overhead of establishing a new TCP handshake every five seconds, the edge agents stream metrics over persistent WebSockets. To further compress the network footprint, payloads are serialized using Protocol Buffers (Protobuf).
If a device loses power, the severed TCP socket triggers an immediate CloseHandler event, providing real-time hardware failure detection without relying on arbitrary polling timeouts.
2. Bounded Concurrency & Backpressure
A naive aggregator spawns a new Goroutine for every incoming packet. Under heavy load, this leads to unpredictable scheduling latency and database connection pool exhaustion.
To mitigate this, the WebSocket upgrader drops raw byte payloads into a buffered channel. A fixed pool of workers pulls from this channel. This establishes a strict backpressure mechanism. If the persistence layer slows down, the channel fills up, and the WebSocket server natively throttles ingestion to protect the system from cascading memory failures.
3. The Speed Layer: Atomic Lua & Redis ZSETs
To serve real-time dashboard queries, the system maintains a "Last 30 Minutes" sliding window in Redis. Updating this state normally requires fetching the array, updating it, and pushing it back—a race condition at high concurrency.
I bypassed global locks by pushing the eviction and appending logic directly to the Redis server via an atomic Lua script.
-- Execute atomically: Evict old records, add new heartbeat, and update the global registry
redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
redis.call('ZADD', key, now, payload)
redis.call('EXPIRE', key, window)
-- Maintain an O(1) active machine discovery index
redis.call('ZADD', registry_key, now, machine_id)
redis.call('ZREMRANGEBYSCORE', registry_key, 0, now - window)
By utilizing Sorted Sets (ZSET) where the score is the Unix timestamp, pruning expired data (ZREMRANGEBYSCORE) becomes a lightning-fast O(log(N)) operation.
4. Cold Storage: PostgreSQL Bulk Copy
While Redis handles the real-time speed layer, long-term historical data must be persisted to a relational database. Executing an INSERT statement for every 5-second heartbeat would obliterate PostgreSQL's IOPS limits.
Instead, the workers hold a local buffer of parsed Protobuf metrics. When the buffer fills, or a specific time.Ticker interval is reached, the worker flushes the data using the native PostgreSQL COPY protocol.
func flushBatch(ctx context.Context, dbPool *pgxpool.Pool, batch[]*DataPoint) {
_, err := dbPool.CopyFrom(
ctx,
pgx.Identifier{"telemetry"},[]string{"machine_id", "measured_at", "cpu_usage", "memory_usage", "disk_usage"},
pgx.CopyFromSlice(len(batch), func(i int) ([]any, error) {
return[]any{batch[i].MachineId, batch[i].MeasuredAt.AsTime(), batch[i].CpuUsage, batch[i].MemoryUsage, batch[i].DiskUsage}, nil
}),
)
}
CopyFrom streams raw binary data directly to the table, bypassing the SQL parsing engine entirely and maximizing write throughput.
5. The API Layer: Struct Receivers & Context
The read model is decoupled into a dedicated API server. To ensure the API remains unit-testable and memory-safe, all handlers are implemented using the Struct Receiver pattern rather than relying on global state.
func (s *APIServer) handleStatus(w http.ResponseWriter, r *http.Request) {
// Relying on r.Context() ensures Redis queries instantly abort
// if the client disconnects before the request completes.
val, err := s.cache.ZRangeArgs(r.Context(), redis.ZRangeArgs{
Key: "telemetry:machine:{" + machineId + "}",
Start: 0,
Stop: -1,
Rev: true,
}).Result()
// ... unmarshal binary payload
// Utilize protojson for strict type-mapping and default value inclusion
marshaller := protojson.MarshalOptions{ EmitUnpopulated: true, UseProtoNames: true }
bytes, _ := marshaller.Marshal(dataPoint)
w.Write(bytes)
}
Infrastructure and Lifecycle
The entire cluster is orchestrated via Docker Compose. Crucially, the database schema is managed via Flyway migrations attached to a service_completed_successfully dependency lock. This guarantees the Go aggregator will never attempt to boot until the schema has been deterministically verified.
Finally, all Go binaries are strictly wired to syscall.SIGTERM. By passing a root context.Context throughout the worker pool, the orchestrator can trigger a graceful drain of all in-flight buffers prior to container termination, guaranteeing zero data loss during deployments.