🚀

Module 7: Kafka with Python

60 minutes · 2 examples · Intermediate

Hands-on Examples

Interactive examples to reinforce your learning

Complete Python Producer-Consumer Example

Full working example with producer and consumer

Code Example
# Complete Python Kafka Example

## producer.py
from kafka import KafkaProducer
import json
import time
import random

def create_producer():
    """Build a KafkaProducer pointed at the broker on localhost:9092.

    Message values are serialized as UTF-8 encoded JSON and message keys as
    UTF-8 encoded strings. The producer is created with acks='all' and
    retries=3.
    """
    def _encode_value(payload):
        # JSON text -> bytes, since the producer is handed bytes to send.
        return json.dumps(payload).encode('utf-8')

    def _encode_key(key):
        return key.encode('utf-8')

    settings = {
        'bootstrap_servers': ['localhost:9092'],
        'value_serializer': _encode_value,
        'key_serializer': _encode_key,
        'acks': 'all',
        'retries': 3,
    }
    return KafkaProducer(**settings)

def send_user_events(producer, count=20, delay=1):
    """Send randomly generated user events to the 'user-events' topic.

    Each event is a dict with a random user, action and product, the current
    time, and a session id derived from the loop index. The event's user id
    is used as the message key.

    Args:
        producer: Object exposing ``send(topic, key=..., value=...)`` and
            ``flush()`` (e.g. the producer returned by ``create_producer``).
        count: Number of events to send. Defaults to 20.
        delay: Seconds to sleep after each send. Defaults to 1.
    """
    users = ['alice', 'bob', 'charlie', 'diana']
    actions = ['login', 'logout', 'view_product', 'add_to_cart', 'purchase']
    products = ['laptop', 'mouse', 'keyboard', 'monitor']

    for i in range(count):
        event = {
            'user_id': random.choice(users),
            'action': random.choice(actions),
            'product_id': random.choice(products),
            'timestamp': time.time(),
            # Single braces: doubled braces in an f-string emit a literal
            # "{i}" instead of interpolating the loop index.
            'session_id': f'session_{i}'
        }

        producer.send('user-events', key=event['user_id'], value=event)
        print(f'Sent: {event}')
        time.sleep(delay)

    # Flush once all events have been handed to the producer.
    producer.flush()

if __name__ == '__main__':
    producer = create_producer()
    try:
        send_user_events(producer)
    finally:
        # Close the producer even if sending raises or is interrupted.
        producer.close()

## consumer.py
from kafka import KafkaConsumer
import json
from collections import defaultdict

def create_consumer():
    """Build a KafkaConsumer subscribed to the 'user-events' topic.

    Connects to the broker on localhost:9092 as part of the
    'analytics-group' consumer group with auto_offset_reset='earliest'.
    Message values are decoded from UTF-8 JSON into Python objects.
    """
    def _decode_value(raw):
        # bytes -> JSON text -> Python object
        return json.loads(raw.decode('utf-8'))

    options = dict(
        bootstrap_servers=['localhost:9092'],
        value_deserializer=_decode_value,
        group_id='analytics-group',
        auto_offset_reset='earliest',
    )
    return KafkaConsumer('user-events', **options)

def process_events(consumer):
    """Consume events and keep running counts per action and per user.

    Iterates over ``consumer``; each item must expose a ``.value`` mapping
    with at least the keys ``'action'`` and ``'user_id'``. A line is printed
    for every event, and the accumulated statistics are printed after every
    fifth event.

    Args:
        consumer: Iterable of messages (e.g. the consumer returned by
            ``create_consumer``).

    Returns:
        dict: The accumulated counters, keyed by ``'total_events'``,
        ``'action_<action>'`` and ``'user_<user_id>'``. Only reached if the
        iterable is exhausted.
    """
    stats = defaultdict(int)

    for message in consumer:
        event = message.value
        stats['total_events'] += 1
        # Single braces: doubled braces in an f-string produce a literal
        # placeholder, which would lump every event under one bogus key.
        stats[f'action_{event["action"]}'] += 1
        stats[f'user_{event["user_id"]}'] += 1

        print(f'Processed: {event["user_id"]} - {event["action"]}')

        if stats['total_events'] % 5 == 0:
            print(f'\nStats: {dict(stats)}\n')

    return dict(stats)

if __name__ == '__main__':
    consumer = create_consumer()
    try:
        process_events(consumer)
    except KeyboardInterrupt:
        # create_consumer() sets no consumer_timeout_ms, so Ctrl+C is the
        # expected way to stop this script.
        print('\nStopping consumer...')
    finally:
        # Close the consumer however the loop ends.
        consumer.close()

## Run the example:
# Terminal 1: Start consumer
python consumer.py

# Terminal 2: Start producer
python producer.py

Expected Output:

Producer sends 20 user events, consumer processes them and displays analytics statistics.

Explanation:

This complete example shows how to create a producer that sends user events and a consumer that processes them with real-time analytics.