🚀
Module 7: Kafka with Python
60 minutes2 examplesIntermediate
Hands-on Examples
Interactive examples to reinforce your learning
Complete Python Producer-Consumer Example
Full working example with producer and consumer
Code Example
# Complete Python Kafka Example
## producer.py
from kafka import KafkaProducer
import json
import time
import random
def create_producer():
    return KafkaProducer(
        bootstrap_servers=['localhost:9092'],
        value_serializer=lambda x: json.dumps(x).encode('utf-8'),
        key_serializer=lambda x: x.encode('utf-8'),
        acks='all',
        retries=3
    )
def send_user_events(producer):
    users = ['alice', 'bob', 'charlie', 'diana']
    actions = ['login', 'logout', 'view_product', 'add_to_cart', 'purchase']
    products = ['laptop', 'mouse', 'keyboard', 'monitor']
    
    for i in range(20):
        event = {
            'user_id': random.choice(users),
            'action': random.choice(actions),
            'product_id': random.choice(products),
            'timestamp': time.time(),
            'session_id': f'session_{{i}}'
        }
        
        producer.send('user-events', key=event['user_id'], value=event)
        print(f'Sent: {{event}}')
        time.sleep(1)
    
    producer.flush()
if __name__ == '__main__':
    producer = create_producer()
    send_user_events(producer)
    producer.close()
## consumer.py
from kafka import KafkaConsumer
import json
from collections import defaultdict
def create_consumer():
    return KafkaConsumer(
        'user-events',
        bootstrap_servers=['localhost:9092'],
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        group_id='analytics-group',
        auto_offset_reset='earliest'
    )
def process_events(consumer):
    stats = defaultdict(int)
    
    for message in consumer:
        event = message.value
        stats['total_events'] += 1
        stats[f'action_{{event["action"]}}'] += 1
        stats[f'user_{{event["user_id"]}}'] += 1
        
        print(f'Processed: {{event["user_id"]}} - {{event["action"]}}')
        
        if stats['total_events'] % 5 == 0:
            print(f'\nStats: {{dict(stats)}}\n')
if __name__ == '__main__':
    consumer = create_consumer()
    process_events(consumer)
    consumer.close()
## Run the example:
# Terminal 1: Start consumer
python consumer.py
# Terminal 2: Start producer
python producer.pyExpected Output:
Producer sends 20 user events, consumer processes them and displays analytics statistics.
Explanation:
This complete example shows how to create a producer that sends user events and a consumer that processes them with real-time analytics.
Course Navigation
1
Module 1: Introduction to Kafka2
Module 2: The Problem Statement3
Module 3: How Kafka Solves the Problem4
Module 4: Kafka Architecture (Deep Dive)5
Module 5: Consumer Groups in Kafka6
Module 6: Kafka Setup & Hands-On7
Module 7: Kafka with Python8
Module 8: Kafka Monitoring & Optimization9
Module 9: Final Project - Real-Time Analytics PlatformYour Progress
Examples Completed0/2