This document provides a comprehensive overview of cursus’s disk persistence system, which is responsible for durably storing messages to disk and retrieving them on demand. The disk persistence layer ensures data durability through batched asynchronous writes while providing efficient read access via memory-mapped I/O.
This page covers the architecture, core components, write and read paths, and the concurrency model of the disk persistence system. For detailed information about:
- Segment file management and rotation mechanics, see Segment Management.
- Linux- and Windows-specific I/O optimizations, see Platform-Specific Optimizations.
- How disk persistence integrates with topic management, see Topic Management System.
The DiskManager serves as a registry and factory for DiskHandler instances. It maintains a thread-safe map of handlers indexed by a composite key of topic name and partition ID.
| Component | Type | Purpose |
|---|---|---|
| handlers | map[string]*DiskHandler | Registry of active disk handlers |
| mu | sync.Mutex | Protects concurrent access to the handlers map |
| cfg | *config.Config | Configuration for creating new handlers |
Key Methods:
- GetHandler(topic string, partitionID int): Returns the existing handler for a topic-partition pair, or creates a new one.
- CloseAllHandlers(): Gracefully shuts down all active handlers.

Each DiskHandler manages disk I/O for a single topic-partition pair. It maintains separate write and read paths with distinct concurrency characteristics.
| Field | Type | Purpose |
|---|---|---|
| BaseName | string | Base file path: {LogDir}/{topic}/partition_{id} |
| SegmentSize | int | Maximum segment size (default: 1MB) |
| CurrentOffset | int | Current write position within segment |
| CurrentSegment | int | Current segment number |
| writeCh | chan string | Channel for asynchronous writes (capacity taken from ChannelBufferSize) |
| done | chan struct{} | Shutdown signal channel |
| batchSize | int | Max messages per batch (from DiskFlushBatchSize) |
| linger | time.Duration | Max wait time before flushing partial batch |
| mu | sync.Mutex | Protects metadata (offset, segment number) |
| ioMu | sync.Mutex | Serializes I/O operations (writes, flushes) |
| file | *os.File | Current segment file handle |
| writer | *bufio.Writer | Buffered writer for efficient I/O |
The write path is fully asynchronous to prevent blocking publishers. Messages flow through a channel to a dedicated flush goroutine that performs batching and periodic flushing.
Key Write Functions:
| Function | File | Purpose |
|---|---|---|
| AppendMessage(msg string) | handler.go | Non-blocking enqueue to write channel |
| flushLoop() | flush_common.go | Main batching loop in dedicated goroutine |
| writeBatch(batch []string) | flush_common.go | Write a batch of messages to disk |
| rotateSegment() | flush_common.go | Close current segment and open next |
| WriteDirect(msg string) | flush_common.go | Synchronous write (bypasses channel) |
The read path is synchronous and uses memory-mapped file access for efficient sequential reads. Unlike the write path, reads do not use channels or background goroutines.
Read Path Characteristics:
| Aspect | Implementation |
|---|---|
| Concurrency | Synchronous, no background goroutines |
| Lock Strategy | Acquires mu only to read CurrentSegment, releases immediately |
| I/O Mechanism | Memory-mapped file via golang.org/x/exp/mmap |
| Offset Handling | Skips the first offset messages, then reads up to max messages |
| Error Handling | Returns partial results on EOF or read errors |
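The following is a minimal sketch of a read along these lines, using golang.org/x/exp/mmap and the length-prefixed record format described in the next section. The function name, signature, and error policy are illustrative assumptions, not the actual ReadMessages implementation.

```go
package disk

import (
	"encoding/binary"

	"golang.org/x/exp/mmap"
)

// readSegment skips `offset` messages in one segment file, then returns up
// to `max` messages. Partial results are returned on short or failed reads.
func readSegment(path string, offset, max int) ([]string, error) {
	r, err := mmap.Open(path) // memory-maps the segment, read-only
	if err != nil {
		return nil, err
	}
	defer r.Close()

	var msgs []string
	var pos int64
	for len(msgs) < max && pos+4 <= int64(r.Len()) {
		var lenBuf [4]byte
		if _, err := r.ReadAt(lenBuf[:], pos); err != nil {
			return msgs, nil // partial results on read error
		}
		n := int64(binary.BigEndian.Uint32(lenBuf[:]))
		if pos+4+n > int64(r.Len()) {
			break // truncated record at the tail of the segment
		}
		payload := make([]byte, n)
		if _, err := r.ReadAt(payload, pos+4); err != nil {
			return msgs, nil
		}
		pos += 4 + n
		if offset > 0 {
			offset-- // still skipping toward the requested offset
			continue
		}
		msgs = append(msgs, string(payload))
	}
	return msgs, nil
}
```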
Messages are stored in segment files that rotate at a fixed size boundary (default: 1MB). Each topic-partition has its own sequence of segments numbered sequentially.
{LogDir}/{TopicName}/partition_{PartitionID}_segment_{SegmentNumber}.log
Example:
./logs/orders/partition_0_segment_0.log
./logs/orders/partition_0_segment_1.log
./logs/payments/partition_2_segment_0.log
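As a small illustration of this naming scheme, a helper along these lines could build a segment path (the function and parameter names are assumptions, not the actual code):

```go
package disk

import (
	"fmt"
	"path/filepath"
)

// segmentPath builds a segment file path following the scheme above.
func segmentPath(logDir, topic string, partitionID, segment int) string {
	base := filepath.Join(logDir, topic, fmt.Sprintf("partition_%d", partitionID))
	return fmt.Sprintf("%s_segment_%d.log", base, segment)
}

// segmentPath("./logs", "orders", 0, 1) -> "logs/orders/partition_0_segment_1.log"
```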
Rotation Trigger:

- When CurrentOffset + messageSize > SegmentSize (1MB by default)
- Occurs within writeBatch() before writing a message that would exceed the limit
Messages are stored with a length-prefixed binary format enabling random access and streaming reads.
[4-byte length (big-endian)][message payload bytes][4-byte length][message payload bytes]...
Encoding Details:
| Field | Size | Encoding | Purpose |
|---|---|---|---|
| Length Prefix | 4 bytes | binary.BigEndian.Uint32 | Size of following payload |
| Message Payload | Variable | UTF-8 string bytes | Actual message content |
Example:
Message: "hello"
Bytes: [0x00, 0x00, 0x00, 0x05, 0x68, 0x65, 0x6c, 0x6c, 0x6f]
The first four bytes encode the length (5), followed by the payload bytes for h, e, l, l, o.
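A sketch of encoding and decoding this framing with encoding/binary (the helper names are illustrative):

```go
package disk

import (
	"encoding/binary"
	"io"
)

// encodeMessage frames a payload with a 4-byte big-endian length prefix.
// encodeMessage("hello") == []byte{0x00, 0x00, 0x00, 0x05, 'h', 'e', 'l', 'l', 'o'}
func encodeMessage(msg string) []byte {
	buf := make([]byte, 4+len(msg))
	binary.BigEndian.PutUint32(buf[:4], uint32(len(msg)))
	copy(buf[4:], msg)
	return buf
}

// decodeMessage reads one framed message from r.
func decodeMessage(r io.Reader) (string, error) {
	var lenBuf [4]byte
	if _, err := io.ReadFull(r, lenBuf[:]); err != nil {
		return "", err // io.EOF at a clean record boundary
	}
	payload := make([]byte, binary.BigEndian.Uint32(lenBuf[:]))
	if _, err := io.ReadFull(r, payload); err != nil {
		return "", err
	}
	return string(payload), nil
}
```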
Advantages:

- The length prefix lets readers jump between records without scanning for delimiters, enabling both streaming reads and offset-based skipping.
- Payloads never need escaping, since message bytes are not interpreted as delimiters.
The DiskHandler uses a dual-mutex strategy to allow concurrent reads while serializing writes.
mu (Metadata Lock):

- Protects: CurrentOffset, CurrentSegment
- Acquired by: writeBatch(), ReadMessages(), WriteDirect(), rotateSegment()
- Duration: Short (read/update metadata only)

ioMu (I/O Lock):

- Protects: file, writer, actual I/O operations
- Acquired by: writeBatch(), WriteDirect(), Flush(), rotateSegment()
- Duration: Longer (includes disk I/O)
- Never held during reads (mmap access is lock-free)
| Component | Purpose |
|---|---|
| closeOnce | Ensures Close() runs only once |
| done channel | Signals shutdown to flushLoop and AppendMessage |
| shutdown WaitGroup | Blocks Close() until flushLoop exits |
| Drain logic | Processes remaining messages |
The disk persistence system is configured through the config.Config structure:
| Parameter | Field | Default | Purpose |
|---|---|---|---|
| Log Directory | LogDir | ./logs | Base directory for segment files |
| Batch Size | DiskFlushBatchSize | 500 | Max messages per flush batch |
| Linger Time | LingerMS | 100 | Max milliseconds before flushing partial batch |
| Write Timeout | DiskWriteTimeoutMS | 5000 | Timeout for enqueuing to write channel |
| Channel Buffer | ChannelBufferSize | 10000 | Size of partition message channels |
| Segment Size | Hardcoded | 1048576 | 1MB per segment file |
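Putting the defaults together, an equivalent configuration might look like this in Go. The field names follow the table above, but the struct layout is an illustrative mirror, not the actual config.Config declaration.

```go
package main

import "time"

// Config mirrors the disk-persistence settings listed above (illustrative).
type Config struct {
	LogDir             string
	DiskFlushBatchSize int
	LingerMS           int
	DiskWriteTimeoutMS int
	ChannelBufferSize  int
}

func defaultConfig() Config {
	return Config{
		LogDir:             "./logs", // base directory for segment files
		DiskFlushBatchSize: 500,      // max messages per flush batch
		LingerMS:           100,      // max wait before flushing a partial batch
		DiskWriteTimeoutMS: 5000,     // timeout for enqueuing to the write channel
		ChannelBufferSize:  10000,    // partition message channel capacity
	}
}

// linger converts the millisecond setting into the duration used by a handler.
func (c Config) linger() time.Duration {
	return time.Duration(c.LingerMS) * time.Millisecond
}
```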
The disk persistence system provides durable message storage through:

- Batched, asynchronous writes via a per-partition flush goroutine
- Size-based segment files with a length-prefixed binary format
- A dual-mutex concurrency model that separates metadata updates from disk I/O
- Memory-mapped, lock-free reads

For implementation details, see:

- Segment Management
- Platform-Specific Optimizations
This document provides a detailed explanation of the DiskHandler struct and its asynchronous write path, which forms the foundation of cursus’s disk persistence layer. It covers the batching mechanism, the flushLoop goroutine, and the concurrency control strategies used to achieve high-throughput, durable message storage.
For information about segment rotation and the DiskManager registry, see Segment Management. For Linux-specific optimizations like sendfile and fadvise, see Platform-Specific Optimizations. For the complete disk persistence architecture, see Disk Persistence System.
The DiskHandler is the core component responsible for persisting messages to disk for a single topic-partition pair. Each partition in the system has its own dedicated DiskHandler instance, enabling parallel I/O operations across partitions.
| Field | Type | Purpose |
|---|---|---|
| BaseName | string | Base path for segment files, e.g., ./logs/orders/partition_0 |
| SegmentSize | int | Maximum size per segment file (default: 1MB) |
| CurrentOffset | int | Current write position within the active segment |
| CurrentSegment | int | Active segment number (increments on rotation) |
| writeCh | chan string | Channel for asynchronous message enqueue (capacity taken from ChannelBufferSize) |
| done | chan struct{} | Shutdown signal channel |
| batchSize | int | Maximum messages per batch (default: 500) |
| linger | time.Duration | Maximum wait time before flushing partial batch (default: 100ms) |
| writeTimeout | time.Duration | Timeout for channel enqueue operations |
| mu | sync.Mutex | Protects metadata: CurrentOffset, CurrentSegment, file |
| ioMu | sync.Mutex | Serializes I/O operations: writer.Write(), writer.Flush() |
| file | *os.File | Active segment file handle |
| writer | *bufio.Writer | Buffered writer for efficient disk writes |
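Reconstructed from the table above, the struct looks roughly like the following. Field ordering and any unexported details not listed in the table are assumptions.

```go
package disk

import (
	"bufio"
	"os"
	"sync"
	"time"
)

// DiskHandler persists messages for a single topic-partition pair.
// This declaration is a sketch assembled from the field table above.
type DiskHandler struct {
	BaseName       string // e.g. ./logs/orders/partition_0
	SegmentSize    int    // max bytes per segment (default 1MB)
	CurrentOffset  int    // write position within the active segment
	CurrentSegment int    // active segment number

	writeCh chan string   // enqueue channel (capacity: ChannelBufferSize)
	done    chan struct{} // closed on shutdown

	batchSize    int           // max messages per batch
	linger       time.Duration // max wait before flushing a partial batch
	writeTimeout time.Duration // enqueue timeout

	mu   sync.Mutex // guards CurrentOffset, CurrentSegment, file
	ioMu sync.Mutex // serializes writer.Write / writer.Flush / file.Sync

	file   *os.File
	writer *bufio.Writer
}
```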
The following config.Config fields control DiskHandler behavior:
| Config Field | DiskHandler Field | Default | Purpose |
|---|---|---|---|
| LogDir | BaseName (derived) | ./logs | Root directory for segment files |
| ChannelBufferSize | writeCh capacity | varies | Size of write channel buffer |
| DiskFlushBatchSize | batchSize | 500 | Messages per batch |
| LingerMS | linger | 100ms | Max wait before flushing partial batch |
| DiskWriteTimeoutMS | writeTimeout | varies | Timeout for channel enqueue |
The write path is designed to be non-blocking for publishers while maximizing disk throughput through batching and buffering.
1. The publisher calls AppendMessage(msg) on the partition's DiskHandler, which enqueues the message to writeCh.
2. The flushLoop goroutine accumulates enqueued messages into a batch.
3. writer.Flush() writes buffered data to the file.
4. file.Sync() ensures data reaches the physical disk (fsync).

When writeTimeout is configured and the channel is full, AppendMessage enters a retry loop with exponential backoff.
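A sketch of how that fallback might look, continuing the struct sketch above (imports such as errors and time omitted). The error return, backoff constants, and the final WriteDirect fallback are assumptions based on the channel-overflow behavior described later on this page.

```go
// AppendMessage sketch: non-blocking enqueue with a bounded retry/backoff
// fallback when the write channel is full.
func (d *DiskHandler) AppendMessage(msg string) error {
	select {
	case d.writeCh <- msg:
		return nil // fast path: channel had room
	case <-d.done:
		return errors.New("disk handler closed")
	default:
	}

	// Channel full: retry with exponential backoff until writeTimeout elapses.
	backoff := time.Millisecond
	deadline := time.Now().Add(d.writeTimeout)
	for time.Now().Before(deadline) {
		select {
		case d.writeCh <- msg:
			return nil
		case <-d.done:
			return errors.New("disk handler closed")
		case <-time.After(backoff):
			backoff *= 2 // exponential backoff between attempts
		}
	}
	// Last resort: synchronous write so the message is not lost.
	return d.WriteDirect(msg)
}
```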
This ensures publishers don’t block indefinitely while maintaining message durability guarantees.
The flushLoop goroutine is the heart of the asynchronous write path. It continuously dequeues messages from writeCh, accumulates them into batches, and flushes to disk based on configurable thresholds.
| Condition | Threshold | Behavior |
|---|---|---|
| Batch Full | len(batch) >= batchSize (500) | Immediate flush |
| Linger Timeout | ticker.C fires (100ms) | Flush if len(batch) > 0 |
| Shutdown | done channel closed | Drain and flush remaining messages |
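A simplified flushLoop sketch showing the three triggers from the table above, continuing the struct sketch (imports omitted). The shutdown WaitGroup field and the exact drain structure are assumptions; the full drain loop appears in the Graceful Shutdown section below.

```go
// flushLoop batches messages from writeCh and flushes on batch size,
// linger timeout, or shutdown.
func (d *DiskHandler) flushLoop() {
	defer d.shutdown.Done() // shutdown WaitGroup used by Close() (not shown in the struct sketch)

	batch := make([]string, 0, d.batchSize)
	ticker := time.NewTicker(d.linger)
	defer ticker.Stop()

	for {
		select {
		case msg := <-d.writeCh:
			batch = append(batch, msg)
			if len(batch) >= d.batchSize {
				d.writeBatch(batch) // batch full: flush immediately
				batch = batch[:0]
			}
		case <-ticker.C:
			if len(batch) > 0 {
				d.writeBatch(batch) // linger expired: flush partial batch
				batch = batch[:0]
			}
		case <-d.done:
			// Shutdown: drain queued messages, flush, and exit.
			draining := true
			for draining {
				select {
				case msg, ok := <-d.writeCh:
					if !ok {
						draining = false
						continue
					}
					batch = append(batch, msg)
					if len(batch) >= d.batchSize {
						d.writeBatch(batch)
						batch = batch[:0]
					}
				default:
					draining = false
				}
			}
			if len(batch) > 0 {
				d.writeBatch(batch)
			}
			return
		}
	}
}
```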
The writeBatch function performs the actual disk I/O, writing a batch of messages with length prefixes to the active segment file.
Each message is written with a 4-byte big-endian length prefix:
| Record Offset | Length Prefix | Payload (hex) | Payload (UTF-8) |
|---|---|---|---|
| 0 | 11 | 68 65 6C 6C 6F 20 77 6F 72 6C 64 | "hello world" |
| 15 | 11 | 61 6E 6F 74 68 65 72 20 6D 73 67 | "another msg" |
The writeBatch function uses a dual-mutex strategy for fine-grained concurrency control:
| Mutex | Protects | Locked By |
|---|---|---|
| mu | CurrentOffset, CurrentSegment, file | writeBatch(), ReadMessages(), rotateSegment() |
| ioMu | writer, writer.Write(), writer.Flush() | writeBatch(), Flush(), WriteDirect() |
This separation allows concurrent reads while writes are in progress, as readers use memory-mapped I/O and do not conflict with the write path.
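A simplified sketch of that locking pattern inside writeBatch, continuing the struct sketch above (imports and error handling abbreviated; the rotation check described just below is omitted from this fragment):

```go
// writeBatch sketch: short metadata critical section under mu, then the
// actual buffered I/O under ioMu.
func (d *DiskHandler) writeBatch(batch []string) error {
	totalLen := 0
	for _, msg := range batch {
		totalLen += 4 + len(msg) // length prefix + payload
	}

	// Metadata section under mu: advance the write offset.
	d.mu.Lock()
	d.CurrentOffset += totalLen
	d.mu.Unlock()

	// I/O section under ioMu: buffered writes, flush, fsync.
	d.ioMu.Lock()
	defer d.ioMu.Unlock()
	var prefix [4]byte
	for _, msg := range batch {
		binary.BigEndian.PutUint32(prefix[:], uint32(len(msg)))
		if _, err := d.writer.Write(prefix[:]); err != nil {
			return err
		}
		if _, err := d.writer.WriteString(msg); err != nil {
			return err
		}
	}
	if err := d.writer.Flush(); err != nil {
		return err
	}
	return d.file.Sync() // durability point: data reaches the physical disk
}
```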
When CurrentOffset + totalLen > SegmentSize, writeBatch calls rotateSegment() to create a new segment file.
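A rotateSegment sketch, continuing the struct sketch above (imports omitted). How the metadata lock is coordinated with the caller is simplified here and should be treated as an assumption.

```go
// rotateSegment sketch: flush and close the active segment, then open the
// next one and reset the write offset.
func (d *DiskHandler) rotateSegment() error {
	d.ioMu.Lock()
	defer d.ioMu.Unlock()

	// Make sure everything buffered for the old segment is on disk.
	if err := d.writer.Flush(); err != nil {
		return err
	}
	if err := d.file.Close(); err != nil {
		return err
	}

	// Open the next segment file and reset metadata.
	d.CurrentSegment++
	d.CurrentOffset = 0
	path := fmt.Sprintf("%s_segment_%d.log", d.BaseName, d.CurrentSegment)
	f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
	if err != nil {
		return err
	}
	d.file = f
	d.writer = bufio.NewWriter(f)
	return nil
}
```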
The Close() method provides graceful shutdown with guaranteed message durability.
The shutdown drain loop ensures all pending messages are flushed to disk:
case <-d.done:
	// Shutdown signal received: drain any messages still queued in writeCh.
	draining := true
	for draining {
		// Flush full batches as they accumulate during the drain.
		if len(batch) >= d.batchSize {
			d.writeBatch(batch)
			batch = batch[:0]
			continue
		}
		select {
		case msg, ok := <-d.writeCh:
			if !ok {
				// Channel closed: nothing more to drain.
				draining = false
				continue
			}
			batch = append(batch, msg)
		default:
			// Channel empty: stop draining.
			draining = false
		}
	}
	// Flush whatever remains in the final partial batch.
	if len(batch) > 0 {
		d.writeBatch(batch)
	}
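The surrounding Close() method might look roughly like this, using the closeOnce and shutdown WaitGroup described in the shutdown components table on the Disk Persistence System page. The exact field names and cleanup order are assumptions.

```go
// Close sketch: signal shutdown once, wait for flushLoop to drain and exit,
// then flush and close the file handles.
func (d *DiskHandler) Close() error {
	var err error
	d.closeOnce.Do(func() {
		close(d.done)     // tell flushLoop (and AppendMessage) to stop
		d.shutdown.Wait() // block until the drain loop above has finished

		d.ioMu.Lock()
		defer d.ioMu.Unlock()
		if ferr := d.writer.Flush(); ferr != nil {
			err = ferr
		}
		if cerr := d.file.Close(); cerr != nil && err == nil {
			err = cerr
		}
	})
	return err
}
```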
While AppendMessage is the primary write path, DiskHandler also provides alternative methods for specific use cases.
The WriteDirect method bypasses the channel and batching logic for immediate, synchronous disk writes.
| Method | Write Path | Durability | Use Case |
|---|---|---|---|
| AppendMessage() | Asynchronous (channel) | Guaranteed after flush | High-throughput publishing |
| WriteDirect() | Synchronous (direct) | Immediate | Critical messages, low volume |
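A short usage contrast between the two paths. The import path, the wrapper function, and the assumption that both methods return an error are illustrative.

```go
package main

import (
	"log"

	"example.com/cursus/disk" // import path is illustrative
)

func persistExamples(handler *disk.DiskHandler) {
	// Typical publish path: non-blocking enqueue, flushed later by flushLoop.
	if err := handler.AppendMessage(`{"order_id": 42}`); err != nil {
		log.Printf("enqueue failed: %v", err)
	}

	// Critical, low-volume record: synchronous write, durable before return.
	if err := handler.WriteDirect(`{"event": "partition_created"}`); err != nil {
		log.Printf("direct write failed: %v", err)
	}
}
```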
The batching mechanism provides several performance advantages:

- Many messages are written per write() syscall instead of one syscall per message
- A single file.Sync() per batch amortizes the cost of fsync across the whole batch

| Scenario | Latency | Notes |
|---|---|---|
| Best Case | ~10ms | Batch full immediately, fast disk |
| Typical | ~50-150ms | Linger timeout (100ms) + fsync |
| Worst Case | >100ms | Linger timeout + slow disk + rotation |
The handler_test.go file provides comprehensive test coverage for the write path.
| Test | Purpose | Key Assertions |
|---|---|---|
| TestDiskHandlerBasic | Basic append and flush | All messages present in segment files |
| TestDiskHandlerChannelOverflow | Channel backpressure | Synchronous fallback works correctly |
| TestDiskHandlerRotation | Segment rotation | Multiple segment files created |
disk_flush_batch_size: 1000 # Larger batches
linger_ms: 200 # Wait longer for full batches
channel_buffer_size: 10000 # Deep queue
disk_write_timeout_ms: 5000 # Longer timeout
disk_flush_batch_size: 100 # Smaller batches
linger_ms: 10 # Quick flushes
channel_buffer_size: 1000 # Shallow queue
disk_write_timeout_ms: 1000 # Quick timeout
disk_flush_batch_size: 50 # Small batches
linger_ms: 50 # Moderate linger
channel_buffer_size: 100 # Minimal queue
disk_write_timeout_ms: 500 # Fast timeout
Note: For complete configuration reference, see Configuration Reference.
The DiskHandler write path provides:

- Non-blocking message enqueue for publishers
- Batched, buffered writes with configurable batch size and linger time
- Size-based segment rotation and a graceful shutdown that drains pending messages

Key Takeaways:

- The flushLoop goroutine continuously batches and flushes messages
- Flushes are triggered by a full batch, the linger timeout, or shutdown
- writer.Flush() followed by file.Sync() is the durability point for each batch