Building Scalable Event-Driven Systems with Apache Kafka: Comparing Message Stream vs. Message Queue Approaches

Luiz Oliveira
8 min read · Jan 15, 2025


At its core, Apache Kafka is a distributed streaming platform designed for high throughput, fault tolerance, and horizontal scalability. In my previous post (Why and When Use Apache Kafka for Event-Driven Systems), I shared introductory information about Apache Kafka that is helpful for understanding this article.

In an event-driven architecture, Kafka plays a central role in decoupling services through asynchronous messaging. Here we compare two approaches:

  • Message Stream Approach (Fan-Out):
    Every consumer gets a copy of each message. This pattern is perfect when you need real-time updates to multiple independent systems — for example, sending notifications, analytics processing, and logging simultaneously.
  • Message Queue Approach (Work Distribution):
    Each message is delivered to only one consumer within a consumer group, balancing the workload across multiple consumers. This pattern is ideal for distributed task processing where a job should be performed once — like processing orders or handling background tasks.

Alongside these patterns, we use Docker Compose for containerization, Kafka for messaging, and Python for implementation, providing a self-contained environment for experimentation and development.

The Project Setup

Docker Compose for Kafka and Zookeeper

We use Docker Compose to run Kafka and Zookeeper in containers. Zookeeper coordinates the Kafka brokers, and Kafka acts as our messaging server. The docker-compose YAML configuration sets essential parameters such as the listeners Kafka advertises and automatic topic creation; the number of partitions per topic (crucial for balancing workloads) can be configured on the broker or when a topic is created.

version: '3.7'
services:
  zookeeper:
    image: bitnami/zookeeper:latest
    container_name: zookeeper
    environment:
      - ZOO_ENABLE_AUTH=no
      - ALLOW_ANONYMOUS_LOGIN=yes
    ports:
      - "2181:2181"

  kafka:
    image: bitnami/kafka:latest
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true

Create a file named docker-compose.yml with the YAML above, then run the “docker-compose up” command in the same directory. This starts both services as containers in your infrastructure without the effort of installing and configuring them manually.

For simplicity, we will run the Python scripts on the host itself; in a real-life project, these scripts may be distributed across different hosts depending on the project’s characteristics. We chose Python for this tutorial, but you can implement your producers and consumers in any language supported by the Apache Kafka APIs. Links for other programming languages will be shared at the end of the article.

Python Producers and Consumers

Message Stream Producer & Consumers

For the stream approach, a producer sends messages to a topic where each consumer uses a unique consumer group (each consumer has a different group id). As a result, all consumers receive each message independently. This is useful when different services need to react to the same event — say, logging an event while triggering a notification.

Message Stream Approach (Fan-Out)

A single message (red ball) is replicated and delivered to each consumer. This happens because each consumer has a group ID different from the others (group-1, group-2, …, group-N).

Producer Message Stream

from kafka import KafkaProducer
import time

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: v.encode('utf-8')
)

topic_name = 'stream_topic'
for i in range(5):
    producer.send(topic_name, value=f"Stream Message #{i}")
    time.sleep(1)
producer.flush()

Consumer Message Stream

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'stream_topic',
    bootstrap_servers='localhost:9092',
    group_id='group-1',  # Unique group per consumer
    auto_offset_reset='earliest'
)

for message in consumer:
    print(f"Received by Consumer A: {message.value.decode('utf-8')}")

Message Queue Producer & Consumers

For the queue approach, multiple consumers share the same consumer group id. Kafka’s partitioning mechanism assigns partitions to each consumer, ensuring that only one consumer processes each message in the group.

Message Queue Approach (Work Distribution)

In this paradigm, each message (denoted by a different color) is delivered to only one consumer in the group. Since all consumers share the same group ID (shared_group), work-distribution behavior is achieved.

Producer for queue pattern (omitting the key to allow round-robin):

from kafka import KafkaProducer
import time

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: v.encode('utf-8')
)

topic_name = 'queue_topic'
for i in range(10):
    producer.send(topic_name, value=f"Queue Message #{i}")
    time.sleep(1)
producer.flush()
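
For contrast, when a key is provided, Kafka hashes it to choose the partition, so all messages sharing a key land on the same partition (and therefore go to the same consumer in the group). A minimal sketch, replacing the send call inside the loop above and using a hypothetical order_id as the key:

# Messages with the same key always land on the same partition,
# so a single consumer processes all events for a given order.
order_id = i % 3  # hypothetical key, just for illustration
producer.send(
    topic_name,
    key=f"order-{order_id}".encode('utf-8'),
    value=f"Queue Message #{i}"
)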

Consumers in a shared group (the basic version):

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'queue_topic',
    bootstrap_servers='localhost:9092',
    group_id='shared_group',  # Shared group for work distribution
    auto_offset_reset='earliest'
)

for message in consumer:
    print(f"Processed: {message.value.decode('utf-8')}")

In this model, Kafka will distribute the partitions evenly if the topic is configured with at least as many partitions as consumers. Each consumer processes a subset of messages, ensuring balanced workload distribution.
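
Auto-created topics use the broker’s default partition count, which may be just one. If you want to guarantee enough partitions, you can create the topic explicitly; a minimal sketch using kafka-python’s admin client (the partition count of 30 matches the consumer count used later and is only an example):

from kafka.admin import KafkaAdminClient, NewTopic

# Create queue_topic with enough partitions for the planned consumers.
# This raises TopicAlreadyExistsError if the topic was already auto-created.
admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
admin.create_topics([
    NewTopic(name='queue_topic', num_partitions=30, replication_factor=1)
])
admin.close()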

Scaling Out: Multi-threaded Consumers

To better demonstrate the potential of Apache Kafka, we will create multiple consumers using multi-threading in a Python script. This lets us verify the behavior of the two distribution approaches with real data on the number of messages received by each consumer.

Producer code

from kafka import KafkaProducer
import time

def run_producer():
    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',
        value_serializer=lambda v: v.encode('utf-8')
    )

    topic_name = 'queue_topic'

    print("Starting producer to send 12000 messages to 'queue_topic'...")
    for i in range(12000):
        message = f"Hello Queue Consumer, msg#{i}"
        producer.send(topic_name, value=message)
        print(f"Sent: {message}")
        time.sleep(0.001)

    producer.flush()
    print("Producer finished sending messages.")

if __name__ == "__main__":
    run_producer()

The producer code is the same for both cases; the only difference is that here we send 12,000 messages.

Multi-threading Queue consumers

import threading
import sys
import time
from kafka import KafkaConsumer

# Configuration
TOPIC_NAME = 'queue_topic'
BOOTSTRAP_SERVERS = 'localhost:9092'
GROUP_ID = 'group'
NUM_CONSUMERS = 30  # Number of consumer threads
# APPROACH = 'queue'
APPROACH = 'stream'

# Shared data structure to hold message counts
message_counts = {}
print_lock = threading.Lock()

def clear_console():
    """Clears the console."""
    sys.stdout.write('\033[2J\033[H')
    sys.stdout.flush()

def initialize_display(num_consumers):
    """Initializes the console display with placeholders for each consumer."""
    for i in range(num_consumers):
        print(f"[Consumer {i+1}] Received messages: 0")

def update_display(consumer_id, count):
    """Updates the message count for a specific consumer."""
    with print_lock:
        # Move the cursor to the line belonging to this consumer
        sys.stdout.write(f"\033[{consumer_id+1};0H")  # Move to line (consumer_id + 1)
        sys.stdout.write(f"[Consumer {consumer_id+1}] Received messages: {count} ")
        sys.stdout.flush()

def run_consumer(consumer_id):
    """Function to run a Kafka consumer in a thread."""
    consumer = KafkaConsumer(
        TOPIC_NAME,
        bootstrap_servers=BOOTSTRAP_SERVERS,
        group_id=GROUP_ID if APPROACH == 'queue' else f"{GROUP_ID}-{consumer_id}",
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        consumer_timeout_ms=1000  # Exit the poll loop if no message after timeout
    )

    message_count = 0

    try:
        while True:
            for message in consumer:
                message_count += 1
                message_counts[consumer_id] = message_count
                update_display(consumer_id, message_count)
            time.sleep(1)  # Prevent a tight loop when no messages arrive
    except KeyboardInterrupt:
        print(f"\nConsumer {consumer_id+1} interrupted and stopping.")
    finally:
        consumer.close()

def main():
    # Clear and initialize the console
    clear_console()
    initialize_display(NUM_CONSUMERS)

    # Start consumer threads
    threads = []
    for i in range(NUM_CONSUMERS):
        thread = threading.Thread(target=run_consumer, args=(i,), daemon=True)
        threads.append(thread)
        thread.start()

    print("\nPress Ctrl+C to stop consumers.")
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\nShutting down consumers...")

if __name__ == "__main__":
    main()

To execute the code above, make sure the following components are running, preferably started in this order:
1. Docker daemon
2. Docker-compose with Kafka and Zookeeper
3. Consumers
4. Producers

We will execute this twice: the first time with APPROACH = 'stream' and the second time with APPROACH = 'queue', changing the APPROACH constant near the top of the consumer script.

[Consumer 1] Received messages: 12000   
[Consumer 2] Received messages: 12000
[Consumer 3] Received messages: 12000
[Consumer 4] Received messages: 12000
[Consumer 5] Received messages: 12000
[Consumer 6] Received messages: 12000
[Consumer 7] Received messages: 12000
[Consumer 8] Received messages: 12000
[Consumer 9] Received messages: 12000
[Consumer 10] Received messages: 12000
[Consumer 11] Received messages: 12000
[Consumer 12] Received messages: 12000
[Consumer 13] Received messages: 12000
[Consumer 14] Received messages: 12000
[Consumer 15] Received messages: 12000
[Consumer 16] Received messages: 12000
[Consumer 17] Received messages: 12000
[Consumer 18] Received messages: 12000
[Consumer 19] Received messages: 12000
[Consumer 20] Received messages: 12000
[Consumer 21] Received messages: 12000
[Consumer 22] Received messages: 12000
[Consumer 23] Received messages: 12000
[Consumer 24] Received messages: 12000
[Consumer 25] Received messages: 12000
[Consumer 26] Received messages: 12000
[Consumer 27] Received messages: 12000
[Consumer 28] Received messages: 12000
[Consumer 29] Received messages: 12000
[Consumer 30] Received messages: 12000

When consumers are organized so that each uses a unique group ID, different from the group IDs used by the other consumers, the broadcast pattern is achieved: every one of the 30 consumers in the example receives all 12,000 messages.

Consumer                          Received messages - Dmean
[Consumer 1] Received messages: 365 - 35
[Consumer 2] Received messages: 392 - 8
[Consumer 3] Received messages: 419 - 19
[Consumer 4] Received messages: 386 - 14
[Consumer 5] Received messages: 391 - 9
[Consumer 6] Received messages: 400 - 0
[Consumer 7] Received messages: 414 - 14
[Consumer 8] Received messages: 403 - 3
[Consumer 9] Received messages: 405 - 5
[Consumer 10] Received messages: 404 - 4
[Consumer 11] Received messages: 436 - 36
[Consumer 12] Received messages: 397 - 3
[Consumer 13] Received messages: 387 - 13
[Consumer 14] Received messages: 421 - 21
[Consumer 15] Received messages: 437 - 37
[Consumer 16] Received messages: 390 - 10
[Consumer 17] Received messages: 384 - 16
[Consumer 18] Received messages: 410 - 10
[Consumer 19] Received messages: 398 - 2
[Consumer 20] Received messages: 368 - 32
[Consumer 21] Received messages: 400 - 0
[Consumer 22] Received messages: 420 - 20
[Consumer 23] Received messages: 391 - 9
[Consumer 24] Received messages: 415 - 15
[Consumer 25] Received messages: 430 - 30
[Consumer 26] Received messages: 388 - 12
[Consumer 27] Received messages: 387 - 13
[Consumer 28] Received messages: 374 - 26
[Consumer 29] Received messages: 377 - 23
[Consumer 30] Received messages: 411 - 11

On the other hand, when all consumers share the same group ID, the queue pattern (load balancing) is achieved. The balancing is not perfect; however, the deviation from the average (Dmean) is negligible for load-balancing purposes.
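
For reference, Dmean here is simply the absolute difference between each consumer’s count and the mean (12,000 messages across 30 consumers, i.e. 400 per consumer). One way to compute it from the message_counts dictionary collected by the consumer script:

# message_counts maps consumer_id -> number of messages processed
mean = sum(message_counts.values()) / len(message_counts)
for consumer_id, count in sorted(message_counts.items()):
    print(f"[Consumer {consumer_id + 1}] Received messages: {count} - {abs(count - mean):.0f}")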

A hybrid topology is also possible. Imagine a scenario where, for each event, you need to trigger the following actions: (1) persist the event to a database, (2) generate a notification, and (3) write a log.

In this scenario, you can use three distinct group IDs (database, notification, and logging) and allocate 10 consumers to each group. Each message is then delivered to one consumer from each of the three groups subscribed to the topic, achieving the broadcast pattern across the groups and load balancing among the 10 consumers within each group, as the sketch below illustrates.
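
A minimal sketch of that hybrid topology, reusing the threaded-consumer idea from above; the group names (database, notification, logging) and the topic queue_topic are the ones assumed in this example:

import threading
import time
from kafka import KafkaConsumer

GROUPS = ['database', 'notification', 'logging']
CONSUMERS_PER_GROUP = 10

def run_worker(group_id, worker_id):
    # Workers in the same group share the load; every group still sees every message.
    consumer = KafkaConsumer(
        'queue_topic',
        bootstrap_servers='localhost:9092',
        group_id=group_id,
        auto_offset_reset='earliest'
    )
    for message in consumer:
        print(f"[{group_id}/worker-{worker_id}] {message.value.decode('utf-8')}")

threads = []
for group in GROUPS:
    for worker_id in range(CONSUMERS_PER_GROUP):
        t = threading.Thread(target=run_worker, args=(group, worker_id), daemon=True)
        threads.append(t)
        t.start()

# Keep the main thread alive; stop with Ctrl+C.
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("Shutting down workers...")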
