Module 7: Kafka with Python
Module 7 • Intermediate
You now know how Kafka works conceptually. In this module, you’ll learn how to use Kafka from Python:
- Install Kafka client libraries
- Write Python producers and consumers
- Structure a small real-world project (e-commerce events)
- Add error handling, retries, batching, and tests
This is the bridge from “Kafka diagrams” to “actual code running in my app”.
🎯 What You Will Learn
By the end of this module, you will be able to:
- Choose the right Python Kafka client for your use case
- Implement basic and advanced producers and consumers
- Model domain events using Python classes/dataclasses
- Build a small event-driven e-commerce example
- Add retry logic, error handling, and batch processing
- Write basic unit tests for Kafka integration code
🐍 Python Kafka Libraries
There are multiple Python clients for Kafka. Each has trade-offs.
1. kafka-python (Popular Pure-Python Client)
- Mature, widely used pure Python client
- Great for learning and many production workloads
- Simple API and easy to read/teach
- Install:
pip install kafka-python
2. confluent-kafka (Confluent Client)
- High-performance client based on librdkafka (C library)
- Excellent performance and reliability
- Rich configuration and production-ready features
- Install:
pip install confluent-kafka
3. aiokafka (Asyncio-based Client)
- Asynchronous client for asyncio apps
- Great for high-concurrency, async microservices
- Integrates nicely with modern async frameworks (FastAPI, etc.)
- Install:
pip install aiokafka
For this module, we’ll primarily use `kafka-python` because it’s very readable and good for teaching.
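For comparison, here is roughly what a simple send looks like with `confluent-kafka` (a minimal sketch, assuming a broker on localhost:9092; delivery is reported through a callback rather than a returned future):
# Minimal confluent-kafka producer sketch (assumes a broker on localhost:9092)
from confluent_kafka import Producer
import json
producer = Producer({'bootstrap.servers': 'localhost:9092'})
def delivery_report(err, msg):
    # Called once per message to report delivery success or failure
    if err is not None:
        print(f'Delivery failed: {err}')
    else:
        print(f'Delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')
producer.produce(
    'user-events',
    key='user_1',
    value=json.dumps({'action': 'login'}).encode('utf-8'),
    callback=delivery_report
)
producer.flush()  # Block until outstanding messages are delivered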
🧱 Setting Up the Python Environment
Create and Activate a Virtual Environment
# Create virtual environment
python -m venv kafka-env
# Activate environment
# Windows:
kafka-env\Scripts\activate
# Linux/Mac:
source kafka-env/bin/activate
# Install dependencies
pip install kafka-python confluent-kafka python-dotenv
requirements.txt
kafka-python==2.0.2
confluent-kafka==2.3.0
python-dotenv==1.0.0
You can pin versions like this for reproducible environments.
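Since `python-dotenv` is in the dependency list, one way to keep broker addresses and topic names out of your code is a small settings module. A sketch (the variable names here are illustrative, not a standard):
# config/settings.py - load Kafka settings from a .env file
import os
from dotenv import load_dotenv
load_dotenv()  # Reads key=value pairs from a .env file into the environment
BOOTSTRAP_SERVERS = os.getenv('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092').split(',')
ORDER_TOPIC = os.getenv('ORDER_TOPIC', 'order-events')
USER_TOPIC = os.getenv('USER_TOPIC', 'user-events')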
📤 Basic Producer with kafka-python
Simple Producer
from kafka import KafkaProducer
import json
import time
# Create producer
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda x: json.dumps(x).encode('utf-8'),
key_serializer=lambda x: x.encode('utf-8') if x else None
)
# Send messages
for i in range(10):
message = {
'user_id': f'user_{i}',
'action': 'login',
'timestamp': time.time(),
'ip_address': '192.168.1.100'
}
producer.send('user-events', key=f'user_{i}', value=message)
print(f'Sent message {i}: {message}')
# Flush and close
producer.flush()
producer.close()
Key ideas:
- Serialization: Convert Python dict → JSON string → bytes
- Key: Used for partitioning (same key → same partition; see the quick check below)
- Flush: Make sure all messages are actually sent before exit
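To see the key-to-partition mapping for yourself, you can inspect the metadata returned by each send (a small sketch reusing the `user-events` topic above):
# Same key -> same partition: inspect the RecordMetadata of repeated sends
from kafka import KafkaProducer
import json
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: json.dumps(x).encode('utf-8'),
    key_serializer=lambda x: x.encode('utf-8')
)
for _ in range(3):
    metadata = producer.send('user-events', key='user_1', value={'ping': True}).get(timeout=10)
    print(metadata.partition)  # Prints the same partition number every time
producer.close()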
Advanced Producer with Error Handling
from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
import logging
import time
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class EventProducer:
def __init__(self, bootstrap_servers=['localhost:9092']):
self.producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda x: json.dumps(x).encode('utf-8'),
key_serializer=lambda x: x.encode('utf-8') if x else None,
acks='all', # Wait for all in-sync replicas
retries=3, # Retry failed sends
batch_size=16384, # Batch size in bytes
linger_ms=10, # Wait time for batching
            compression_type='lz4'  # Compression (requires the lz4 package)
)
def send_event(self, topic, key, event):
try:
future = self.producer.send(topic, key=key, value=event)
record_metadata = future.get(timeout=10)
logger.info(
f'Message sent to {record_metadata.topic} '
f'partition {record_metadata.partition} '
f'offset {record_metadata.offset}'
)
return True
except KafkaError as e:
logger.error(f'Failed to send message: {e}')
return False
def close(self):
self.producer.close()
# Usage
if __name__ == '__main__':
producer = EventProducer()
event = {
'user_id': 'user_123',
'action': 'purchase',
'product_id': 'laptop_001',
'amount': 999.99,
'timestamp': time.time()
}
producer.send_event('user-events', 'user_123', event)
producer.close()
This is closer to what you’d use in a real service: batching, compression, retries, and logging.
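One thing to note: `send_event` blocks on `future.get()` for every message, which limits how much batching you actually get. If you need higher throughput, you can attach callbacks instead of waiting. A small helper sketch (the function name and shape are my own, not part of `kafka-python`):
# Non-blocking send helper (sketch): reuses EventProducer's inner KafkaProducer
import logging
logger = logging.getLogger(__name__)
def send_event_async(event_producer, topic, key, event):
    future = event_producer.producer.send(topic, key=key, value=event)
    # Callbacks fire when the broker acknowledges (or rejects) the send
    future.add_callback(
        lambda md: logger.info(f'Delivered to {md.topic}[{md.partition}] at offset {md.offset}')
    )
    future.add_errback(lambda exc: logger.error(f'Delivery failed: {exc}'))
    return future
Call `flush()` before shutdown so pending sends and their callbacks still complete.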
📥 Basic Consumer with kafka-python
Simple Consumer
from kafka import KafkaConsumer
import json
# Create consumer
consumer = KafkaConsumer(
'user-events',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda x: json.loads(x.decode('utf-8')),
key_deserializer=lambda x: x.decode('utf-8') if x else None,
group_id='analytics-group',
auto_offset_reset='earliest' # Start from beginning if no offset
)
# Consume messages
for message in consumer:
print('Received message:')
print(f' Topic: {message.topic}')
print(f' Partition: {message.partition}')
print(f' Offset: {message.offset}')
print(f' Key: {message.key}')
print(f' Value: {message.value}')
print('---')
Notes:
- `group_id`: Links this consumer to a consumer group
- `auto_offset_reset`: Where to start reading if no committed offset
Advanced Consumer with Error Handling
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import json
import logging
import time
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class EventConsumer:
def __init__(self, topics, group_id, bootstrap_servers=['localhost:9092']):
self.consumer = KafkaConsumer(
*topics,
bootstrap_servers=bootstrap_servers,
value_deserializer=lambda x: json.loads(x.decode('utf-8')),
key_deserializer=lambda x: x.decode('utf-8') if x else None,
group_id=group_id,
auto_offset_reset='latest',
enable_auto_commit=True,
auto_commit_interval_ms=1000,
session_timeout_ms=30000,
heartbeat_interval_ms=3000
)
def process_message(self, message):
"""Override this method to implement custom processing"""
logger.info(f'Processing message: {message.value}')
# Add your processing logic here
return True
def start_consuming(self):
try:
for message in self.consumer:
try:
success = self.process_message(message)
if success:
logger.info(f'Successfully processed message from {message.topic}')
else:
logger.error(f'Failed to process message from {message.topic}')
except Exception as e:
logger.error(f'Error processing message: {e}')
except KeyboardInterrupt:
logger.info('Stopping consumer...')
finally:
self.consumer.close()
# Usage
if __name__ == '__main__':
consumer = EventConsumer(['user-events'], 'analytics-group')
consumer.start_consuming()
Later, you can switch to manual commits for tighter control.
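Here is what that manual-commit variant might look like (a sketch: auto-commit is disabled and the offset is committed only after processing succeeds):
# Manual-commit sketch: commit only after a message is processed successfully
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
    group_id='manual-commit-group',
    enable_auto_commit=False  # We decide when an offset counts as done
)
try:
    for message in consumer:
        print(f'Processing {message.value}')  # Your real processing logic goes here
        consumer.commit()                     # Commit only after successful processing
finally:
    consumer.close()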
🏗️ Real-World Project: E-commerce Event Stream
Let’s design a small project that feels like a real system.
Project Structure
ecommerce-kafka/
├── config/
│ └── settings.py
├── producers/
│ ├── __init__.py
│ ├── order_producer.py
│ └── user_producer.py
├── consumers/
│ ├── __init__.py
│ ├── analytics_consumer.py
│ └── notification_consumer.py
├── models/
│ ├── __init__.py
│ └── events.py
└── main.py
This separates:
- Models (event schemas)
- Producers (writers)
- Consumers (readers/processing)
- Config (cluster, topic names, etc.)
Event Models with dataclasses
# models/events.py
from dataclasses import dataclass
from typing import Optional
import time
@dataclass
class OrderEvent:
order_id: str
user_id: str
product_id: str
quantity: int
price: float
status: str # created, confirmed, shipped, delivered
    timestamp: Optional[float] = None
def __post_init__(self):
if self.timestamp is None:
self.timestamp = time.time()
@dataclass
class UserEvent:
user_id: str
action: str # login, logout, view_product, add_to_cart
product_id: Optional[str] = None
    timestamp: Optional[float] = None
def __post_init__(self):
if self.timestamp is None:
self.timestamp = time.time()
Dataclasses give you:
- Type hints
- A readable `__repr__`
- Easy JSON serialization via `__dict__`
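For example, a round trip through JSON only needs the standard library (a small sketch using `dataclasses.asdict`):
# Round-tripping an OrderEvent through JSON
import json
from dataclasses import asdict
from models.events import OrderEvent
event = OrderEvent('order_001', 'user_123', 'laptop_001', 1, 999.99, 'created')
payload = json.dumps(asdict(event)).encode('utf-8')  # dataclass -> dict -> JSON bytes
restored = OrderEvent(**json.loads(payload))         # JSON bytes -> dict -> dataclass
assert restored.order_id == event.order_id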
Order Producer
# producers/order_producer.py
from kafka import KafkaProducer
from models.events import OrderEvent
import json
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
class OrderProducer:
def __init__(self, bootstrap_servers=['localhost:9092']):
self.producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda x: json.dumps(x.__dict__).encode('utf-8'),
key_serializer=lambda x: x.encode('utf-8'),
acks='all',
retries=3
)
def send_order_event(self, order_event: OrderEvent):
try:
future = self.producer.send(
'order-events',
key=order_event.order_id,
value=order_event
)
record_metadata = future.get(timeout=10)
logger.info(f'Order event sent: {order_event.order_id}')
return True
except Exception as e:
logger.error(f'Failed to send order event: {e}')
return False
def close(self):
self.producer.close()
# Example usage
if __name__ == '__main__':
producer = OrderProducer()
# Create sample order events
orders = [
OrderEvent('order_001', 'user_123', 'laptop_001', 1, 999.99, 'created'),
OrderEvent('order_002', 'user_456', 'mouse_001', 2, 29.99, 'created'),
OrderEvent('order_003', 'user_789', 'keyboard_001', 1, 79.99, 'created')
]
for order in orders:
producer.send_order_event(order)
producer.close()
Analytics Consumer
# consumers/analytics_consumer.py
from kafka import KafkaConsumer
from models.events import OrderEvent
import json
import logging
from collections import defaultdict
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
class AnalyticsConsumer:
def __init__(self, bootstrap_servers=['localhost:9092']):
self.consumer = KafkaConsumer(
'order-events',
bootstrap_servers=bootstrap_servers,
value_deserializer=lambda x: json.loads(x.decode('utf-8')),
group_id='analytics-group',
auto_offset_reset='latest'
)
self.stats = defaultdict(int)
def process_order_event(self, order_data):
"""Process order events for analytics"""
order = OrderEvent(**order_data)
# Update statistics
self.stats['total_orders'] += 1
self.stats[f'status_{order.status}'] += 1
self.stats[f'user_{order.user_id}'] += 1
# Log analytics
logger.info(f'Analytics - Order {order.order_id}: {order.status} for ${order.price}')
# Print current stats every 10 orders
if self.stats['total_orders'] % 10 == 0:
self.print_stats()
def print_stats(self):
print('\n=== Analytics Summary ===')
print(f'Total Orders: {self.stats["total_orders"]}')
for key, value in self.stats.items():
if key.startswith('status_'):
print(f'{key}: {value}')
print('========================\n')
def start_consuming(self):
try:
for message in self.consumer:
self.process_order_event(message.value)
except KeyboardInterrupt:
logger.info('Stopping analytics consumer...')
finally:
self.consumer.close()
# Example usage
if __name__ == '__main__':
consumer = AnalyticsConsumer()
consumer.start_consuming()
You now have:
- A producer emitting structured events
- A consumer aggregating stats from the stream
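The project layout also lists a `notification_consumer.py`. A minimal sketch could look like this (the notification itself is just a log line here; a real service would call an email, SMS, or push provider):
# consumers/notification_consumer.py - minimal sketch (placeholder notification logic)
from kafka import KafkaConsumer
import json
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
class NotificationConsumer:
    def __init__(self, bootstrap_servers=['localhost:9092']):
        self.consumer = KafkaConsumer(
            'order-events',
            bootstrap_servers=bootstrap_servers,
            value_deserializer=lambda x: json.loads(x.decode('utf-8')),
            group_id='notification-group',  # Separate group: gets its own copy of the stream
            auto_offset_reset='latest'
        )
    def start_consuming(self):
        try:
            for message in self.consumer:
                order = message.value
                # Placeholder: a real service would call an email/SMS/push provider here
                logger.info(f"Notify user {order['user_id']}: order {order['order_id']} is {order['status']}")
        except KeyboardInterrupt:
            logger.info('Stopping notification consumer...')
        finally:
            self.consumer.close()
if __name__ == '__main__':
    NotificationConsumer().start_consuming()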
🛡️ Error Handling and Retry Logic
Producer with Retry Logic and Backoff
import time
import json
import logging
from kafka import KafkaProducer
from kafka.errors import KafkaError
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class RobustProducer:
def __init__(self, bootstrap_servers=['localhost:9092']):
self.producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
            value_serializer=lambda x: json.dumps(x).encode('utf-8'),
            key_serializer=lambda x: x.encode('utf-8') if x else None,
retries=3,
retry_backoff_ms=100,
request_timeout_ms=30000
)
def send_with_retry(self, topic, key, value, max_retries=3):
for attempt in range(max_retries):
try:
future = self.producer.send(topic, key=key, value=value)
record_metadata = future.get(timeout=10)
logger.info(
f'Message sent to {record_metadata.topic} '
f'partition {record_metadata.partition} '
f'offset {record_metadata.offset}'
)
return True
            except KafkaError as e:
                if getattr(e, 'retriable', False):
                    logger.warning(f'Retriable error on attempt {attempt + 1}: {e}')
                    if attempt < max_retries - 1:
                        time.sleep(2 ** attempt)  # Exponential backoff
                    else:
                        logger.error(f'Failed after {max_retries} attempts')
                        return False
                else:
                    logger.error(f'Non-retriable error: {e}')
                    return False
            except Exception as e:
                logger.error(f'Unexpected error: {e}')
                return False
return False
This pattern is common in real services: retry only retriable errors, back off, and log clearly.
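A common extension of this pattern (not shown in the module so far) is a dead-letter topic: if a message still fails after all retries, park it somewhere for later inspection instead of dropping it. A sketch, using a hypothetical `<topic>-dlq` naming convention:
# Dead-letter sketch built on RobustProducer above
def send_or_dead_letter(producer: RobustProducer, topic, value, key=None):
    if not producer.send_with_retry(topic, key, value):
        # '<topic>-dlq' is just the naming convention used for this example
        producer.producer.send(f'{topic}-dlq', key=key, value=value)
        producer.producer.flush()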
⚡ Performance Optimization
Batch Processing Pattern
import logging
logger = logging.getLogger(__name__)
class BatchProcessor:
def __init__(self, batch_size=100):
self.batch_size = batch_size
self.batch = []
def add_message(self, message):
self.batch.append(message)
if len(self.batch) >= self.batch_size:
self.process_batch()
def process_batch(self):
if self.batch:
# Process batch of messages
logger.info(f'Processing batch of {len(self.batch)} messages')
# Add your batch logic here (DB bulk insert, etc.)
self.batch.clear()
def flush(self):
if self.batch:
self.process_batch()
Use this inside your consumer to group messages and reduce DB/API overhead.
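Here is one way to wire it in, using `poll()` so you control batch boundaries and commit offsets only after a batch has been handled (a sketch reusing `BatchProcessor` and the `order-events` topic):
# Wiring BatchProcessor into a consumer loop with poll()
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
    'order-events',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
    group_id='batch-analytics-group',
    enable_auto_commit=False
)
processor = BatchProcessor(batch_size=100)
try:
    while True:
        records = consumer.poll(timeout_ms=1000, max_records=500)
        for partition_records in records.values():
            for record in partition_records:
                processor.add_message(record.value)
        processor.flush()   # Handle whatever is left in the partial batch
        consumer.commit()   # Commit only after the batch has been processed
finally:
    consumer.close()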
✅ Testing Kafka Applications
You don’t want your Kafka integration to be a black box.
Unit Testing Example
import unittest
from unittest.mock import Mock, patch
from producers.order_producer import OrderProducer
from models.events import OrderEvent
class TestOrderProducer(unittest.TestCase):
    @patch('producers.order_producer.KafkaProducer')
    def test_send_order_event(self, mock_producer_class):
        # Mock the Kafka client before constructing OrderProducer,
        # so no real broker connection is attempted
        mock_producer = Mock()
        mock_producer_class.return_value = mock_producer
        mock_future = Mock()
        mock_future.get.return_value = Mock(offset=123)
        mock_producer.send.return_value = mock_future
        producer = OrderProducer()
        # Create test event
        order_event = OrderEvent('test_order', 'user_123', 'product_001', 1, 99.99, 'created')
        # Test sending
        result = producer.send_order_event(order_event)
# Assertions
self.assertTrue(result)
mock_producer.send.assert_called_once()
Idea: mock the Kafka client, test your own logic in isolation.
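The same approach works for consumers: patch `KafkaConsumer` and exercise `AnalyticsConsumer.process_order_event` directly (a sketch consistent with the class defined earlier):
# Sketch: testing consumer logic without a broker by patching KafkaConsumer
import unittest
from unittest.mock import patch
from consumers.analytics_consumer import AnalyticsConsumer
class TestAnalyticsConsumer(unittest.TestCase):
    @patch('consumers.analytics_consumer.KafkaConsumer')
    def test_process_order_event_updates_stats(self, mock_consumer_class):
        consumer = AnalyticsConsumer()  # Uses the mocked client, so no broker is needed
        consumer.process_order_event({
            'order_id': 'o1', 'user_id': 'u1', 'product_id': 'p1',
            'quantity': 1, 'price': 10.0, 'status': 'created'
        })
        self.assertEqual(consumer.stats['total_orders'], 1)
        self.assertEqual(consumer.stats['status_created'], 1)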
🧠 Best Practices for Kafka + Python
1. Connection Management
- Reuse producers and consumers instead of creating them per request
- Close producer/consumer instances on shutdown
- Avoid long-lived, unused connections
2. Error Handling
- Add retry logic for transient errors
- Distinguish retriable vs non-retriable errors
- Log enough context (topic, key, exception type)
3. Performance
- Use batching in consumers (and producer configs: batch.size, linger.ms)
- Tune producer/consumer settings based on your workload
- Monitor throughput, latency, and consumer lag
4. Security (for Production)
- Use SSL/TLS for encryption in transit (see the config sketch after this list)
- Configure authentication (e.g., SASL mechanisms, Kerberos)
- Validate and sanitize data before sending/processing
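As a rough idea of what these settings look like in `kafka-python`, here is a producer configured for TLS plus SASL/SCRAM (all hostnames, paths, and credentials below are placeholders):
# Producer with TLS + SASL/SCRAM (hostnames, paths, and credentials are placeholders)
from kafka import KafkaProducer
import json
producer = KafkaProducer(
    bootstrap_servers=['broker.example.com:9093'],
    security_protocol='SASL_SSL',
    ssl_cafile='/path/to/ca.pem',        # CA certificate used to verify the broker
    sasl_mechanism='SCRAM-SHA-256',
    sasl_plain_username='my-service',    # Placeholder credentials
    sasl_plain_password='change-me',
    value_serializer=lambda x: json.dumps(x).encode('utf-8')
)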
✅ Key Takeaways
- kafka-python is a great starting point for learning, while confluent-kafka offers better performance for production
- aiokafka is ideal for async Python applications (FastAPI, asyncio-based services)
- Always use serializers (JSON, Avro, Protobuf) for structured data - never send raw strings
- Error handling and retry logic are critical for production Kafka producers
- Consumer groups enable parallel processing and load balancing across multiple consumer instances
- Batch processing improves throughput but requires careful offset management
- Use dataclasses or Pydantic models to structure your event data for type safety
- Testing Kafka integration code requires mocking or using embedded Kafka for unit tests
- Idempotent producers (e.g., `enable.idempotence` in confluent-kafka) prevent duplicate messages caused by retries
- Python's Kafka clients abstract away the protocol details, but understanding the concepts still matters
🚀 Next Steps
With Python Kafka integration in place, you can now:
- Build real microservices that communicate via Kafka topics
- Implement real-time analytics and dashboards
- Apply patterns like CQRS, event sourcing, and stream processing
- Integrate with frameworks like FastAPI, Django, or Flask using background workers
In the next parts of the course, you can extend this foundation into stream processing, exactly-once patterns, and end-to-end architectures that combine Kafka, databases, and APIs.
📚 Continue Learning
- Practice: Build a small event-driven microservice using Python and Kafka
- Quiz: Test your understanding of Python Kafka integration concepts
- Next Module: Learn about Kafka Monitoring & Optimization in Module 8
- Related Resources:
- [kafka-python Documentation](https://kafka-python.readthedocs.io/)
- [Confluent Python Client](https://github.com/confluentinc/confluent-kafka-python)
- [FastAPI Background Tasks](https://fastapi.tiangolo.com/advanced/background-tasks/)
Hands-on Examples
Complete Python Producer-Consumer Example
# Complete Python Kafka Example
## producer.py
from kafka import KafkaProducer
import json
import time
import random
def create_producer():
return KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda x: json.dumps(x).encode('utf-8'),
key_serializer=lambda x: x.encode('utf-8'),
acks='all',
retries=3
)
def send_user_events(producer):
users = ['alice', 'bob', 'charlie', 'diana']
actions = ['login', 'logout', 'view_product', 'add_to_cart', 'purchase']
products = ['laptop', 'mouse', 'keyboard', 'monitor']
for i in range(20):
event = {
'user_id': random.choice(users),
'action': random.choice(actions),
'product_id': random.choice(products),
'timestamp': time.time(),
'session_id': f'session_{i}'
}
producer.send('user-events', key=event['user_id'], value=event)
print(f'Sent: {event}')
time.sleep(1)
producer.flush()
if __name__ == '__main__':
producer = create_producer()
send_user_events(producer)
producer.close()
## consumer.py
from kafka import KafkaConsumer
import json
from collections import defaultdict
def create_consumer():
return KafkaConsumer(
'user-events',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda x: json.loads(x.decode('utf-8')),
group_id='analytics-group',
auto_offset_reset='earliest'
)
def process_events(consumer):
stats = defaultdict(int)
for message in consumer:
event = message.value
stats['total_events'] += 1
stats[f'action_{event["action"]}'] += 1
stats[f'user_{event["user_id"]}'] += 1
print(f'Processed: {event["user_id"]} - {event["action"]}')
if stats['total_events'] % 5 == 0:
print(f'\nStats: {dict(stats)}\n')
if __name__ == '__main__':
consumer = create_consumer()
process_events(consumer)
consumer.close()
## Run the example:
# Terminal 1: Start consumer
python consumer.py
# Terminal 2: Start producer
python producer.py
This complete example shows how to create a producer that sends user events and a consumer that processes them with real-time analytics.