This document describes the on-disk storage format used by cursus to persist messages. It covers the segment file structure, message encoding scheme, write and read mechanics, and platform-specific optimizations.
For details on the asynchronous write path and batching behavior, see DiskHandler and Write Path.
cursus stores messages in append-only log files organized into segments. Each topic-partition pair maintains its own set of segment files, allowing for parallel I/O operations and independent management.
Messages are organized in a hierarchical directory structure based on topic and partition:
{LogDir}/
  {topicName}/
    partition_{partitionID}_segment_0.log
    partition_{partitionID}_segment_1.log
    partition_{partitionID}_segment_N.log
Example:
./logs/
  orders/
    partition_0_segment_0.log
    partition_0_segment_1.log
    partition_0_segment_2.log
  users/
    partition_0_segment_0.log
    partition_1_segment_0.log
| Component | Format | Example |
|---|---|---|
| Base path | {LogDir}/{topicName}/partition_{partitionID} | ./logs/orders/partition_0 |
| Segment file | {base}_segment_{segmentNumber}.log | partition_0_segment_0.log |
The base path is constructed using platform-specific path separators (os.PathSeparator) to ensure cross-platform compatibility.
Each segment file has a maximum size of 1 MB (1,048,576 bytes).
When the current segment would exceed this limit after writing a message, the DiskHandler automatically rotates to a new segment.
The DiskHandler tracks segment state using the following fields:
| Field | Type | Purpose |
|---|---|---|
| CurrentSegment | int | Active segment number (0, 1, 2, …) |
| CurrentOffset | int | Write position within current segment (bytes) |
| SegmentSize | int | Maximum segment size (1 MB) |
| BaseName | string | Base path for segment files |
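The rotation rule can be illustrated with the tracking fields from the table. The sketch below is an assumption about how the check might look, not the DiskHandler's actual code: the `segmentState` type and `reserve` method are hypothetical, and the record size passed in is taken to include the 4-byte length prefix.

```go
package main

import "fmt"

const segmentSize = 1 << 20 // 1 MB (1,048,576 bytes)

// segmentState mirrors the tracking fields listed above.
// The type itself is hypothetical.
type segmentState struct {
	CurrentSegment int
	CurrentOffset  int
	SegmentSize    int
}

// reserve accounts for a record of n bytes and returns the segment
// number it lands in. If the record would push the current segment
// past SegmentSize, the state rotates to a fresh segment first.
func (s *segmentState) reserve(n int) int {
	if s.CurrentOffset+n > s.SegmentSize {
		s.CurrentSegment++
		s.CurrentOffset = 0
	}
	s.CurrentOffset += n
	return s.CurrentSegment
}

func main() {
	s := &segmentState{SegmentSize: segmentSize}
	first := s.reserve(600 * 1024)  // fits in segment 0
	second := s.reserve(600 * 1024) // would exceed 1 MB, so rotates
	fmt.Println(first, second)      // prints "0 1"
}
```

A record larger than the segment size is not handled specially in this sketch; it simply ends up alone in an oversized segment.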
Messages are stored using a simple length-prefixed binary format. Each message consists of:
┌─────────────┬──────────────────┬─────────────┬──────────────────┐
│ Length (4B) │ Message Data (N) │ Length (4B) │ Message Data (M) │ ...
└─────────────┴──────────────────┴─────────────┴──────────────────┘
Example:
Message 1: "Hello"
[0x00, 0x00, 0x00, 0x05] [0x48, 0x65, 0x6C, 0x6C, 0x6F]
^--- Length = 5          ^--- "Hello" in ASCII
Message 2: "World!"
[0x00, 0x00, 0x00, 0x06] [0x57, 0x6F, 0x72, 0x6C, 0x64, 0x21]
^--- Length = 6          ^--- "World!" in ASCII
The encoding process follows these steps:
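1. Encode the message length as a 4-byte prefix with binary.BigEndian.PutUint32()
2. Write the length prefix to the bufio.Writer
3. Write the message payload to the bufio.Writer

The length prefix uses big-endian (network byte order) encoding.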
This ensures consistent interpretation across different hardware architectures and allows for straightforward binary inspection.
The write path employs a multi-level buffering strategy to optimize disk I/O:
| Stage | Component | Configuration | Behavior |
|---|---|---|---|
| Enqueue | writeCh channel | ChannelBufferSize | Non-blocking with timeout retry |
| Batch | flushLoop goroutine | DiskFlushBatchSize (500) | Accumulates messages up to batch size |
| Linger | time.Ticker | LingerMS (100ms) | Flushes partial batches after timeout |
| Buffer | bufio.Writer | Default (4KB) | In-memory buffering before syscall |
| Sync | file.Sync() | After each batch | Forces kernel flush to disk |
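The flushLoop goroutine flushes accumulated messages under three conditions. Two follow from the table above: the batch reaches DiskFlushBatchSize, or the LingerMS ticker fires while a partial batch is pending.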
After writing each batch, the system ensures durability through:
1. bufio.Writer.Flush() - Writes buffered data to the file descriptor
2. file.Sync() - Invokes the fsync() syscall to flush the kernel page cache

This guarantees that messages are physically written to disk before acknowledging completion.
cursus uses memory-mapped I/O for reading messages, leveraging the golang.org/x/exp/mmap package.
This approach provides efficient random access to segment files without requiring explicit read syscalls for each message.
The ReadMessages function implements logical offset handling:
1. Start scanning at the beginning of the segment (pos = 0)
2. Read the 4-byte length prefix of the message at the current position
3. If offset > 0, skip the message and decrement offset
4. Once offset == 0, begin collecting messages

This approach trades some read efficiency for simplicity, as it doesn’t maintain an index structure. For workloads with small offsets relative to segment size, the cost is minimal due to memory-mapped I/O.
The DiskHandler uses two mutexes to allow concurrent reads while protecting critical sections:
| Mutex | Protects | Acquired By | Purpose |
|---|---|---|---|
| mu | CurrentSegment, CurrentOffset | Read/Write operations | Metadata consistency |
| ioMu | file, writer | Write operations only | I/O serialization |
Write operations acquire both locks in a fixed order to prevent deadlocks.
Read operations only acquire mu briefly to read CurrentSegment, then release it before opening the mmap reader.
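The brief metadata read can be sketched as follows. The `diskHandler` struct, `snapshotSegment`, and `rotate` are hypothetical stand-ins; the real write path also takes ioMu, which is left out here because its exact usage is not shown in this document.

```go
package main

import (
	"fmt"
	"sync"
)

// diskHandler is a reduced, hypothetical stand-in for the real type.
type diskHandler struct {
	mu             sync.Mutex // guards CurrentSegment and CurrentOffset
	ioMu           sync.Mutex // guards file and writer (unused in this sketch)
	CurrentSegment int
	CurrentOffset  int
}

// snapshotSegment holds mu only long enough to copy the segment
// number, so the slower step of opening a reader happens unlocked.
func (d *diskHandler) snapshotSegment() int {
	d.mu.Lock()
	defer d.mu.Unlock()
	return d.CurrentSegment
}

// rotate updates the metadata under mu.
func (d *diskHandler) rotate() {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.CurrentSegment++
	d.CurrentOffset = 0
}

func main() {
	d := &diskHandler{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = d.snapshotSegment() // concurrent readers see 0 or 1
		}()
	}
	d.rotate()
	wg.Wait()
	fmt.Println(d.snapshotSegment()) // 1
}
```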
On Linux builds, segment opening includes platform-specific optimizations (from flush_linux.go, not shown in the provided files):
- O_DIRECT or O_SYNC flags for write-through
- sendfile(2) for zero-copy transfers

The Windows implementation (flush_window.go) uses standard file operations without advanced optimizations:
- Standard open flags (os.O_CREATE, os.O_RDWR, os.O_APPEND)
- No sendfile() equivalent (uses io.Copy() instead)
- No fadvise() hints available

| Aspect | Specification |
|---|---|
| Segment size | 1 MB (1,048,576 bytes) |
| File naming | {topic}/partition_{id}_segment_{n}.log |
| Message format | 4-byte big-endian length + payload |
| Write buffering | bufio.Writer + batch accumulation |
| Batch size | Configurable (default: 500 messages) |
| Linger time | Configurable (default: 100ms) |
| Read mechanism | Memory-mapped I/O (mmap) |
| Durability | fsync() after each batch |
| Concurrency | Dual mutex (mu + ioMu) |
| Segment rotation | Automatic at 1 MB boundary |