This document provides a complete reference for the cursus command API. It describes the wire protocol format, command syntax, parameters, behavior, and responses for all supported operations. The cursus API is a text-based protocol transmitted over TCP with length-prefixed message framing.
For information about the on-disk storage format, see Disk Format.
All messages exchanged between clients and the broker follow a length-prefixed protocol over TCP.
Each message consists of:
Messages are encoded as:
The util.DecodeMessage() function splits the raw bytes into topic and payload components. Command detection occurs by checking if the payload starts with command keywords: CREATE, DELETE, LIST, SUBSCRIBE, PUBLISH, CONSUME, or HELP.
If gzip compression is enabled via configuration (enable_gzip: true), messages are compressed before length-prefixed framing.
The DecompressMessage function handles decompression transparently.
| Aspect | Description |
|————–|—————————————————————————–|
| Syntax | CREATE <topic> [<partitions>] |
| Parameters | topic (string, required) - Topic name to create
partitions (integer, optional, default=4) - Number of partitions |
| Behavior | Adds partitions if topic exists
Each partition has 10,000-message buffer and dedicated DiskHandler
Partition count must be positive |
| Response | ✅ Topic '<topic>' now has <N> partitions |
| Errors | ERROR: missing topic nameERROR: partitions must be a positive integer |
| Example | CREATE orders 8> ✅ Topic 'orders' now has 8 partitions |
| Aspect | Description |
|————–|—————————————————————————–|
| Syntax | DELETE <topic> |
| Parameters | topic (string, required) - Topic name to delete |
| Behavior | Removes topic from TopicManager.topics
Stops partition goroutines
Closes consumer channels
Disk segment files not automatically deleted |
| Response | 🗑️ Topic '<topic>' deleted |
| Errors | ERROR: topic '<topic>' not found |
| Example | DELETE orders> 🗑️ Topic 'orders' deleted |
| Aspect | Description |
|————–|—————————————————————————–|
| Syntax | LIST |
| Parameters | None |
| Behavior | Returns comma-separated list of all topic names |
| Response | <topic1>, <topic2>, <topic3>If none exist: (no topics) |
| Example | LIST> orders, payments, notifications |
| Aspect | Description |
|————–|—————————————————————————–|
| Syntax | SUBSCRIBE <topic> |
| Parameters | topic (string, required) - Topic name to subscribe to |
| Behavior | Registers client in topic
Creates consumer with 1,000-message buffer
Partitions distribute messages using modulo arithmetic
Tracks subscribed topics in ClientContext.CurrentTopics |
| Response | ✅ Subscribed to '<topic>' |
| Errors | ERROR: topic '<topic>' does not exist |
| Example | SUBSCRIBE orders> ✅ Subscribed to 'orders' |
| Aspect | Description |
|————–|—————————————————————————–|
| Syntax | PUBLISH <topic> <message> |
| Parameters | topic (string, required)
message (string, required) - payload |
| Behavior | Assigns unique ID via util.GenerateID
Deduplication check (30 min)
Routes to partition (key-based or round-robin)
Writes to disk asynchronously
Distributes to consumers |
| Response | 📤 Published to '<topic>' |
| Errors | ERROR: invalid PUBLISH syntaxERROR: topic '<topic>' does not exist |
| Example | PUBLISH orders {"order_id":123,"amount":99.99}> 📤 Published to 'orders' |
| Aspect | Description |
|————–|—————————————————————————–|
| Syntax | CONSUME <topic> <partition> <offset> |
| Parameters | topic (string, required)
partition (integer, required, 0-indexed)
offset (integer, required, byte offset in segment) |
| Behavior | Detected by HandleCommand → STREAM_DATA_SIGNAL
Handled by HandleConsumeCommand
Reads messages from DiskHandler (up to 8192 bytes)
Streams back with util.WriteWithLength |
| Response | Each message as [4-byte length][message payload]
Streamed count logged: [STREAM] Completed streaming N messages |
| Errors | ERROR: invalid CONSUME syntaxERROR: invalid partition IDERROR: invalid offsetERROR: failed to get disk handlerERROR: failed to read messages from disk |
| Example | CONSUME orders 0 0> [4 bytes: 45][message payload 1][4 bytes: 52][message payload 2] |
| Aspect | Description |
|————–|—————————————————————————–|
| Syntax | HELP |
| Parameters | None |
| Response | Lists all available commands with syntax |
| Example | HELP> Available commands:CREATE <topic> [<partitions>]DELETE <topic>LISTSUBSCRIBE <topic>PUBLISH <topic> <message>CONSUME <topic> <pID> <offset>HELPEXIT |
[4-byte length][response string]
Streaming Commands (CONSUME)
[4-byte length 1][message 1][4-byte length 2][message 2]...
All errors are returned as strings prefixed with ERROR:
ERROR: <error description>
| Error | Cause | Example |
|---|---|---|
ERROR: empty command |
Empty command string received | - |
ERROR: unknown command |
Command not in supported list | ERROR: unknown command: FOO |
ERROR: missing topic name |
CREATE without topic name | CREATE |
ERROR: topic '<topic>' not found |
Operation on non-existent topic | DELETE nonexistent |
ERROR: topic '<topic>' does not exist |
SUBSCRIBE/PUBLISH to missing topic | SUBSCRIBE nonexistent |
ERROR: invalid PUBLISH syntax |
PUBLISH without message | PUBLISH orders |
ERROR: invalid CONSUME syntax |
CONSUME with wrong argument count | CONSUME orders |
The logCommandResult function logs all command outcomes:
[CMD] SUCCESS for successful commands
[CMD] FAILURE for error responses
Each connection maintains a ClientContext that tracks:
| Field | Type | Description |
|---|---|---|
| ConsumerGroup | string | Consumer group identifier (e.g., “tcp-group”) |
| ConsumerID | int | Consumer identifier within group |
| CurrentTopics | map[string]struct{} | Set of subscribed topic names |
| Function | Purpose |
|---|---|
| HandleConnection | Main connection handler loop |
| HandleCommand | Command parser and dispatcher |
| HandleConsumeCommand | CONSUME streaming handler |
| isCommand | Command keyword detection |
| writeResponse | Response encoding |
| logCommandResult | Command logging |
Commands respect these configuration parameters:
| Parameter | Config Field | Default | Impact |
|---|---|---|---|
| Partition buffer size | partition_channel_buffer_size |
10000 | CREATE command partition capacity |
| Consumer buffer size | consumer_channel_buffer_size |
1000 | SUBSCRIBE consumer channel capacity |
| Read batch size | N/A | 8192 bytes | CONSUME read size |