This document explains how messages flow through cursus from publication to consumption. It covers the wire protocol, message routing, deduplication, partition selection, and the dual-path distribution mechanism that ensures both durability and low latency.
For detailed information about specific aspects of message flow, see the detailed sections that follow.
All communication between clients and the broker uses a TCP-based length-prefixed protocol. Each message on the wire consists of a 4-byte big-endian length prefix followed by the message payload.
The server reads messages using this protocol in HandleConnection:
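Here is a minimal sketch of that two-stage read, assuming a net.Conn; the helper name readFrame is illustrative, not cursus's actual API:

```go
import (
	"encoding/binary"
	"io"
	"net"
)

// readFrame reads one length-prefixed message: a 4-byte big-endian
// length followed by exactly that many payload bytes.
func readFrame(conn net.Conn) ([]byte, error) {
	var lenBuf [4]byte
	if _, err := io.ReadFull(conn, lenBuf[:]); err != nil {
		return nil, err
	}
	payload := make([]byte, binary.BigEndian.Uint32(lenBuf[:]))
	if _, err := io.ReadFull(conn, payload); err != nil {
		return nil, err
	}
	return payload, nil
}
```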
Responses follow the same format using writeResponse, which writes a 4-byte length prefix followed by the response payload.
cursus implements automatic message deduplication to prevent duplicate processing. The TopicManager maintains a dedupMap (sync.Map) that tracks message IDs for 30 minutes.
The deduplication implementation is in TopicManager.Publish:
```go
msg.ID = util.GenerateID(msg.Payload)
// LoadOrStore records the first-seen timestamp; if the ID was already
// present, the message is treated as a duplicate.
dedupMap.LoadOrStore(msg.ID, now)
```
The cleanup loop runs every 60 seconds (configurable via CleanupInterval) and removes entries older than 30 minutes.
Messages are routed to partitions using one of two strategies, determined by the presence of a message key.
The implementation is in Topic.Publish():
- Key-based: `util.GenerateID(msg.Key)` to hash the key, then `idx = keyID % partitionCount`
- Keyless: `counter % partitionCount`, where `counter` is atomically incremented

Both strategies are protected by a mutex to ensure thread-safety during partition count access.
When a message is enqueued to a partition, it follows two parallel paths simultaneously: the in-memory path for low-latency delivery to active consumers, and the disk path for durability.
- The `flushLoop` goroutine batches messages (up to 500, or a 100ms timeout)
- `writeBatch` writes length-prefixed messages to disk with `writer.Flush()` and `file.Sync()`

Message consumption follows a different path than publication, reading messages directly from disk rather than from in-memory channels.
The consumption flow is handled by HandleConsumeCommand():
- The server detects the CONSUME command and returns STREAM_DATA_SIGNAL, which triggers streaming mode
- `HandleConsumeCommand` calls `ReadMessages(offset, 8192)` to read up to 8192 messages
The disk persistence layer uses batching to optimize I/O operations. The flushLoop goroutine manages batch accumulation and flushing based on size and time thresholds.
The batching logic lives in the flushLoop goroutine, and the writes themselves in the writeBatch function; both are covered in detail in the publishing section below.
This document describes the complete path a message takes from publication to persistence in cursus. It covers the TCP ingestion protocol, deduplication logic, partition selection strategies, asynchronous disk writes, and in-memory distribution to active consumers.
For information about consuming messages from disk, see Consuming Messages. For details on the disk segment format and read operations, see Segment Management.
The broker accepts TCP connections on port 9000 (configurable via BrokerPort) and processes them using a bounded worker pool pattern with 1000 workers defined by the maxWorkers constant.
Each incoming connection is handled by HandleConnection, which processes messages in a loop until the connection closes or an error occurs. The function uses a 5-minute read deadline to prevent indefinite blocking on idle connections.
All messages use a length-prefixed binary protocol:
| Field | Size | Description |
|---|---|---|
| Length | 4 bytes | Big-endian uint32 indicating message payload size |
| Payload | N bytes | Message data (optionally gzip-compressed) |
The server reads messages in two stages:
- Reads the 4-byte length prefix using io.ReadFull
- Reads exactly that many payload bytes, again with io.ReadFull

After decoding, the server determines whether the payload is a command or a message to publish:

- Commands begin with one of CREATE, DELETE, LIST, SUBSCRIBE, PUBLISH, or CONSUME

For direct messages, the server calls tm.Publish(topicName, msg) and immediately responds with “OK”. For commands, it delegates to CommandHandler.HandleCommand.
The TopicManager implements message deduplication using a sync.Map that tracks message IDs for 30 minutes. This prevents duplicate processing when clients retry failed operations or experience network issues.
Each message is assigned an ID by hashing its payload using the FNV-1a 64-bit hash function implemented in util.GenerateID.
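A plausible shape for this helper using the standard library's hash/fnv package; the exact signature and return type of util.GenerateID are assumptions:

```go
import "hash/fnv"

// GenerateID hashes arbitrary bytes with FNV-1a (64-bit). The same
// function serves both payload IDs and key-based partition selection.
func GenerateID(data []byte) uint64 {
	h := fnv.New64a()
	h.Write(data) // fnv hashes never return a write error
	return h.Sum64()
}
```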
A background goroutine runs periodically to remove expired deduplication entries:
- Runs every 60 seconds by default (configurable via CleanupInterval)
- Iterates over dedupMap and deletes entries older than 30 minutes

The cleanup increments the metrics.CleanupCount counter for observability.
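A minimal sketch of that cleanup goroutine, assuming the map stores insertion timestamps and using an atomic counter in place of the real metrics type:

```go
import (
	"sync"
	"sync/atomic"
	"time"
)

// runCleanup deletes dedup entries older than 30 minutes on every tick.
func runCleanup(dedupMap *sync.Map, interval time.Duration, cleanupCount *atomic.Int64) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for range ticker.C {
		cutoff := time.Now().Add(-30 * time.Minute)
		dedupMap.Range(func(key, value any) bool {
			if ts, ok := value.(time.Time); ok && ts.Before(cutoff) {
				dedupMap.Delete(key)
			}
			return true // continue iterating
		})
		cleanupCount.Add(1) // stands in for metrics.CleanupCount
	}
}
```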
The Topic.Publish method selects a target partition using one of two strategies based on whether the message has a key.
When a message has a non-empty Key field, the broker uses consistent hashing to ensure all messages with the same key go to the same partition:
```
partitionIndex = hash(key) % partitionCount
```
The hash function (util.GenerateID) uses FNV-1a 64-bit hashing, ensuring deterministic and uniformly distributed partition assignment.
For messages without a key, the topic uses a simple round-robin counter to distribute load evenly across all partitions:
```
partitionIndex = counter % partitionCount
counter++
```
The counter is a uint64 field on the Topic struct, protected by the topic’s mutex during selection.
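Putting both strategies together, a sketch of the selection logic; the field names are assumptions based on this page, and GenerateID is the FNV-1a hash sketched earlier:

```go
import "sync"

type Topic struct {
	mu             sync.Mutex
	partitionCount uint64
	counter        uint64
}

// selectPartition implements the two strategies described above.
func (t *Topic) selectPartition(key []byte) int {
	t.mu.Lock()
	defer t.mu.Unlock()
	if len(key) > 0 {
		// Keyed: the same key always maps to the same partition.
		return int(GenerateID(key) % t.partitionCount)
	}
	// Keyless: round-robin across all partitions.
	idx := int(t.counter % t.partitionCount)
	t.counter++
	return idx
}
```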
Messages are persisted to disk asynchronously to avoid blocking publishers. The DiskHandler implements a batching write strategy:
- Publishers enqueue messages on the writeCh channel
- A batch is flushed when it reaches batchSize (default 500) or after the linger time (default 100ms)
- Each flush ends with an fsync operation

The flushLoop uses a ticker and channel select to implement the batching logic (see the sketch after the table):
| Trigger | Action |
|---|---|
| Message arrives via writeCh | Add to batch; flush if len(batch) >= batchSize |
| Ticker fires (every linger ms) | Flush batch if non-empty |
| Done signal received | Drain remaining messages and flush |
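A sketch of that select loop; the Message type and the DiskHandler fields are assumptions, and writeBatch is stubbed (it is described in the next section):

```go
import "time"

type Message struct{ Payload []byte }

type DiskHandler struct {
	writeCh   chan Message
	done      chan struct{}
	batchSize int           // 500 by default
	linger    time.Duration // 100ms by default
}

func (d *DiskHandler) writeBatch(batch []Message) { /* see next section */ }

func (d *DiskHandler) flushLoop() {
	ticker := time.NewTicker(d.linger)
	defer ticker.Stop()
	batch := make([]Message, 0, d.batchSize)
	flush := func() {
		if len(batch) > 0 {
			d.writeBatch(batch)
			batch = batch[:0]
		}
	}
	for {
		select {
		case msg := <-d.writeCh:
			batch = append(batch, msg)
			if len(batch) >= d.batchSize {
				flush() // size threshold reached
			}
		case <-ticker.C:
			flush() // linger expired; flush whatever accumulated
		case <-d.done:
			for { // drain remaining messages, flush, and exit
				select {
				case msg := <-d.writeCh:
					batch = append(batch, msg)
				default:
					flush()
					return
				}
			}
		}
	}
}
```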
The writeBatch method writes all messages in a batch atomically, then calls writer.Flush() and file.Sync() to ensure durability. The dual-mutex strategy allows metadata reads to proceed while I/O operations are in progress.
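A sketch of the write path itself, shown as a standalone function over the handler's file and buffered writer (names and signature are assumptions):

```go
import (
	"bufio"
	"encoding/binary"
	"os"
)

// writeBatch appends each payload with a 4-byte big-endian length
// prefix, then flushes the buffer and fsyncs the file.
func writeBatch(file *os.File, writer *bufio.Writer, batch [][]byte) error {
	var hdr [4]byte
	for _, payload := range batch {
		binary.BigEndian.PutUint32(hdr[:], uint32(len(payload)))
		if _, err := writer.Write(hdr[:]); err != nil {
			return err
		}
		if _, err := writer.Write(payload); err != nil {
			return err
		}
	}
	if err := writer.Flush(); err != nil { // push buffered bytes to the OS
		return err
	}
	return file.Sync() // fsync: make the batch durable
}
```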
When writing a message would cause the current segment to exceed SegmentSize (1MB), the handler rotates to a new segment:
- Increments the CurrentSegment counter
- Creates a new segment file named {topic}_{partition}_{segmentNumber}.log

Each partition maintains a buffered channel with capacity 10,000 (configurable via PartitionChannelBufSize).
The Partition.Enqueue method sends messages to this channel immediately after queuing them for disk persistence.
A dedicated goroutine (Partition.run) consumes from this channel and distributes messages to all registered consumer group subscriptions.
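A sketch of this dual-path handoff, reusing the Message type from the flushLoop sketch; the channel field names are assumptions:

```go
type Partition struct {
	diskCh chan Message // drained by the DiskHandler's flushLoop
	memCh  chan Message // buffered (10,000 by default), drained by run
}

// Enqueue queues for disk persistence first, then for in-memory delivery.
func (p *Partition) Enqueue(msg Message) {
	p.diskCh <- msg // durability path
	p.memCh <- msg  // low-latency path
}

// run fans each message out to every registered group subscription.
func (p *Partition) run(subs []chan Message) {
	for msg := range p.memCh {
		for _, ch := range subs {
			ch <- msg
		}
	}
}
```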
When a consumer group is registered, partitions are assigned to consumers using modulo arithmetic:
```
consumerIndex = partitionID % consumerCount
```
This ensures each partition’s messages go to exactly one consumer within the group, maintaining ordering guarantees while load-balancing across consumers. For example, with 4 partitions and 2 consumers, consumer 0 receives partitions 0 and 2 while consumer 1 receives partitions 1 and 3.
A goroutine bridges each partition’s group subscription channel to the assigned consumer’s MsgCh.
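That bridge might look like the following, again with assumed names:

```go
type Consumer struct{ MsgCh chan Message }

// bridge forwards a partition's group-subscription channel into the
// assigned consumer's MsgCh until the subscription channel closes.
func bridge(subCh <-chan Message, c *Consumer) {
	go func() {
		for msg := range subCh {
			c.MsgCh <- msg
		}
	}()
}
```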
The following configuration parameters affect publishing performance:
| Parameter | Config Field | Default | Description |
|---|---|---|---|
| Disk batch size | DiskFlushBatchSize | 500 | Messages per disk flush |
| Linger time | LingerMS | 100ms | Max wait before flush |
| Partition buffer | PartitionChannelBufSize | 10000 | Partition channel capacity |
| Consumer buffer | ConsumerChannelBufSize | 1000 | Consumer channel capacity |
The default values balance throughput, latency, and memory usage for typical workloads. Adjust them based on your specific requirements.
The broker exposes Prometheus metrics for publish operations.
These metrics are incremented in TopicManager.Publish() after successfully enqueueing the message.
This page explains how consumers retrieve messages from cursus, covering both in-memory consumption via consumer groups and on-demand consumption from disk.
For information about how messages are published and stored to disk, see Publishing Messages. For details about consumer group architecture and load balancing, see Consumer Groups.
There are two primary consumption patterns in cursus:
- SUBSCRIBE (consumer groups): consumers receive messages in real time via in-memory channels
- CONSUME command: clients read historical messages directly from disk at a specific offset

This page focuses primarily on the on-demand consumption mechanism via the CONSUME command, which allows clients to read persisted messages from disk.
The consumption system consists of several layers working together to retrieve messages from disk and stream them to clients over TCP.
The consumption flow is synchronous and blocking: when the server receives a CONSUME command, it immediately begins reading messages from disk and streaming them back over the same TCP connection.

The CONSUME command follows a specific syntax:
Syntax: `CONSUME <topic> <partition> <offset>`
Parameters:
- topic: The name of the topic to consume from
- partition: The partition ID (integer, 0-indexed)
- offset: The number of messages to skip before starting consumption

Example: `CONSUME orders 0 100`
Reads from the “orders” topic, partition 0, skipping the first 100 messages.
When the server receives a command, it distinguishes between regular commands and streaming commands:
The key distinction is the STREAM_DATA_SIGNAL constant which signals to HandleConnection that it should invoke HandleConsumeCommand to stream data directly over the connection.
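A sketch of that branch; the CommandHandler stubs and the sentinel's value are stand-ins, not cursus's real definitions:

```go
import (
	"log"
	"net"
)

const STREAM_DATA_SIGNAL = "__STREAM_DATA__" // placeholder value

type CommandHandler struct{ /* fields elided */ }

func (ch *CommandHandler) HandleCommand(cmd string) string { return "OK" /* elided */ }
func (ch *CommandHandler) HandleConsumeCommand(conn net.Conn, cmd string) (int, error) {
	return 0, nil // elided; streams frames over conn
}

func dispatch(conn net.Conn, ch *CommandHandler, cmdLine string) {
	resp := ch.HandleCommand(cmdLine)
	if resp == STREAM_DATA_SIGNAL {
		// CONSUME: stream directly over this connection, then return so
		// the deferred conn.Close() in HandleConnection fires.
		if n, err := ch.HandleConsumeCommand(conn, cmdLine); err != nil {
			log.Printf("streamed %d messages before error: %v", n, err)
		}
		return
	}
	// Normal commands get a single length-prefixed response (elided).
	_ = resp
}
```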
The HandleConsumeCommand function parses the command arguments:
| Argument Position | Name | Type | Validation |
|---|---|---|---|
| 0 | Command | String | Must be “CONSUME” |
| 1 | Topic | String | Must exist |
| 2 | Partition | Integer | Must be valid integer |
| 3 | Offset | Integer | Must be valid integer |
If parsing fails, an error is returned immediately and the connection is terminated with an error message.
The CommandHandler retrieves the appropriate DiskHandler for the requested topic-partition pair:
```go
dh, err := ch.DiskManager.GetHandler(topicName, partition)
```
Each topic-partition pair has a dedicated DiskHandler instance that manages its segment files. The DiskManager acts as a registry, creating handlers on-demand if they don’t exist.
Messages are stored in a length-prefixed format:
```
[4-byte length][message data][4-byte length][message data]...
```
The ReadMessages method walks this format by repeatedly reading a 4-byte length prefix and then that many bytes of message data, skipping offset messages before collecting results.
The system uses memory-mapped file I/O via the golang.org/x/exp/mmap package:
```go
reader, err := mmap.Open(filePath)
if err != nil {
    return nil, fmt.Errorf("mmap open failed: %w", err)
}
defer reader.Close()
```
Memory mapping allows the operating system to manage file I/O efficiently by mapping file contents directly into the process’s address space. This avoids extra user-space buffering and lets the kernel’s page cache and readahead serve repeated reads.
The ReadAt method performs reads at specific offsets without maintaining cursor state.
The offset parameter specifies how many messages to skip before returning data:
```go
// Inside the decode loop: skip messages until the requested offset is
// reached, then start collecting.
if offset > 0 {
    offset--
    continue
}
messages = append(messages, msg)
```
This allows clients to implement pagination or resume consumption from a specific point. The offset is message-based, not byte-based.
Each ReadMessages call reads up to 8192 messages:
```go
messages, err := dh.ReadMessages(offset, 8192)
```
This limit prevents unbounded memory usage when reading large segments. Clients that need more messages must issue multiple CONSUME commands with adjusted offsets.
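Combining the pieces above, a sketch of what ReadMessages plausibly looks like; the free-function signature and single-segment scope are simplifications:

```go
import (
	"encoding/binary"
	"fmt"

	"golang.org/x/exp/mmap"
)

// ReadMessages mmaps a segment, walks its length-prefixed records,
// skips `offset` messages, and returns up to `limit` payloads.
func ReadMessages(filePath string, offset, limit int) ([][]byte, error) {
	reader, err := mmap.Open(filePath)
	if err != nil {
		return nil, fmt.Errorf("mmap open failed: %w", err)
	}
	defer reader.Close()

	var messages [][]byte
	var hdr [4]byte
	pos := int64(0)
	for len(messages) < limit {
		if _, err := reader.ReadAt(hdr[:], pos); err != nil {
			break // end of segment
		}
		size := int64(binary.BigEndian.Uint32(hdr[:]))
		data := make([]byte, size)
		if _, err := reader.ReadAt(data, pos+4); err != nil {
			break
		}
		pos += 4 + size
		if offset > 0 {
			offset-- // still skipping toward the requested position
			continue
		}
		messages = append(messages, data)
	}
	return messages, nil
}
```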
After reading messages from disk, HandleConsumeCommand streams them back to the client using length-prefixed framing:
```go
for _, msg := range messages {
    msgBytes := []byte(msg.Payload)
    if err := util.WriteWithLength(conn, msgBytes); err != nil {
        return streamedCount, fmt.Errorf("failed to stream message: %w", err)
    }
    streamedCount++
}
```
The util.WriteWithLength helper writes a 4-byte big-endian length prefix followed by the message data, matching the TCP protocol used throughout cursus.
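The helper itself is likely just the inverse of the read path; a plausible shape:

```go
import (
	"encoding/binary"
	"io"
)

// WriteWithLength writes a 4-byte big-endian length prefix followed by
// the payload, matching the wire format used throughout cursus.
func WriteWithLength(w io.Writer, b []byte) error {
	var hdr [4]byte
	binary.BigEndian.PutUint32(hdr[:], uint32(len(b)))
	if _, err := w.Write(hdr[:]); err != nil {
		return err
	}
	_, err := w.Write(b)
	return err
}
```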
The CONSUME command is terminal for the TCP connection:
After HandleConsumeCommand completes (successfully or with an error), HandleConnection returns, which triggers the deferred conn.Close().
This design ensures that each CONSUME command gets a fresh connection. If an error occurs during streaming, the server logs the error, returns the count of messages streamed so far, and closes the connection.
It’s important to distinguish between two consumption mechanisms:
Consumer groups are registered via the SUBSCRIBE command and receive messages in real time: messages are pushed through in-memory channels as they are published, favoring low latency.

The CONSUME command reads historical messages from disk: clients specify a partition and offset explicitly, which suits replay and batch reads.

The two mechanisms are complementary: use SUBSCRIBE for real-time, low-latency delivery, and CONSUME for batch processing, replay, or catching up after downtime.

On Linux, the DiskHandler provides an optimized method for sending entire segments over TCP using the sendfile() system call:
```go
func (d *DiskHandler) SendCurrentSegmentToConn(conn net.Conn) error
```
This method bypasses normal read/write operations by transferring data directly from the file descriptor to the socket in kernel space:
The implementation calls unix.Sendfile in a loop until all data is transferred. Because the copy happens entirely in kernel space, no bytes pass through user-space buffers, which reduces CPU usage and context switches for large transfers.
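One plausible shape for that loop on Linux, using SyscallConn to reach the socket's file descriptor; this is a sketch, not necessarily cursus's exact implementation:

```go
import (
	"net"
	"os"

	"golang.org/x/sys/unix"
)

// sendSegment transfers `size` bytes of f to conn via sendfile(2).
func sendSegment(conn *net.TCPConn, f *os.File, size int64) error {
	raw, err := conn.SyscallConn()
	if err != nil {
		return err
	}
	var offset int64
	for offset < size {
		var sendErr error
		// raw.Write re-invokes the closure once the socket is writable
		// again whenever we report done=false (EAGAIN).
		err = raw.Write(func(fd uintptr) (done bool) {
			_, sendErr = unix.Sendfile(int(fd), int(f.Fd()), &offset, int(size-offset))
			if sendErr == unix.EAGAIN {
				sendErr = nil
				return false
			}
			return true // Sendfile advanced offset for us
		})
		if err != nil {
			return err
		}
		if sendErr != nil {
			return sendErr
		}
	}
	return nil
}
```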
Note: This method is currently not used by the standard CONSUME command flow, which uses ReadMessages with mmap. It is available as an alternative for future optimizations.
When opening segment files for writing, Linux builds apply the FADV_SEQUENTIAL hint:
```go
_ = unix.Fadvise(int(f.Fd()), 0, 0, unix.FADV_SEQUENTIAL)
```
This hints to the kernel that the file will be accessed sequentially, allowing it to perform more aggressive readahead and to evict already-consumed pages sooner.
Several configuration parameters affect consumption behavior:
| Parameter | Config Field | Default | Description |
|---|---|---|---|
| Read deadline | readDeadline constant | 5 minutes | Maximum idle time before connection timeout |
| Max messages per read | Hard-coded in HandleConsumeCommand | 8192 | Maximum messages returned per CONSUME call |
| Segment size | DiskHandler.SegmentSize | 1 MB | Size of individual segment files |
The read deadline applies to all TCP operations, ensuring that inactive connections are eventually closed and their resources freed.
If the requested topic or partition doesn’t exist, DiskManager.GetHandler returns an error:
```go
dh, err := ch.DiskManager.GetHandler(topicName, partition)
if err != nil {
    return 0, fmt.Errorf("failed to get disk handler: %w", err)
}
```
The error is returned to the client as a formatted error message, and the connection is closed.
If the segment file cannot be opened for memory mapping:
```go
reader, err := mmap.Open(filePath)
if err != nil {
    return nil, fmt.Errorf("mmap open failed: %w", err)
}
```
This typically occurs when the segment file does not exist yet (nothing has been written to that topic-partition) or the process lacks permission to read it.
If a write fails while streaming messages to the client:
```go
if err := util.WriteWithLength(conn, msgBytes); err != nil {
    return streamedCount, fmt.Errorf("failed to stream message: %w", err)
}
```
This can happen if the client disconnects mid-stream or the connection’s deadline expires before the write completes.
The error is logged, and the partial count of successfully streamed messages is returned.
The consumption system in cursus provides on-demand, offset-based reads of persisted messages, streamed to clients over the same length-prefixed TCP protocol used for publishing. The key types involved are CommandHandler, DiskManager, and DiskHandler.
For information about how these messages were originally published and written to disk, see Publishing Messages. For details about real-time consumption via consumer groups, see Consumer Groups.