The Topic Management System is the core routing and distribution layer of cursus. It orchestrates how messages flow from publishers to consumers through topics, partitions, and consumer groups.
This system handles partition selection, message deduplication, consumer group registration, and in-memory message distribution to active consumers.
For detailed information about partition selection strategies and ordering guarantees, see Topics and Partitions. For consumer group mechanics and load balancing, see Consumer Groups. For disk persistence of messages, see Disk Persistence System.
The TopicManager serves as the central registry and coordinator for all topics in the system. It maintains the global deduplication map and periodically cleans expired entries.
| Field | Type | Purpose |
|---|---|---|
| topics | map[string]*Topic | Registry of all topics by name |
| dedupMap | sync.Map | Global message deduplication (30-minute window) |
| cleanupInt | time.Duration | Interval for cleanup loop (default: 60s) |
| stopCh | chan struct{} | Signal channel to stop cleanup goroutine |
| hp | HandlerProvider | Provides disk handlers for partitions |
| mu | sync.RWMutex | Protects topics map |
The TopicManager is initialized with a HandlerProvider interface that supplies DiskHandler instances for each topic-partition combination.
Key operations:
A Topic represents a logical message stream divided into multiple partitions. It handles partition selection during message publication and maintains consumer group registrations.
| Field | Type | Purpose |
|---|---|---|
| Name | string | Topic identifier |
| Partitions | []*Partition | Array of partition instances |
| counter | uint64 | Round-robin counter for partition selection |
| consumerGroups | map[string]*ConsumerGroup | Registered consumer groups by name |
| mu | sync.RWMutex | Protects counter and consumer groups |
The Topic.Publish() method implements two partition selection strategies
msg.Key != "", uses util.GenerateID(msg.Key)counter % partitionCount and increments counterEach Partition represents an independent message queue within a topic. It runs a dedicated goroutine that distributes incoming messages to registered consumer groups.
| Field | Type | Purpose |
|---|---|---|
| id | int | Partition identifier (0-based index) |
| topic | string | Parent topic name |
| ch | chan types.Message | Message queue (capacity: 10,000) |
| subs | map[string]chan types.Message | Per-group distribution channels |
| mu | sync.RWMutex | Protects subs map |
| dh | interface{} | DiskHandler for persistence |
| closed | bool | Partition state flag |
The partition’s run() goroutine continuously reads from ch and distributes to all registered group channels
for msg := range p.ch {
for _, subCh := range p.subs {
subCh <- msg // Fan-out to all consumer groups
}
}
A ConsumerGroup contains multiple Consumer instances that collectively consume messages from a topic. Each partition is assigned to exactly one consumer within a group using modulo arithmetic.
ConsumerGroup Structure:
| Field | Type | Purpose |
|---|---|---|
| Name | string | Group identifier |
| Consumers | []*Consumer | Slice of Consumer pointers |
Consumer Structure:
| Field | Type | Purpose |
|---|---|---|
| ID | int | Consumer index within the group (0-based) |
| MsgCh | chan types.Message | Buffered channel (capacity: 1,000) for receiving messages |
Partition-to-consumer assignment uses: consumerID = partitionID % consumerCount
The TopicManager implements a global deduplication system using a sync.Map to track message IDs. This prevents duplicate processing when messages are retransmitted due to network issues or client retries.
util.GenerateID()dedupMap.LoadOrStore(msg.ID, time.Now()) atomically checks and storesThe cleanup loop runs at an interval specified by config.CleanupInterval (default: 60 seconds)
expireBefore := time.Now().Add(-30 * time.Minute)
dedupMap.Range(func(key, value any) bool {
if ts, ok := value.(time.Time); ok && ts.Before(expireBefore) {
dedupMap.Delete(key)
metrics.CleanupCount.Inc()
}
return true
})
Topics are created via TopicManager.CreateTopic(name, partitionCount)
The method handles three scenarios:
| Scenario | Action | Result |
|---|---|---|
| New topic | Create with N partitions | New Topic with partition array |
| Existing topic, same count | No-op | Return existing topic |
| Existing topic, increase count | Add partitions | Expanded partition array |
| Existing topic, decrease count | Reject | Warning printed, no change |
The DeleteTopic(name) method removes a topic from the registry
Note that this only removes the topic from memory; it does not clean up disk files or close partition goroutines.
Consumer groups are registered per-topic using Topic.RegisterConsumerGroup(groupName, consumerCount)
The registration process:
The system uses deterministic modulo arithmetic to assign partitions to consumers:
for pid, p := range t.Partitions {
groupCh := p.RegisterGroup(groupName)
target := pid % consumerCount // Deterministic assignment
go func(ch <-chan types.Message, consumer *Consumer) {
for msg := range ch {
consumer.MsgCh <- msg
}
}(groupCh, group.Consumers[target])
}
This ensures:
The system uses buffered channels extensively to prevent blocking during message distribution. Channel capacities are hard-coded throughout the topic management layer:
| Channel | Capacity | Purpose |
|---|---|---|
| Partition channel | 10,000 | Partition message queue |
| Group distribution channel | 10,000 | Per-group message buffer in partition |
| Consumer message channel | 1,000 | Consumer’s message receive buffer |
These capacities provide backpressure tolerance but can fill up if consumers fall behind. Once full, the sending goroutine will block until space is available.
The Topic Management System integrates with:
All components in the topic management system are designed for concurrent access:
The run() goroutine in each partition holds a read lock only while iterating over subscriber channels, allowing concurrent message enqueuing.