This document describes the Server System in cursus, which is responsible for accepting TCP connections, managing concurrent client sessions, and routing commands to the appropriate subsystems.
The server implements a bounded worker pool pattern to handle connections efficiently and exposes multiple network ports for different purposes.
The server system consists of three primary components:
| Component | Port | Purpose | Protocol |
|---|---|---|---|
| Main Broker Server | 9000 (default) | Message publishing, topic management, consumption | TCP with length-prefixed framing |
| Health Check Server | 9080 (default) | Readiness/liveness probes | HTTP |
| Metrics Exporter | 9100 (default) | Prometheus metrics | HTTP |
The main broker server is implemented in RunServer and uses a fixed-size worker pool to handle connections concurrently. Each connection is processed by HandleConnection, which handles the message protocol, command detection, and routing. Startup proceeds through the following steps:

| Step | Description |
|---|---|
| Metrics Server | If enabled, starts Prometheus exporter on cfg.ExporterPort. |
| TCP Listener | Creates TLS or plain TCP listener on cfg.BrokerPort. |
| Readiness Flag | Sets brokerReady to true to signal the health endpoint. |
| Health Check Server | Starts HTTP server for /health endpoint. |
| Worker Pool | Creates worker channel and spawns goroutines. |
| Accept Loop | Accepts connections and dispatches them to workers. |
Key constants and state:

- maxWorkers = 1000: Maximum number of concurrent connection handlers
- readDeadline = 5 * time.Minute: Connection read timeout
- DefaultHealthCheckPort = 9080: Default health check port
- brokerReady: Atomic boolean flag for readiness state

The server uses a bounded worker pool pattern to limit concurrent connections and prevent resource exhaustion.
- Worker channel: workerCh := make(chan net.Conn, maxWorkers)
- Worker spawning: a loop creates 1000 goroutines, each pulling connections from the channel

Benefits:

- Caps the number of concurrent connection handlers at maxWorkers
- Prevents unbounded goroutine growth and resource exhaustion under connection floods
- Provides backpressure: when all workers are busy and the channel buffer is full, the accept loop blocks instead of overcommitting

A minimal sketch of the pattern is shown below.
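The following is a self-contained sketch of this pattern, assuming port 9000 and an illustrative handleConn that stands in for the real HandleConnection; it is not the actual RunServer code:

```go
package main

import (
	"log"
	"net"
)

const maxWorkers = 1000 // matches the server's documented limit

// handleConn stands in for the broker's HandleConnection.
func handleConn(conn net.Conn) {
	defer conn.Close()
	// ... read length-prefixed messages, detect commands, route ...
}

func main() {
	ln, err := net.Listen("tcp", ":9000")
	if err != nil {
		log.Fatal(err)
	}

	// Bounded worker pool: a buffered channel plus a fixed set of goroutines.
	workerCh := make(chan net.Conn, maxWorkers)
	for i := 0; i < maxWorkers; i++ {
		go func() {
			for conn := range workerCh {
				handleConn(conn)
			}
		}()
	}

	// Accept loop: dispatch connections to the pool; log and continue on error.
	for {
		conn, err := ln.Accept()
		if err != nil {
			log.Printf("[WARN] accept error: %v", err)
			continue
		}
		workerCh <- conn // blocks when the pool is saturated, bounding load
	}
}
```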
The HandleConnection function processes a single client connection through multiple request-response cycles.
Each message uses length-prefixed framing:
```
[4-byte length (big-endian)][message data]
```

The server uses a 4-byte big-endian length prefix for all messages:

- Read: the 4-byte prefix is decoded with binary.BigEndian.Uint32
- Decode: the message body is parsed as topicName | payload format
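A hypothetical helper (readMessage is not a name from the codebase) showing how a framed request can be read with this scheme:

```go
package server

import (
	"encoding/binary"
	"fmt"
	"io"
	"net"
)

// readMessage reads one length-prefixed frame: a 4-byte big-endian
// length, followed by exactly that many payload bytes.
func readMessage(conn net.Conn) ([]byte, error) {
	lenBuf := make([]byte, 4)
	if _, err := io.ReadFull(conn, lenBuf); err != nil {
		return nil, err // io.EOF here indicates a clean client disconnect
	}
	msgLen := binary.BigEndian.Uint32(lenBuf)

	msgBuf := make([]byte, msgLen)
	if _, err := io.ReadFull(conn, msgBuf); err != nil {
		return nil, fmt.Errorf("short read: %w", err)
	}
	return msgBuf, nil
}
```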
The isCommand function checks whether the incoming payload begins with a supported command keyword.
| Command | Purpose |
|---|---|
| CREATE | Create a new topic |
| DELETE | Delete a topic |
| LIST | List all topics |
| SUBSCRIBE | Subscribe to a topic |
| PUBLISH | Publish a message (explicit) |
| CONSUME | Consume messages from disk |
| HELP | Show help message |
If the payload does not start with a recognized command, it is treated as a regular message to be published to the target topic.
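A sketch of how such a check might be implemented; the actual isCommand logic may differ, for example in case handling:

```go
package server

import "strings"

// commandKeywords mirrors the supported commands listed above.
var commandKeywords = []string{
	"CREATE", "DELETE", "LIST", "SUBSCRIBE", "PUBLISH", "CONSUME", "HELP",
}

// isCommandSketch treats the payload as a command only if its first
// whitespace-delimited token matches a supported keyword.
func isCommandSketch(payload string) bool {
	fields := strings.Fields(payload)
	if len(fields) == 0 {
		return false
	}
	first := strings.ToUpper(fields[0])
	for _, kw := range commandKeywords {
		if first == kw {
			return true
		}
	}
	return false
}
```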
The CONSUME command returns STREAM_DATA_SIGNAL, which triggers streaming mode. When HandleCommand returns STREAM_DATA_SIGNAL:

- The server calls HandleConsumeCommand(conn, cmdStr) to stream messages
- Streamed messages are written with util.WriteWithLength

The writeResponse function implements the same length-prefixed protocol:
```go
respLen := make([]byte, 4)
binary.BigEndian.PutUint32(respLen, uint32(len(resp)))
conn.Write(respLen)      // Write 4-byte length
conn.Write([]byte(resp)) // Write response data
```
This ensures clients can read responses using the same protocol as sending requests.
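For streamed messages, util.WriteWithLength presumably applies the same framing. A sketch of such a helper, not the actual util code:

```go
package server

import (
	"encoding/binary"
	"net"
)

// writeWithLength frames data the same way writeResponse does:
// a 4-byte big-endian length prefix followed by the payload.
func writeWithLength(conn net.Conn, data []byte) error {
	buf := make([]byte, 4+len(data))
	binary.BigEndian.PutUint32(buf[:4], uint32(len(data)))
	copy(buf[4:], data)
	_, err := conn.Write(buf) // single write keeps prefix and body together
	return err
}
```

Writing the prefix and body in a single Write call is a common refinement over two separate writes: it avoids interleaved output if a connection is ever shared between goroutines.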
The health check server runs on a separate HTTP port and provides endpoints for load balancers and orchestration systems.
- /health: Primary health check endpoint
- /: Root path also serves the health check (convenience)

The health handler in pkg/server/main.go checks the brokerReady atomic boolean.
| Condition | HTTP Status | Response Body |
|---|---|---|
| brokerReady.Load() == false | 503 Service Unavailable | "Broker not ready: Main listener not active" |
| brokerReady.Load() == true | 200 OK | "OK" |
The brokerReady flag is set to true immediately after the TCP listener starts successfully, ensuring that health checks pass only when the server can accept connections.
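A minimal sketch of this behavior; the status codes and response bodies come from the table above, while the net/http wiring here is an assumption:

```go
package main

import (
	"log"
	"net/http"
	"sync/atomic"
)

var brokerReady atomic.Bool // set to true once the TCP listener is up

// healthHandler returns 503 until the broker is ready, then 200 "OK".
func healthHandler(w http.ResponseWriter, r *http.Request) {
	if !brokerReady.Load() {
		http.Error(w, "Broker not ready: Main listener not active",
			http.StatusServiceUnavailable)
		return
	}
	w.Write([]byte("OK"))
}

func main() {
	http.HandleFunc("/health", healthHandler)
	http.HandleFunc("/", healthHandler) // root path serves the same check
	log.Fatal(http.ListenAndServe(":9080", nil))
}
```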
The TopicManager is passed to each connection handler and is used for:
- Publishing messages: tm.Publish(topicName, msg)
- Topic commands: create, delete, list, subscribe

The DiskManager is passed to the CommandHandler and is used for:

- CONSUME operations
- Managing per-topic-partition DiskHandlers

Each connection creates its own CommandHandler instance:
```go
cmdHandler := controller.NewCommandHandler(tm, dm)
```

And a ClientContext:

```go
ctx := controller.NewClientContext("default-group", 0)
```
The CommandHandler routes commands to appropriate subsystems. See Command Processing for details.
The Config struct provides tunable parameters:
| Parameter | Default | Purpose |
|---|---|---|
| BrokerPort | 9000 | Main TCP server port |
| HealthCheckPort | 9080 | HTTP health check port |
| UseTLS | false | Enable TLS encryption |
| EnableGzip | false | Enable message compression |
| EnableExporter | true | Enable Prometheus metrics |
| ExporterPort | 9100 | Metrics HTTP port |
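Based on this table and the TLS fields referenced below, the struct plausibly has a shape like the following sketch; exact field types in the codebase are assumptions:

```go
package config

import "crypto/tls"

// Config is a sketch inferred from the parameter table; field names
// and types in the actual codebase may differ.
type Config struct {
	BrokerPort      int             // main TCP server port (default 9000)
	HealthCheckPort int             // HTTP health check port (default 9080)
	UseTLS          bool            // enable TLS encryption
	TLSCertPath     string          // path to the TLS certificate file
	TLSKeyPath      string          // path to the TLS private key file
	TLSCert         tls.Certificate // certificate loaded at init time
	EnableGzip      bool            // enable message compression
	EnableExporter  bool            // enable Prometheus metrics (default true)
	ExporterPort    int             // metrics HTTP port (default 9100)
}
```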
When cfg.UseTLS is true:
```go
tlsConfig := &tls.Config{Certificates: []tls.Certificate{cfg.TLSCert}}
ln, err = tls.Listen("tcp", addr, tlsConfig)
```
The certificate is loaded during configuration initialization from cfg.TLSCertPath and cfg.TLSKeyPath.
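Continuing the Config sketch above, the loading step might use the standard library's key-pair loader; the actual initialization code may differ:

```go
package config

import (
	"crypto/tls"
	"fmt"
)

// loadTLSCert loads the certificate and key from the configured paths.
func loadTLSCert(cfg *Config) error {
	cert, err := tls.LoadX509KeyPair(cfg.TLSCertPath, cfg.TLSKeyPath)
	if err != nil {
		return fmt.Errorf("loading TLS key pair: %w", err)
	}
	cfg.TLSCert = cert
	return nil
}
```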
When cfg.EnableGzip is true, messages are decompressed after reading:
```go
data, err := DecompressMessage(msgBuf, enableGzip)
```
The DecompressMessage function handles optional gzip decompression.
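Its implementation is not shown here; a minimal sketch of such a helper, assuming the standard compress/gzip package, might look like this:

```go
package server

import (
	"bytes"
	"compress/gzip"
	"io"
)

// decompressMessage passes data through untouched unless gzip is enabled,
// in which case it inflates the payload.
func decompressMessage(data []byte, enableGzip bool) ([]byte, error) {
	if !enableGzip {
		return data, nil
	}
	zr, err := gzip.NewReader(bytes.NewReader(data))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	return io.ReadAll(zr)
}
```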
The server uses structured logging for different event types:
| Log Prefix | Purpose | Example |
|---|---|---|
| [REQ] | Incoming request | [REQ] [127.0.0.1:54321] Received request. Topic: 'orders', Payload: 'CREATE orders 4' |
| [INPUT_WARN] | Malformed input | [INPUT_WARN] [127.0.0.1:54321] Received unrecognized input |
| [CONSUME_ERR] | Consume streaming error | [CONSUME_ERR] Error streaming data for command [CONSUME orders 0 0] |
| [STREAM] | Streaming completion | [STREAM] Completed streaming 42 messages |
| Error Type | Handling | Code Location |
|---|---|---|
| EOF | Silently close connection (normal client disconnect) | pkg/server/main.go 97–100 |
| Read errors | Log warning and close connection | pkg/server/main.go 98 |
| Decompress errors | Log warning and close connection | pkg/server/main.go 112–114 |
| Accept errors | Log warning and continue accepting | pkg/server/main.go 75–77 |
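Tying the framing and error rules together, a per-connection read loop might look like the following sketch; the names are illustrative, not the broker's actual code:

```go
package server

import (
	"encoding/binary"
	"errors"
	"io"
	"log"
	"net"
)

// handleConnSketch applies the error-handling rules from the table above.
func handleConnSketch(conn net.Conn) {
	defer conn.Close()
	lenBuf := make([]byte, 4)
	for {
		if _, err := io.ReadFull(conn, lenBuf); err != nil {
			if errors.Is(err, io.EOF) {
				return // EOF: normal client disconnect, close silently
			}
			// Other read errors: log a warning, then close.
			log.Printf("[INPUT_WARN] [%s] read error: %v", conn.RemoteAddr(), err)
			return
		}
		msgBuf := make([]byte, binary.BigEndian.Uint32(lenBuf))
		if _, err := io.ReadFull(conn, msgBuf); err != nil {
			log.Printf("[INPUT_WARN] [%s] short read: %v", conn.RemoteAddr(), err)
			return
		}
		_ = msgBuf // ... decompress if enabled, detect command, route, respond ...
	}
}
```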
The Server System provides the network interface for cursus, implementing:

- A bounded worker pool for handling concurrent connections
- A length-prefixed TCP message protocol with optional TLS and gzip
- Command detection and routing through per-connection CommandHandler instances
- HTTP endpoints for health checks and Prometheus metrics
The server acts as the entry point for all client interactions, routing requests to the Topic Management System and Disk Persistence System as needed.