This document explains how consumer groups work in cursus, including their structure, registration process, load balancing mechanism, and message distribution strategy. Consumer groups enable multiple consumers to share the load of processing messages from a topic while maintaining ordering guarantees within partitions.
For information about topic and partition structure, see Topics and Partitions. For the broader topic management system, see Topic Management System.
Consumer groups provide a mechanism for horizontal scaling of message consumption. Multiple consumers can join a group to collectively process messages from a topic, with cursus automatically distributing partitions among the consumers.
Each partition’s messages are delivered to exactly one consumer within a group, ensuring that ordering is preserved within each partition while enabling parallel processing across partitions.
The ConsumerGroup struct contains an array of Consumer instances. Each Consumer has a buffered channel (MsgCh) with capacity 1000 that receives messages from assigned partitions.
| Component | Buffer Size | Purpose |
|---|---|---|
| Consumer.MsgCh | 1000 | Consumer’s message receive buffer |
| Partition group channel | 10000 | Per-group buffer in each partition |
| Partition main channel | 10000 | Partition’s internal message buffer |
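As a rough illustration of the structure described above, the following minimal sketch models a group and its consumers. Only `ConsumerGroup`, `Consumer`, `MsgCh`, and the capacity of 1000 come from this document; the `ID` and `Name` fields, the `string` message type, and the `NewConsumerGroup` helper are assumptions made for the example and may not match the actual cursus types.

```go
package main

import "fmt"

// consumerBufSize is the Consumer.MsgCh capacity listed in the table above.
const consumerBufSize = 1000

// Consumer is a hypothetical shape: only MsgCh and its capacity are
// taken from the document. Messages are modeled as plain strings.
type Consumer struct {
	ID    int
	MsgCh chan string
}

// ConsumerGroup holds the consumers that share a topic's partitions.
type ConsumerGroup struct {
	Name      string
	Consumers []*Consumer
}

// NewConsumerGroup is an illustrative helper (not a cursus API) that
// creates consumerCount consumers, each with a buffered MsgCh.
func NewConsumerGroup(name string, consumerCount int) *ConsumerGroup {
	g := &ConsumerGroup{Name: name, Consumers: make([]*Consumer, 0, consumerCount)}
	for i := 0; i < consumerCount; i++ {
		g.Consumers = append(g.Consumers, &Consumer{
			ID:    i,
			MsgCh: make(chan string, consumerBufSize),
		})
	}
	return g
}

func main() {
	g := NewConsumerGroup("billing", 3)
	fmt.Println(len(g.Consumers), cap(g.Consumers[0].MsgCh))
}
```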
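The RegisterConsumerGroup method establishes a consumer group for a topic. In terms of the components described in this document, registration takes the write lock on Topic.mu, sets up the group's consumers, registers the group with each partition through RegisterGroup, and starts the goroutines that bridge each partition's group channel to the assigned consumer's MsgCh.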
cursus uses a deterministic modulo-based distribution algorithm:
target_consumer_index = partition_id % consumer_count
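This ensures that each partition maps deterministically to exactly one consumer in the group and that partitions are spread evenly across consumers. For example, with six partitions and three consumers: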
| Partition ID | Consumer Count | Target Consumer | Calculation |
|---|---|---|---|
| 0 | 3 | 0 | 0 % 3 = 0 |
| 1 | 3 | 1 | 1 % 3 = 1 |
| 2 | 3 | 2 | 2 % 3 = 2 |
| 3 | 3 | 0 | 3 % 3 = 0 |
| 4 | 3 | 1 | 4 % 3 = 1 |
| 5 | 3 | 2 | 5 % 3 = 2 |
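The modulo rule can be sketched in a few lines of Go. The function names `TargetConsumer` and `Assign` are hypothetical and chosen for this example; only the formula `partition_id % consumer_count` comes from the document. The sketch assumes partition IDs are contiguous and start at 0, and that `consumerCount` is greater than zero.

```go
package main

import "fmt"

// TargetConsumer applies the modulo rule from the text.
// Precondition: consumerCount > 0 (a zero count would panic on division).
func TargetConsumer(partitionID, consumerCount int) int {
	return partitionID % consumerCount
}

// Assign returns, for each consumer index, the partition IDs it owns.
// Partition IDs are assumed to be 0..partitionCount-1.
func Assign(partitionCount, consumerCount int) [][]int {
	owned := make([][]int, consumerCount)
	for p := 0; p < partitionCount; p++ {
		i := TargetConsumer(p, consumerCount)
		owned[i] = append(owned[i], p)
	}
	return owned
}

func main() {
	for i, parts := range Assign(6, 3) {
		fmt.Printf("consumer %d -> partitions %v\n", i, parts)
	}
	// Prints:
	// consumer 0 -> partitions [0 3]
	// consumer 1 -> partitions [1 4]
	// consumer 2 -> partitions [2 5]
}
```

With contiguous partition IDs, the number of partitions per consumer differs by at most one. If there are more consumers than partitions, the extra consumers receive no partitions.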
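Messages flow through multiple channels before reaching a consumer. The message path is: the partition's main channel, then the per-group channel in that partition, then a bridging goroutine, and finally the assigned consumer's MsgCh.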
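The last hop of this path, from a partition's group channel to a consumer's `MsgCh`, could look roughly like the sketch below. The `bridge` function, its `done` channel, and the `string` message type are assumptions for illustration; the document only states that goroutines bridge the two channels.

```go
package main

import "fmt"

// bridge starts a goroutine that forwards every message from a
// partition's per-group channel to the assigned consumer's channel.
// The returned channel is closed once groupCh is closed and drained.
// This is a hypothetical helper, not the cursus implementation.
func bridge(groupCh <-chan string, msgCh chan<- string) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for msg := range groupCh {
			msgCh <- msg // blocks while the consumer's buffer is full
		}
	}()
	return done
}

func main() {
	msgCh := make(chan string, 1000) // Consumer.MsgCh
	p0 := make(chan string, 10000)   // group channel of partition 0
	p3 := make(chan string, 10000)   // group channel of partition 3

	// With three consumers, partitions 0 and 3 both map to consumer 0.
	d0 := bridge(p0, msgCh)
	d3 := bridge(p3, msgCh)

	p0 <- "p0-a"
	p0 <- "p0-b"
	p3 <- "p3-a"
	close(p0)
	close(p3)
	<-d0
	<-d3
	close(msgCh)

	// Order is preserved within each partition; interleaving across
	// partitions is not deterministic.
	for m := range msgCh {
		fmt.Println(m)
	}
}
```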
The partition’s run() method distributes messages to all registered consumer groups:
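```go
// run reads from the partition's main channel and broadcasts each
// message to every registered group channel.
func (p *Partition) run() {
	for msg := range p.ch {
		p.mu.RLock()
		for _, subCh := range p.subs { // Each group gets a copy
			subCh <- msg // blocks if this group's channel is full
		}
		p.mu.RUnlock()
	}
}
```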
This design enables multiple independent consumer groups to consume the same topic without interference.
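To see the per-group fan-out in isolation, here is a self-contained sketch built around the same `run()` loop. The `subs` map keyed by group name, the `NewPartition` constructor, the `RegisterGroup` signature, and the closing of group channels on shutdown are all assumptions added so the example can run and terminate; they are not confirmed details of cursus.

```go
package main

import (
	"fmt"
	"sync"
)

// Partition is a simplified stand-in for the cursus partition type.
type Partition struct {
	mu   sync.RWMutex
	ch   chan string            // partition main channel
	subs map[string]chan string // one channel per consumer group (assumed map)
}

func NewPartition() *Partition {
	return &Partition{ch: make(chan string, 10000), subs: make(map[string]chan string)}
}

// RegisterGroup adds a per-group channel under the write lock.
func (p *Partition) RegisterGroup(name string) <-chan string {
	p.mu.Lock()
	defer p.mu.Unlock()
	if ch, ok := p.subs[name]; ok {
		return ch
	}
	ch := make(chan string, 10000)
	p.subs[name] = ch
	return ch
}

// run broadcasts each message to every group, then closes the group
// channels once the main channel is closed (sketch-only shutdown).
func (p *Partition) run() {
	for msg := range p.ch {
		p.mu.RLock()
		for _, subCh := range p.subs {
			subCh <- msg
		}
		p.mu.RUnlock()
	}
	p.mu.Lock()
	for _, subCh := range p.subs {
		close(subCh)
	}
	p.mu.Unlock()
}

func main() {
	p := NewPartition()
	analytics := p.RegisterGroup("analytics")
	billing := p.RegisterGroup("billing")
	go p.run()

	p.ch <- "m1"
	p.ch <- "m2"
	close(p.ch)

	for m := range analytics {
		fmt.Println("analytics got", m)
	}
	for m := range billing {
		fmt.Println("billing got", m)
	}
}
```

Each group receives every message in the same order, and draining one group's channel does not consume messages from the other.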
Multiple consumer groups can consume the same topic simultaneously. Each group maintains its own partition-to-consumer mapping and its own isolated message delivery channels.
Consumer group registration and access use read-write mutexes:
| Operation | Lock Type | Scope |
|---|---|---|
| RegisterConsumerGroup | Write lock | Topic.mu |
| Consume channel lookup | Read lock | Topic.mu |
| RegisterGroup | Write lock | Partition.mu |
| Partition message broadcast | Read lock | Partition.mu |
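The topic-level half of this table can be sketched with `sync.RWMutex` from the Go standard library. The `Topic` fields, the `NewTopic` constructor, the `Lookup` method, and the simplified `RegisterConsumerGroup` signature are hypothetical; only the lock types and the `Topic.mu` scope come from the table.

```go
package main

import (
	"fmt"
	"sync"
)

// Topic is a simplified stand-in: each group is reduced to one channel.
type Topic struct {
	mu     sync.RWMutex
	groups map[string]chan string
}

func NewTopic() *Topic {
	return &Topic{groups: make(map[string]chan string)}
}

// RegisterConsumerGroup mutates the group map, so it takes the write lock.
func (t *Topic) RegisterConsumerGroup(name string) chan string {
	t.mu.Lock()
	defer t.mu.Unlock()
	if ch, ok := t.groups[name]; ok {
		return ch // already registered
	}
	ch := make(chan string, 1000)
	t.groups[name] = ch
	return ch
}

// Lookup only reads the map, so concurrent lookups share the read lock.
func (t *Topic) Lookup(name string) (chan string, bool) {
	t.mu.RLock()
	defer t.mu.RUnlock()
	ch, ok := t.groups[name]
	return ch, ok
}

func main() {
	topic := NewTopic()
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			topic.RegisterConsumerGroup("billing") // safe to call concurrently
			topic.Lookup("billing")
		}()
	}
	wg.Wait()
	_, ok := topic.Lookup("billing")
	fmt.Println(ok, len(topic.groups))
}
```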
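Because these are read-write mutexes, channel lookups and message broadcasts (read locks) can run concurrently with one another, while registrations (write locks) are exclusive. The following table summarizes consumer group behavior: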
| Aspect | Behavior |
|---|---|
| Partition assignment | Static, established at registration |
| Distribution algorithm | Modulo: partition_id % consumer_count |
| Ordering guarantee | Per-partition ordering maintained |
| Group isolation | Groups consume independently |
| Rebalancing | Not supported (static assignment) |
| Consumer failure | Channel blocks, may lead to backpressure |
| Buffer overflow | Goroutine blocks if consumer channel full |
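The last two rows describe blocking sends on full buffers. The sketch below makes that condition observable with a non-blocking probe. `trySend` is a hypothetical helper for this demonstration only; according to the table, cursus itself uses a plain blocking send. The buffer is shrunk to 2 so it fills quickly.

```go
package main

import "fmt"

// trySend attempts a send without blocking and reports whether the
// channel accepted the message. A false result marks the point where
// a plain blocking send (ch <- msg) would stall the sending goroutine.
func trySend(ch chan<- string, msg string) bool {
	select {
	case ch <- msg:
		return true
	default:
		return false
	}
}

func main() {
	msgCh := make(chan string, 2) // the document's consumer buffer is 1000

	fmt.Println(trySend(msgCh, "m1")) // true
	fmt.Println(trySend(msgCh, "m2")) // true
	fmt.Println(trySend(msgCh, "m3")) // false: buffer full, a blocking send would wait

	<-msgCh                           // the consumer reads one message
	fmt.Println(trySend(msgCh, "m3")) // true: space is available again
}
```

Because the sender waits rather than dropping messages, a stalled consumer eventually fills the upstream group channel as well, which is how backpressure propagates toward the partition.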
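Consumer groups in cursus provide load balancing and ordering guarantees through a deterministic, static partition assignment. The modulo-based distribution spreads partitions evenly across consumers while maintaining per-partition message ordering. It balances partition counts rather than message volume, so a consumer that owns a busy partition still carries more load.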
Multiple consumer groups can independently consume the same topic, each with its own partition-to-consumer mapping and isolated message delivery channels.
The implementation uses goroutines to bridge partition group channels to consumer message channels, with buffering at multiple levels (partition: 10000, group channel: 10000, consumer: 1000). The buffers absorb bursts; when one fills, the sending goroutine blocks, which applies backpressure upstream.