cursus

Consumer Groups

This document explains how consumer groups work in cursus, including their structure, registration process, load balancing mechanism, and message distribution strategy. Consumer groups enable multiple consumers to share the load of processing messages from a topic while maintaining ordering guarantees within partitions.

For information about topic and partition structure, see Topics and Partitions. For the broader topic management system, see Topic Management System.

Purpose and Functionality

Consumer groups provide a mechanism for horizontal scaling of message consumption. Multiple consumers can join a group to collectively process messages from a topic, with cursus automatically distributing partitions among the consumers.

Each partition’s messages are delivered to exactly one consumer within a group, ensuring that ordering is preserved within each partition while enabling parallel processing across partitions.

Key capabilities:

Data Structures

ConsumerGroup Structure

The ConsumerGroup struct contains an array of Consumer instances. Each Consumer has a buffered channel (MsgCh) with capacity 1000 that receives messages from assigned partitions.

Key Parameters:

Component Buffer Size Purpose
Consumer.MsgCh 1000 Consumer’s message receive buffer
Partition group channel 10000 Per-group buffer in each partition
Partition main channel 10000 Partition’s internal message buffer

Registration Process

RegisterConsumerGroup Method

The RegisterConsumerGroup method establishes a consumer group for a topic. It performs the following operations:

Load Balancing Algorithm

Partition-to-Consumer Distribution

cursus uses a deterministic modulo-based distribution algorithm:

target_consumer_index = partition_id % consumer_count

This ensures:

Distribution Example:

Partition ID	Consumer Count	Target Consumer	Calculation
0	3	0	0 % 3 = 0
0 % 3 = 0
1	3	1	1 % 3 = 1
1 % 3 = 1
2	3	2	2 % 3 = 2
2 % 3 = 2
3	3	0	3 % 3 = 0
3 % 3 = 0
4	3	1	4 % 3 = 1
4 % 3 = 1
5	3	2	5 % 3 = 2
5 % 3 = 2

Message Distribution

From Partition to Consumer

Messages flow through multiple channels before reaching a consumer:

Message Path:

Partition Run Loop

The partition’s run() method distributes messages to all registered consumer groups:

func (p *Partition) run() {
    for msg := range p.ch {
        p.mu.RLock()
        for _, subCh := range p.subs {  // Each group gets a copy
            subCh <- msg
        }
        p.mu.RUnlock()
    }
}

This design enables multiple independent consumer groups to consume the same topic without interference.

Multi-Group Consumption

Independent Consumer Groups

Multiple consumer groups can consume the same topic simultaneously. Each group maintains:

Concurrency and Thread Safety

Locking Strategy

Consumer group registration and access use read-write mutexes:

Operation Lock Type Scope
RegisterConsumerGroup Write lock Topic.mu
Consume channel lookup Read lock Topic.mu
RegisterGroup Write lock Partition.mu
Partition message broadcast Read lock Partition.mu

The locking hierarchy ensures:

Key Characteristics

Behavior Summary

Aspect Behavior
Partition assignment Static, established at registration
Distribution algorithm Modulo: partition_id % consumer_count
Ordering guarantee Per-partition ordering maintained
Group isolation Groups consume independently
Rebalancing Not supported (static assignment)
Consumer failure Channel blocks, may lead to backpressure
Buffer overflow Goroutine blocks if consumer channel full

Limitations

Summary

Consumer groups in cursus provide load balancing and ordering guarantees through a deterministic partition assignment mechanism. The modulo-based distribution ensures even load across consumers while maintaining per-partition message ordering.

Multiple consumer groups can independently consume the same topic, each with its own partition-to-consumer mapping and isolated message delivery channels.

The implementation uses goroutines to bridge partition group channels to consumer message channels, with buffering at multiple levels (partition: 10000, group channel: 10000, consumer: 1000) to handle bursts and provide backpressure protection.