Module 7: Kafka with Python

Chapter 7 • Intermediate • 60 min

You now know how Kafka works conceptually. In this module, you’ll learn how to use Kafka from Python:

  • Install Kafka client libraries
  • Write Python producers and consumers
  • Structure a small real-world project (e-commerce events)
  • Add error handling, retries, batching, and tests

This is the bridge from “Kafka diagrams” to “actual code running in my app”.


🎯 What You Will Learn

By the end of this module, you will be able to:

  • Choose the right Python Kafka client for your use case
  • Implement basic and advanced producers and consumers
  • Model domain events using Python classes/dataclasses
  • Build a small event-driven e-commerce example
  • Add retry logic, error handling, and batch processing
  • Write basic unit tests for Kafka integration code

🐍 Python Kafka Libraries

There are multiple Python clients for Kafka. Each has trade-offs.

1. kafka-python (Popular Pure-Python Client)

  • Mature, widely used pure Python client
  • Great for learning and many production workloads
  • Simple API and easy to read/teach
  • Install:
bash
    pip install kafka-python
    

2. confluent-kafka (Confluent Client)

  • High-performance client based on librdkafka (C library)
  • Excellent performance and reliability
  • Rich configuration and production-ready features
  • Install:
bash
    pip install confluent-kafka
    

3. aiokafka (Asyncio-based Client)

  • Asynchronous client for asyncio apps
  • Great for high-concurrency, async microservices
  • Integrates nicely with modern async frameworks (FastAPI, etc.)
  • Install:
bash
    pip install aiokafka
    

For this module, we’ll primarily use `kafka-python` because it’s very readable and good for teaching.
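
For contrast, a single send with `confluent-kafka` looks roughly like this (a minimal sketch, not used in the rest of this module): configuration is a librdkafka-style dict, values are passed as bytes/str rather than through serializer callables, and delivery results arrive via a callback.

python
    from confluent_kafka import Producer
    import json
    
    # librdkafka-style configuration dict
    producer = Producer({'bootstrap.servers': 'localhost:9092'})
    
    def delivery_report(err, msg):
        # Called once per message with the delivery result
        if err is not None:
            print(f'Delivery failed: {err}')
        else:
            print(f'Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}')
    
    producer.produce(
        'user-events',
        key='user_1',
        value=json.dumps({'user_id': 'user_1', 'action': 'login'}),
        callback=delivery_report
    )
    producer.flush()  # Block until outstanding messages are delivered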


🧱 Setting Up the Python Environment

Create and Activate a Virtual Environment

bash
    # Create virtual environment
    python -m venv kafka-env
    
    # Activate environment
    # Windows:
    kafka-env\Scripts\activate
    # Linux/Mac:
    source kafka-env/bin/activate
    
    # Install dependencies
    pip install kafka-python confluent-kafka python-dotenv
    

requirements.txt

txt
    kafka-python==2.0.2
    confluent-kafka==2.3.0
    python-dotenv==1.0.0
    

You can pin versions like this for reproducible environments.


📤 Basic Producer with kafka-python

Simple Producer

python
    from kafka import KafkaProducer
    import json
    import time
    
    # Create producer
    producer = KafkaProducer(
        bootstrap_servers=['localhost:9092'],
        value_serializer=lambda x: json.dumps(x).encode('utf-8'),
        key_serializer=lambda x: x.encode('utf-8') if x else None
    )
    
    # Send messages
    for i in range(10):
        message = {
            'user_id': f'user_{i}',
            'action': 'login',
            'timestamp': time.time(),
            'ip_address': '192.168.1.100'
        }
        
        producer.send('user-events', key=f'user_{i}', value=message)
        print(f'Sent message {i}: {message}')
    
    # Flush and close
    producer.flush()
    producer.close()
    

Key ideas:

  • Serialization: Convert Python dict → JSON string → bytes
  • Key: Used for partitioning (same key → same partition — demonstrated below)
  • Flush: Make sure all messages are actually sent before exit
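
If you want to see the key-to-partition mapping for yourself, compare the RecordMetadata returned by two sends that use the same key (a quick check against a local broker; the topic is assumed to exist):

python
    from kafka import KafkaProducer
    import json
    
    producer = KafkaProducer(
        bootstrap_servers=['localhost:9092'],
        value_serializer=lambda x: json.dumps(x).encode('utf-8'),
        key_serializer=lambda x: x.encode('utf-8')
    )
    
    # Two sends with the same key should land in the same partition
    meta_1 = producer.send('user-events', key='user_42', value={'n': 1}).get(timeout=10)
    meta_2 = producer.send('user-events', key='user_42', value={'n': 2}).get(timeout=10)
    print(meta_1.partition, meta_2.partition)  # Prints the same partition number twice
    producer.close()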

Advanced Producer with Error Handling

python
    from kafka import KafkaProducer
    from kafka.errors import KafkaError
    import json
    import logging
    import time
    
    # Setup logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    class EventProducer:
        def __init__(self, bootstrap_servers=['localhost:9092']):
            self.producer = KafkaProducer(
                bootstrap_servers=bootstrap_servers,
                value_serializer=lambda x: json.dumps(x).encode('utf-8'),
                key_serializer=lambda x: x.encode('utf-8') if x else None,
                acks='all',            # Wait for all in-sync replicas
                retries=3,             # Retry failed sends
                batch_size=16384,      # Batch size in bytes
                linger_ms=10,          # Wait time for batching
            compression_type='lz4' # Compression (requires the lz4 package; 'gzip' works out of the box)
            )
        
        def send_event(self, topic, key, event):
            try:
                future = self.producer.send(topic, key=key, value=event)
                record_metadata = future.get(timeout=10)
                logger.info(
                    f'Message sent to {record_metadata.topic} '
                    f'partition {record_metadata.partition} '
                    f'offset {record_metadata.offset}'
                )
                return True
            except KafkaError as e:
                logger.error(f'Failed to send message: {e}')
                return False
        
        def close(self):
            self.producer.close()
    
    # Usage
    if __name__ == '__main__':
        producer = EventProducer()
        event = {
            'user_id': 'user_123',
            'action': 'purchase',
            'product_id': 'laptop_001',
            'amount': 999.99,
            'timestamp': time.time()
        }
    
        producer.send_event('user-events', 'user_123', event)
        producer.close()
    

This is closer to what you’d use in a real service: batching, compression, retries, and logging.


📥 Basic Consumer with kafka-python

Simple Consumer

python
    from kafka import KafkaConsumer
    import json
    
    # Create consumer
    consumer = KafkaConsumer(
        'user-events',
        bootstrap_servers=['localhost:9092'],
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        key_deserializer=lambda x: x.decode('utf-8') if x else None,
        group_id='analytics-group',
        auto_offset_reset='earliest'  # Start from beginning if no offset
    )
    
    # Consume messages
    for message in consumer:
        print('Received message:')
        print(f'  Topic: {message.topic}')
        print(f'  Partition: {message.partition}')
        print(f'  Offset: {message.offset}')
        print(f'  Key: {message.key}')
        print(f'  Value: {message.value}')
        print('---')
    

Notes:

  • `group_id`: Links this consumer to a consumer group
  • `auto_offset_reset`: Where to start reading if the group has no committed offset
  • The `for message in consumer` loop blocks indefinitely; a poll-based loop (sketched below) gives you more control over shutdown
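
A minimal poll-based sketch (same configuration as above; `poll()` returns a dict mapping partitions to batches of records):

python
    from kafka import KafkaConsumer
    import json
    
    consumer = KafkaConsumer(
        'user-events',
        bootstrap_servers=['localhost:9092'],
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        group_id='analytics-group',
        auto_offset_reset='earliest'
    )
    
    running = True
    while running:
        # poll() returns {TopicPartition: [ConsumerRecord, ...]}, or {} if nothing arrived in time
        batches = consumer.poll(timeout_ms=1000)
        for tp, records in batches.items():
            for record in records:
                print(f'{tp.topic}[{tp.partition}] offset={record.offset}: {record.value}')
        # Flip `running` to False from your shutdown logic (signal handler, health check, etc.)
    
    consumer.close()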

Advanced Consumer with Error Handling

python
    from kafka import KafkaConsumer
    from kafka.errors import KafkaError
    import json
    import logging
    import time
    
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    class EventConsumer:
        def __init__(self, topics, group_id, bootstrap_servers=['localhost:9092']):
            self.consumer = KafkaConsumer(
                *topics,
                bootstrap_servers=bootstrap_servers,
                value_deserializer=lambda x: json.loads(x.decode('utf-8')),
                key_deserializer=lambda x: x.decode('utf-8') if x else None,
                group_id=group_id,
                auto_offset_reset='latest',
                enable_auto_commit=True,
                auto_commit_interval_ms=1000,
                session_timeout_ms=30000,
                heartbeat_interval_ms=3000
            )
        
        def process_message(self, message):
            """Override this method to implement custom processing"""
            logger.info(f'Processing message: {message.value}')
            # Add your processing logic here
            return True
        
        def start_consuming(self):
            try:
                for message in self.consumer:
                    try:
                        success = self.process_message(message)
                        if success:
                            logger.info(f'Successfully processed message from {message.topic}')
                        else:
                            logger.error(f'Failed to process message from {message.topic}')
                    except Exception as e:
                        logger.error(f'Error processing message: {e}')
            except KeyboardInterrupt:
                logger.info('Stopping consumer...')
            finally:
                self.consumer.close()
    
    # Usage
    if __name__ == '__main__':
        consumer = EventConsumer(['user-events'], 'analytics-group')
        consumer.start_consuming()
    

Later, you can switch to manual commits for tighter control.
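
A minimal sketch of that manual-commit pattern (disable auto-commit and commit only after processing succeeds; `handle` here is a hypothetical processing function):

python
    from kafka import KafkaConsumer
    import json
    
    consumer = KafkaConsumer(
        'user-events',
        bootstrap_servers=['localhost:9092'],
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        group_id='analytics-group',
        enable_auto_commit=False  # Offsets are committed only when we say so
    )
    
    try:
        for message in consumer:
            handle(message.value)  # Hypothetical processing function
            consumer.commit()      # Synchronous commit after successful processing
    finally:
        consumer.close()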


🏗️ Real-World Project: E-commerce Event Stream

Let’s design a small project that feels like a real system.

Project Structure

code
    ecommerce-kafka/
    ├── config/
    │   └── settings.py
    ├── producers/
    │   ├── __init__.py
    │   ├── order_producer.py
    │   └── user_producer.py
    ├── consumers/
    │   ├── __init__.py
    │   ├── analytics_consumer.py
    │   └── notification_consumer.py
    ├── models/
    │   ├── __init__.py
    │   └── events.py
    └── main.py
    

This separates:

  • Models (event schemas)
  • Producers (writers)
  • Consumers (readers/processing)
  • Config (cluster, topic names, etc.) — a sketch of `settings.py` follows this list
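
The config module isn't spelled out in this module; one way to sketch `config/settings.py` with `python-dotenv` (the variable names here are illustrative, not fixed by the project):

python
    # config/settings.py (illustrative sketch)
    import os
    from dotenv import load_dotenv
    
    load_dotenv()  # Read variables from a local .env file, if present
    
    KAFKA_BOOTSTRAP_SERVERS = os.getenv('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092').split(',')
    ORDER_EVENTS_TOPIC = os.getenv('ORDER_EVENTS_TOPIC', 'order-events')
    USER_EVENTS_TOPIC = os.getenv('USER_EVENTS_TOPIC', 'user-events')
    CONSUMER_GROUP = os.getenv('CONSUMER_GROUP', 'analytics-group')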

Event Models with dataclasses

python
    # models/events.py
    from dataclasses import dataclass
    from typing import Optional
    import time
    
    @dataclass
    class OrderEvent:
        order_id: str
        user_id: str
        product_id: str
        quantity: int
        price: float
        status: str  # created, confirmed, shipped, delivered
        timestamp: Optional[float] = None
        
        def __post_init__(self):
            if self.timestamp is None:
                self.timestamp = time.time()
    
    @dataclass
    class UserEvent:
        user_id: str
        action: str  # login, logout, view_product, add_to_cart
        product_id: Optional[str] = None
        timestamp: Optional[float] = None
        
        def __post_init__(self):
            if self.timestamp is None:
                self.timestamp = time.time()
    

Dataclasses give you:

  • Type hints
  • A readable `__repr__` for free
  • Easy JSON serialization via `__dict__` or `dataclasses.asdict` (quick example below)
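
For example, turning an event into a JSON payload for the producer (a quick sketch using `dataclasses.asdict`):

python
    import json
    from dataclasses import asdict
    from models.events import OrderEvent
    
    event = OrderEvent('order_001', 'user_123', 'laptop_001', 1, 999.99, 'created')
    payload = json.dumps(asdict(event))  # json.dumps(event.__dict__) also works for flat events
    print(payload)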

Order Producer

python
    # producers/order_producer.py
    from kafka import KafkaProducer
    from models.events import OrderEvent
    import json
    import logging
    
    logger = logging.getLogger(__name__)
    logging.basicConfig(level=logging.INFO)
    
    class OrderProducer:
        def __init__(self, bootstrap_servers=['localhost:9092']):
            self.producer = KafkaProducer(
                bootstrap_servers=bootstrap_servers,
                value_serializer=lambda x: json.dumps(x.__dict__).encode('utf-8'),
                key_serializer=lambda x: x.encode('utf-8'),
                acks='all',
                retries=3
            )
        
        def send_order_event(self, order_event: OrderEvent):
            try:
                future = self.producer.send(
                    'order-events',
                    key=order_event.order_id,
                    value=order_event
                )
                record_metadata = future.get(timeout=10)
                logger.info(f'Order event sent: {order_event.order_id}')
                return True
            except Exception as e:
                logger.error(f'Failed to send order event: {e}')
                return False
        
        def close(self):
            self.producer.close()
    
    # Example usage
    if __name__ == '__main__':
        producer = OrderProducer()
        
        # Create sample order events
        orders = [
            OrderEvent('order_001', 'user_123', 'laptop_001', 1, 999.99, 'created'),
            OrderEvent('order_002', 'user_456', 'mouse_001', 2, 29.99, 'created'),
            OrderEvent('order_003', 'user_789', 'keyboard_001', 1, 79.99, 'created')
        ]
        
        for order in orders:
            producer.send_order_event(order)
        
        producer.close()
    

Analytics Consumer

python
    # consumers/analytics_consumer.py
    from kafka import KafkaConsumer
    from models.events import OrderEvent
    import json
    import logging
    from collections import defaultdict
    
    logger = logging.getLogger(__name__)
    logging.basicConfig(level=logging.INFO)
    
    class AnalyticsConsumer:
        def __init__(self, bootstrap_servers=['localhost:9092']):
            self.consumer = KafkaConsumer(
                'order-events',
                bootstrap_servers=bootstrap_servers,
                value_deserializer=lambda x: json.loads(x.decode('utf-8')),
                group_id='analytics-group',
                auto_offset_reset='latest'
            )
            self.stats = defaultdict(int)
        
        def process_order_event(self, order_data):
            """Process order events for analytics"""
            order = OrderEvent(**order_data)
            
            # Update statistics
            self.stats['total_orders'] += 1
            self.stats[f'status_{order.status}'] += 1
            self.stats[f'user_{order.user_id}'] += 1
            
            # Log analytics
            logger.info(f'Analytics - Order {order.order_id}: {order.status} for ${order.price}')
            
            # Print current stats every 10 orders
            if self.stats['total_orders'] % 10 == 0:
                self.print_stats()
        
        def print_stats(self):
            print('\n=== Analytics Summary ===')
            print(f'Total Orders: {self.stats["total_orders"]}')
            for key, value in self.stats.items():
                if key.startswith('status_'):
                    print(f'{key}: {value}')
            print('========================\n')
        
        def start_consuming(self):
            try:
                for message in self.consumer:
                    self.process_order_event(message.value)
            except KeyboardInterrupt:
                logger.info('Stopping analytics consumer...')
            finally:
                self.consumer.close()
    
    # Example usage
    if __name__ == '__main__':
        consumer = AnalyticsConsumer()
        consumer.start_consuming()
    

You now have:

  • A producer emitting structured events
  • A consumer aggregating stats from the stream
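
The project layout above also lists a `main.py` that we haven't filled in; one possible sketch that emits a few sample orders (the analytics consumer runs as its own process):

python
    # main.py (illustrative sketch)
    import random
    from models.events import OrderEvent
    from producers.order_producer import OrderProducer
    
    def main():
        producer = OrderProducer()
        try:
            for i in range(5):
                order = OrderEvent(
                    order_id=f'order_{i:03d}',
                    user_id=f'user_{random.randint(1, 3)}',
                    product_id=random.choice(['laptop_001', 'mouse_001', 'keyboard_001']),
                    quantity=random.randint(1, 3),
                    price=round(random.uniform(10, 1000), 2),
                    status='created'
                )
                producer.send_order_event(order)
        finally:
            producer.close()
    
    if __name__ == '__main__':
        main()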

🛡️ Error Handling and Retry Logic

Producer with Retry Logic and Backoff

python
    import time
    import json
    import logging
    from kafka import KafkaProducer
    from kafka.errors import KafkaError
    
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    class RobustProducer:
        def __init__(self, bootstrap_servers=['localhost:9092']):
            self.producer = KafkaProducer(
                bootstrap_servers=bootstrap_servers,
                value_serializer=lambda x: json.dumps(x).encode('utf-8'),
                retries=3,
                retry_backoff_ms=100,
                request_timeout_ms=30000
            )
        
        def send_with_retry(self, topic, key, value, max_retries=3):
            for attempt in range(max_retries):
                try:
                    future = self.producer.send(topic, key=key, value=value)
                    record_metadata = future.get(timeout=10)
                    logger.info(
                        f'Message sent to {record_metadata.topic} '
                        f'partition {record_metadata.partition} '
                        f'offset {record_metadata.offset}'
                    )
                    return True
                except KafkaError as e:
                    # kafka-python marks transient errors with a `retriable` flag
                    if getattr(e, 'retriable', False):
                        logger.warning(f'Retriable error on attempt {attempt + 1}: {e}')
                        if attempt < max_retries - 1:
                            time.sleep(2 ** attempt)  # Exponential backoff
                        else:
                            logger.error(f'Failed after {max_retries} attempts')
                            return False
                    else:
                        logger.error(f'Non-retriable error: {e}')
                        return False
                except Exception as e:
                    logger.error(f'Unexpected error: {e}')
                    return False
            return False
    

This pattern is common in real services: retry only retriable errors, back off, and log clearly.


⚡ Performance Optimization

Batch Processing Pattern

python
    import logging
    
    logger = logging.getLogger(__name__)
    
    class BatchProcessor:
        def __init__(self, batch_size=100):
            self.batch_size = batch_size
            self.batch = []
        
        def add_message(self, message):
            self.batch.append(message)
            if len(self.batch) >= self.batch_size:
                self.process_batch()
        
        def process_batch(self):
            if self.batch:
                # Process batch of messages
                logger.info(f'Processing batch of {len(self.batch)} messages')
                # Add your batch logic here (DB bulk insert, etc.)
                self.batch.clear()
        
        def flush(self):
            if self.batch:
                self.process_batch()
    

Use this inside your consumer to group messages and reduce DB/API overhead.
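
Wired into a consumer loop, that might look like this (a sketch; `BatchProcessor` is the class defined above):

python
    from kafka import KafkaConsumer
    import json
    
    consumer = KafkaConsumer(
        'order-events',
        bootstrap_servers=['localhost:9092'],
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        group_id='batch-analytics-group',
        auto_offset_reset='earliest'
    )
    processor = BatchProcessor(batch_size=100)  # BatchProcessor defined above
    
    try:
        for message in consumer:
            processor.add_message(message.value)  # Triggers process_batch() every 100 messages
    except KeyboardInterrupt:
        pass
    finally:
        processor.flush()   # Process any leftover messages
        consumer.close()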


✅ Testing Kafka Applications

You don’t want your Kafka integration to be a black box.

Unit Testing Example

python
    import unittest
    from unittest.mock import Mock, patch
    from producers.order_producer import OrderProducer
    from models.events import OrderEvent
    
    class TestOrderProducer(unittest.TestCase):
        @patch('producers.order_producer.KafkaProducer')
        def test_send_order_event(self, mock_producer_class):
            # Mock the Kafka client before OrderProducer is constructed
            mock_producer = Mock()
            mock_producer_class.return_value = mock_producer
            mock_future = Mock()
            mock_future.get.return_value = Mock(topic='order-events', partition=0, offset=123)
            mock_producer.send.return_value = mock_future
            
            # Create the producer under test; it now wraps the mock
            producer = OrderProducer()
            
            # Create test event
            order_event = OrderEvent('test_order', 'user_123', 'product_001', 1, 99.99, 'created')
            
            # Test sending
            result = producer.send_order_event(order_event)
            
            # Assertions
            self.assertTrue(result)
            mock_producer.send.assert_called_once()
    
    if __name__ == '__main__':
        unittest.main()
    

Idea: mock the Kafka client, test your own logic in isolation.


🧠 Best Practices for Kafka + Python

1. Connection Management

  • Reuse producers and consumers instead of creating them per request (see the sketch after this list)
  • Close producer/consumer instances on shutdown
  • Avoid long-lived, unused connections
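
One common shape for this, sketched under the assumption of a single module-level producer shared by the whole process (the module name is illustrative):

python
    # kafka_client.py (illustrative sketch): one shared producer per process
    import atexit
    import json
    from kafka import KafkaProducer
    
    _producer = None
    
    def get_producer():
        """Create the producer lazily and reuse it for every send."""
        global _producer
        if _producer is None:
            _producer = KafkaProducer(
                bootstrap_servers=['localhost:9092'],
                value_serializer=lambda x: json.dumps(x).encode('utf-8')
            )
            atexit.register(_producer.close)  # Close cleanly at interpreter shutdown
        return _producer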

2. Error Handling

  • Add retry logic for transient errors
  • Distinguish retriable vs non-retriable errors
  • Log enough context (topic, key, exception type)

3. Performance

  • Use batching in consumers (and producer configs: batch.size, linger.ms)
  • Tune producer/consumer settings based on your workload
  • Monitor throughput, latency, and consumer lag

4. Security (for Production)

  • Use SSL/TLS for encryption in transit
  • Configure authentication (e.g., SASL mechanisms, Kerberos) — a configuration sketch follows this list
  • Validate and sanitize data before sending/processing
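
For example, with kafka-python the TLS and SASL settings are passed straight to the client (the host, file path, and credentials below are placeholders; match them to your cluster):

python
    from kafka import KafkaProducer
    import json
    
    producer = KafkaProducer(
        bootstrap_servers=['broker.example.com:9093'],  # TLS listener (placeholder host)
        security_protocol='SASL_SSL',                   # Encrypt in transit and authenticate
        ssl_cafile='/path/to/ca.pem',                   # CA certificate (placeholder path)
        sasl_mechanism='PLAIN',                         # Or SCRAM-SHA-256/512, GSSAPI, ...
        sasl_plain_username='my-service',               # Placeholder credentials
        sasl_plain_password='change-me',
        value_serializer=lambda x: json.dumps(x).encode('utf-8')
    )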

✅ Key Takeaways

  • kafka-python is a great starting point for learning, while confluent-kafka offers better performance for production
  • aiokafka is ideal for async Python applications (FastAPI, asyncio-based services)
  • Always use serializers (JSON, Avro, Protobuf) for structured data - never send raw strings
  • Error handling and retry logic are critical for production Kafka producers
  • Consumer groups enable parallel processing and load balancing across multiple consumer instances
  • Batch processing improves throughput but requires careful offset management
  • Use dataclasses or Pydantic models to structure your event data for type safety
  • Testing Kafka integration code requires mocking or using embedded Kafka for unit tests
  • Idempotent producers prevent duplicate messages in case of retries
  • Python's Kafka clients abstract away the protocol details, but understanding the concepts still matters

🚀 Next Steps

With Python Kafka integration in place, you can now:

  1. Build real microservices that communicate via Kafka topics
  2. Implement real-time analytics and dashboards
  3. Apply patterns like CQRS, event sourcing, and stream processing
  4. Integrate with frameworks like FastAPI, Django, or Flask using background workers

In the next parts of the course, you can extend this foundation into stream processing, exactly-once patterns, and end-to-end architectures that combine Kafka, databases, and APIs.

📚 Continue Learning

  • Practice: Build a small event-driven microservice using Python and Kafka
  • Quiz: Test your understanding of Python Kafka integration concepts
  • Next Module: Learn about Kafka Monitoring & Optimization in Module 8
  • Related Resources:
      • [kafka-python Documentation](https://kafka-python.readthedocs.io/)
      • [Confluent Python Client](https://github.com/confluentinc/confluent-kafka-python)
      • [FastAPI Background Tasks (useful for running Kafka producers/consumers alongside an API)](https://fastapi.tiangolo.com/advanced/background-tasks/)

Hands-on Examples

Complete Python Producer-Consumer Example

producer.py

python
    from kafka import KafkaProducer
    import json
    import time
    import random
    
    def create_producer():
        return KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda x: json.dumps(x).encode('utf-8'),
            key_serializer=lambda x: x.encode('utf-8'),
            acks='all',
            retries=3
        )
    
    def send_user_events(producer):
        users = ['alice', 'bob', 'charlie', 'diana']
        actions = ['login', 'logout', 'view_product', 'add_to_cart', 'purchase']
        products = ['laptop', 'mouse', 'keyboard', 'monitor']
        
        for i in range(20):
            event = {
                'user_id': random.choice(users),
                'action': random.choice(actions),
                'product_id': random.choice(products),
                'timestamp': time.time(),
                'session_id': f'session_{i}'
            }
            
            producer.send('user-events', key=event['user_id'], value=event)
            print(f'Sent: {event}')
            time.sleep(1)
        
        producer.flush()
    
    if __name__ == '__main__':
        producer = create_producer()
        send_user_events(producer)
        producer.close()
    

consumer.py

python
    from kafka import KafkaConsumer
    import json
    from collections import defaultdict
    
    def create_consumer():
        return KafkaConsumer(
            'user-events',
            bootstrap_servers=['localhost:9092'],
            value_deserializer=lambda x: json.loads(x.decode('utf-8')),
            group_id='analytics-group',
            auto_offset_reset='earliest'
        )
    
    def process_events(consumer):
        stats = defaultdict(int)
        
        for message in consumer:
            event = message.value
            stats['total_events'] += 1
            stats[f'action_{event["action"]}'] += 1
            stats[f'user_{event["user_id"]}'] += 1
            
            print(f'Processed: {event["user_id"]} - {event["action"]}')
            
            if stats['total_events'] % 5 == 0:
                print(f'\nStats: {dict(stats)}\n')
    
    if __name__ == '__main__':
        consumer = create_consumer()
        try:
            process_events(consumer)
        except KeyboardInterrupt:
            pass  # Stop cleanly on Ctrl+C
        finally:
            consumer.close()
    

Run the example

bash
    # Terminal 1: Start consumer
    python consumer.py
    
    # Terminal 2: Start producer
    python producer.py

This complete example shows how to create a producer that sends user events and a consumer that processes them with real-time analytics.