This document explains how log segments are created, rotated, and managed in the cursus disk persistence system.
A segment is a physical file on disk that stores messages for a specific topic-partition pair. Segments are rotated (rolled over to a new file) when they reach 1MB in size, enabling efficient parallel I/O, simplified cleanup operations, and bounded resource usage.
For information about the on-disk message format within segments, see Disk Format. For platform-specific I/O optimizations, see Platform-Specific Optimizations.
Each topic-partition pair has its own directory and sequence of segment files. The directory structure follows this pattern:
{LogDir}/{topicName}/partition_{partitionID}/
├── {topicName}_partition_{partitionID}_segment_0.log
├── {topicName}_partition_{partitionID}_segment_1.log
├── {topicName}_partition_{partitionID}_segment_2.log
└── ...
The BaseName for a DiskHandler is constructed as: {LogDir}/{topicName}/partition_{partitionID}, and individual segment files append _segment_{N}.log to this base.
Each segment has a maximum size of 1MB (1,048,576 bytes).
Parameter Value Location segmentSize 1024 * 1024 bytes (1MB) // todo
When writing a message would cause the current segment to exceed this size, rotation is triggered.
When a DiskHandler is first created, it initializes with segment 0. The process occurs in NewDiskHandler():
{BaseName}_segment_0.log with flagsCurrentSegment = 0 and CurrentOffset = 0The openSegment() function creates or opens a segment file and is platform-specific. On Windows, it uses standard file operations:
flags := os.O_CREATE | os.O_RDWR | os.O_APPEND
f, err := os.OpenFile(
fmt.Sprintf("%s_segment_%d.log", d.BaseName, d.CurrentSegment),
flags,
0644
)
Segment rotation occurs in the writeBatch() function when writing a message would exceed the SegmentSize:
totalLen := 4 + len(data) // 4-byte length prefix + message data
if d.CurrentOffset+totalLen > d.SegmentSize {
if err := d.rotateSegment(); err != nil {
log.Printf("ERROR: rotateSegment failed: %v", err)
break
}
}
The condition d.CurrentOffset + totalLen > d.SegmentSize ensures that each segment never exceeds exactly 1MB. The rotation happens before writing the message that would exceed the limit.
rotateSegment() ProcessThe rotateSegment() function orchestrates the transition from one segment file to the next:
The rotation process is synchronized with both mu (metadata lock) and ioMu (I/O lock) held by the calling writeBatch() function, ensuring thread safety.
| Step | Action | Error Handling |
|---|---|---|
| 1 | Flush bufio.Writer | Error logged, collected |
| 2 | Close current file | Error logged, collected |
| 3 | Increment CurrentSegment | N/A |
| 4 | Reset CurrentOffset to 0 | N/A |
| 5 | Call openSegment() |
Error collected |
| 6 | Return aggregated errors | Multiple errors returned as slice |
The DiskManager maintains a central registry of all DiskHandler instances in the system.
Each topic-partition pair gets exactly one DiskHandler instance, preventing duplicate file handles and ensuring serialized writes.
Handlers are keyed in the registry using the format: “{topic}_{partitionID}”.
This simple string key allows O(1) lookup:
key := fmt.Sprintf("%s_%d", topic, partitionID)
if dh, ok := dm.handlers[key]; ok {
return dh, nil
}
The GetHandler() method implements lazy initialization - handlers are created on first access:
| Step | Action | Synchronization |
|---|---|---|
| 1 | Acquire DiskManager.mu lock |
sync.Mutex |
| 2 | Check if handler exists in map | Map lookup |
| 3 | If exists, return existing handler | Early return |
| 4 | If not exists, create log directory | os.MkdirAll() |
| 5 | Create new DiskHandler (1MB segments) | NewDiskHandler() |
| 6 | Store in handlers map | Map insertion |
| 7 | Return new handler | Release lock |
The CloseAllHandlers() method cleanly shuts down all handlers during broker shutdown:
func (dm *DiskManager) CloseAllHandlers() {
dm.mu.Lock()
defer dm.mu.Unlock()
for name, dh := range dm.handlers {
fmt.Printf("Closing DiskHandler for %s\n", name)
dh.Close()
delete(dm.handlers, name)
}
}
This ensures:
| State | Description | File Operations | Thread Safety | ||
|---|---|---|---|---|---|
| Creating | Segment file is being opened | os.OpenFile() with O_CREATE |
O_RDWR | O_APPEND | Protected by ioMu |
| Active | Current segment accepting writes | bufio.Writer.Write(), periodic Flush() and Sync() |
Protected by both mu and ioMu | ||
| Rotating | Transitioning to next segment | Flush(), Close(), increment counters, openSegment() |
Protected by both locks | ||
| Closed | Previous segment, no longer written | Read operations via mmap.Open() |
Only metadata protected by mu |
The DiskHandler uses two separate mutexes to allow concurrent reads and writes:
| Operation | Locks Acquired | Purpose |
|---|---|---|
writeBatch() |
mu then ioMu | Update offset and perform I/O |
rotateSegment() |
mu and ioMu (inherited) | Update metadata and replace file |
ReadMessages() |
mu only | Read current segment number |
AppendMessage() |
None (channel-based) | Non-blocking enqueue |
This dual-lock design allows:
| Operation | Triggers Segment Action | Segment State Change |
|---|---|---|
writeBatch() |
Checks if rotation needed before each message | Active → Rotating → Active (new segment) |
rotateSegment() |
Closes current, opens next | Active → Closed (old), Creating → Active (new) |
WriteDirect() |
Checks if rotation needed | Same as writeBatch() |
flushLoop() |
Calls writeBatch() periodically |
Indirect via writeBatch() |
| Operation | Segment Access | Locking |
|---|---|---|
ReadMessages() |
Opens current segment via mmap.Open() |
Acquires mu to read CurrentSegment |
SendCurrentSegmentToConn() |
Transfers current segment to network connection | Acquires both locks, uses io.Copy() |
rotateSegment() ensures clean transition with proper flush and close operationsGetHandler()