cursus

Server System

Purpose and Scope

This document describes the Server System in cursus, which is responsible for accepting TCP connections, managing concurrent client sessions, and routing commands to the appropriate subsystems.

The server implements a bounded worker pool pattern to handle connections efficiently and exposes multiple network ports for different purposes.

Server Architecture Overview

The server system consists of three primary components:

Component Port Purpose Protocol
Main Broker Server 9000 (default) Message publishing, topic management, consumption TCP with length-prefixed framing
Health Check Server 9080 (default) Readiness/liveness probes HTTP
Metrics Exporter 9100 (default) Prometheus metrics HTTP

The main broker server is implemented in RunServer and uses a fixed-size worker pool to handle connections concurrently. Each connection is processed by HandleConnection, which handles the message protocol, command detection, and routing.

Server Components

Component Description
Metrics Server If enabled, starts Prometheus exporter on cfg.ExporterPort.
TCP Listener Creates TLS or plain TCP listener on cfg.BrokerPort.
Readiness Flag Sets brokerReady to true to signal the health endpoint.
Health Check Server Starts HTTP server for /health endpoint.
Worker Pool Creates worker channel and spawns goroutines.
Accept Loop Accepts connections and dispatches them to workers.

Server Initialization Flow

Key Constants and Variables:

Worker Pool Pattern

The server uses a bounded worker pool pattern to limit concurrent connections and prevent resource exhaustion.

Implementation Details:

Benefits:

Connection Handling Protocol

The HandleConnection function processes a single client connection through multiple request-response cycles.

Message Protocol Format:

Each message uses length-prefixed framing:

[4-byte length (big-endian)][message data]

Message Reading and Parsing

Length-Prefixed Protocol

The server uses a 4-byte big-endian length prefix for all messages:

Message vs Command Detection

The isCommand function checks whether the incoming payload begins with a supported command keyword.

Command Purpose
CREATE Create a new topic
DELETE Delete a topic
LIST List all topics
SUBSCRIBE Subscribe to a topic
PUBLISH Publish a message (explicit)
CONSUME Consume messages from disk
HELP Show help message

If the payload does not start with a recognized command, it is treated as a regular message to be published to the target topic.

Command Routing and Response Handling

Response Types

Streaming Mode

When HandleCommand returns STREAM_DATA_SIGNAL:

Response Writing

The writeResponse function implements the same length-prefixed protocol:

respLen := make([]byte, 4)
binary.BigEndian.PutUint32(respLen, uint32(len(resp)))
conn.Write(respLen)      // Write 4-byte length
conn.Write([]byte(resp)) // Write response data

This ensures clients can read responses using the same protocol as sending requests.

Health Check System

The health check server runs on a separate HTTP port and provides endpoints for load balancers and orchestration systems.

Endpoints

Health Check Logic

The health handler in pkg/server/main.go checks the brokerReady atomic boolean.

Condition HTTP Status Response Body
brokerReady.Load() == false 503 Service Unavailable "Broker not ready: Main listener not active"
brokerReady.Load() == true 200 OK "OK"

The brokerReady flag is set to true immediately after the TCP listener starts successfully, ensuring that health checks pass only when the server can accept connections.

Integration with Other Systems

TopicManager Integration

The TopicManager is passed to each connection handler and is used for:

DiskManager Integration

The DiskManager is passed to CommandHandler and is used for:

CommandHandler Integration

Each connection creates its own CommandHandler instance:

cmdHandler := controller.NewCommandHandler(tm, dm)

And a ClientContext:

ctx := controller.NewClientContext("default-group", 0)

The CommandHandler routes commands to appropriate subsystems. See Command Processing for details.

Configuration Integration

The Config struct provides tunable parameters:

Parameter Default Purpose
BrokerPort 9000 Main TCP server port
HealthCheckPort 9080 HTTP health check port
UseTLS false Enable TLS encryption
EnableGzip false Enable message compression
EnableExporter true Enable Prometheus metrics
ExporterPort 9100 Metrics HTTP port

TLS and Compression Support

TLS Configuration

When cfg.UseTLS is true:

tlsConfig := &tls.Config{Certificates: []tls.Certificate{cfg.TLSCert}}
ln, err = tls.Listen("tcp", addr, tlsConfig)

The certificate is loaded during configuration initialization from cfg.TLSCertPath and cfg.TLSKeyPath.

Gzip Compression

When cfg.EnableGzip is true, messages are decompressed after reading:

data, err := DecompressMessage(msgBuf, enableGzip)

The DecompressMessage function handles optional gzip decompression.

Error Handling and Logging

The server uses structured logging for different event types:

Structured Logging

Log Prefix Purpose Example
[REQ] Incoming request [REQ] [127.0.0.1:54321] Received request. Topic: 'orders', Payload: 'CREATE orders 4'
[INPUT_WARN] Malformed input [INPUT_WARN] [127.0.0.1:54321] Received unrecognized input
[CONSUME_ERR] Consume streaming error [CONSUME_ERR] Error streaming data for command [CONSUME orders 0 0]
[STREAM] Streaming completion [STREAM] Completed streaming 42 messages

Connection Error Handling

Error Type Handling Code Location
EOF Silently close connection (normal client disconnect) pkg/server/main.go 97–100
Read errors Log warning and close connection pkg/server/main.go 98
Decompress errors Log warning and close connection pkg/server/main.go 112–114
Accept errors Log warning and continue accepting pkg/server/main.go 75–77

Summary

The Server System provides the network interface for cursus, implementing:

The server acts as the entry point for all client interactions, routing requests to the Topic Management System and Disk Persistence System as needed.