Building a Real-time Feedback System with Django, Kafka, Spark, and S3


In today’s data-driven landscape, the ability to capture, process, and store user-generated data in real-time is essential for many applications. This guide provides a detailed walk-through of creating an end-to-end application using Django for data capture, Kafka for real-time data streaming, Spark Streaming for processing, and Amazon S3 for storage.

Prerequisites:

AWS Setup:

  • EC2 instance (e.g., t2.large)
  • Opened ports: SSH (22), HTTP/HTTPS (80, 443), Kafka (9092)
  • IAM role for S3 access
  • An S3 bucket created

EC2 Environment:

  • Ubuntu 20.04 or similar OS
  • Java installed
  • Python with pip
  • Python virtual environment

Software/Frameworks:

  • Django
  • confluent-kafka (Kafka client for Python)
  • Apache Spark with Structured Streaming

Dependencies:

  • Kafka with Zookeeper
  • S3 connectors for Spark

Networking:

  • Inter-service communication configured (Django, Kafka, and Spark can reach one another)

1. Setting up Django for Feedback Collection

Start with a Django project and app:

django-admin startproject user_feedback
cd user_feedback
python manage.py startapp feedback_realtime
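
Next, register the new app so Django can discover its models and templates, by adding it to INSTALLED_APPS in user_feedback/settings.py:

# user_feedback/settings.py

INSTALLED_APPS = [
    # ... Django's default apps ...
    'feedback_realtime',
]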

Designing the Feedback Form

In feedback_realtime/models.py:

from django.db import models

class Feedback(models.Model):
    content = models.TextField()

Then, in feedback_realtime/forms.py:

from django import forms
from .models import Feedback

class FeedbackForm(forms.ModelForm):
    class Meta:
        model = Feedback
        fields = ['content']

View to Handle Feedback in feedback_realtime/views.py:

from django.shortcuts import render, redirect
from .forms import FeedbackForm

def feedback_view(request):
    if request.method == "POST":
        form = FeedbackForm(request.POST)
        if form.is_valid():
            form.save()
            return redirect('feedback_success')
    else:
        form = FeedbackForm()
    return render(request, 'feedback_realtime/feedback_form.html', {'form': form})

def feedback_success(request):
    return render(request, 'feedback_realtime/feedback_success.html')

URLs Configuration in feedback_realtime/urls.py:

from django.urls import path
from .views import feedback_view, feedback_success

urlpatterns = [
    path('feedback/', feedback_view, name='feedback_form'),
    path('feedback_success/', feedback_success, name='feedback_success'),
]

HTML Templates:

First, ensure there’s a directory named templates in the feedback_realtime app. Inside templates, create another directory named feedback_realtime.
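
The resulting layout looks like this:

feedback_realtime/
    templates/
        feedback_realtime/
            feedback_form.html
            feedback_success.html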

  • Feedback Form Template (feedback_form.html):
{% extends "base.html" %}

{% block content %}
  <h2>Feedback Form</h2>
  <form method="post">
    {% csrf_token %}
    {{ form.as_p }}
    <button type="submit">Submit Feedback</button>
  </form>
{% endblock %}

  • Feedback Success Template (feedback_success.html):

{% extends "base.html" %}

{% block content %}
  <h2>Thank you for your feedback!</h2>
{% endblock %}
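
Both templates extend base.html, which isn't shown in this guide. A minimal sketch (placed where your template loader can find it, for example a project-level templates directory listed in settings.py) could be:

<!DOCTYPE html>
<html>
  <head>
    <title>User Feedback</title>
  </head>
  <body>
    {% block content %}{% endblock %}
  </body>
</html>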

Lastly, make sure to include the feedback_realtime app URLs in your main project's user_feedback/urls.py:

from django.urls import path, include

urlpatterns = [
    ...
    path('', include('feedback_realtime.urls')),
]

Run migrations, start the Django server, and you should now be able to access the feedback form at /feedback/ on your server.
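
For example, from the project root (binding to 0.0.0.0 so the form is reachable from outside the EC2 instance; remember to add your host to ALLOWED_HOSTS):

python manage.py makemigrations feedback_realtime
python manage.py migrate
python manage.py runserver 0.0.0.0:8000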

Cool :) We have successfully designed our basic feedback form in Django. Now it's time to integrate Kafka.

Integrating Kafka into Django means setting up producers (and possibly consumers) in your application. We'll focus on a producer in the Django app that sends messages (feedback) to a Kafka topic.

Kafka Server Setup:

First, make sure you have Kafka and ZooKeeper installed. If using the official Kafka distribution:

Start ZooKeeper:

bin/zookeeper-server-start.sh config/zookeeper.properties

Start Kafka Server:

bin/kafka-server-start.sh config/server.properties
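
Unless your broker is configured to auto-create topics, also create the user_feedbacks topic that the Django producer will write to (the partition and replication counts here are minimal placeholders):

bin/kafka-topics.sh --create --topic user_feedbacks --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1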

Install the Kafka Python Client:

You'll need to install the confluent-kafka library, a Kafka client for Python.

pip install confluent-kafka

Integrating Kafka:

Kafka Configuration:

Create a configuration file for Kafka, say kafka_config.py, inside the feedback_realtime app:

KAFKA_BROKER_URL = 'aws.kafka.url:9092'  # Replace with your broker address
FEEDBACK_TOPIC = 'user_feedbacks'

Kafka Producer Utility:

Create a utility function to produce messages to Kafka. Let’s create a file named kafka_producer.py inside the feedback_realtime app:

# feedback_realtime/kafka_producer.py

import json

from confluent_kafka import Producer
from .kafka_config import KAFKA_BROKER_URL, FEEDBACK_TOPIC

def acked(err, msg):
    """Callback for delivery reports."""
    if err is not None:
        print(f"Failed to deliver message: {msg.value()}: {err.str()}")
    else:
        print(f"Message produced to {msg.topic()}")

def produce_feedback(feedback_data):
    p = Producer({'bootstrap.servers': KAFKA_BROKER_URL})
    # Serialize to JSON (rather than Python's dict repr) so downstream
    # consumers can parse the message.
    p.produce(FEEDBACK_TOPIC, key=str(feedback_data["id"]), value=json.dumps(feedback_data), callback=acked)
    p.flush()  # Wait for any outstanding messages to be delivered and delivery reports to be received.

Modify Django View:

Modify your feedback_view in views.py to send feedback to Kafka:

from django.shortcuts import render, redirect
from .forms import FeedbackForm
from .kafka_producer import produce_feedback

def feedback_view(request):
    if request.method == 'POST':
        form = FeedbackForm(request.POST)
        if form.is_valid():
            feedback_instance = form.save()
            # Build a dict from the feedback instance; produce_feedback serializes it to JSON
            feedback_data = {
                "id": feedback_instance.id,
                "content": feedback_instance.content
            }
            produce_feedback(feedback_data)
            return redirect('feedback_form')
    else:
        form = FeedbackForm()
    return render(request, 'feedback_realtime/feedback_form.html', {'form': form})

With these modifications, every time a user submits feedback via the form, it is saved to your database and also sent in real time to your Kafka topic user_feedbacks.
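
To verify the pipeline end to end, you can watch the topic with Kafka's console consumer while submitting the form (assuming the broker runs locally):

bin/kafka-console-consumer.sh --topic user_feedbacks --from-beginning --bootstrap-server localhost:9092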

Integrating Kafka with Spark Streaming

  1. Ensure you have Spark and its dependencies correctly set up.
  2. Ensure you have the spark-sql-kafka package available (the job below uses the Structured Streaming Kafka source).

Steps to Stream from Kafka and Write to S3 using Spark Streaming:

Spark Streaming Code:

Here's a basic Spark Structured Streaming job that reads data from Kafka and writes it to S3. Save this as kafka_spark_stream_to_s3.py.

from pyspark.sql import SparkSession
from pyspark.sql.functions import current_date, date_format

spark = SparkSession.builder.appName("Kafka-S3-user-feedback-Streaming").getOrCreate()

# Define the Kafka source; replace KAFKA_URL with your broker address.
kafkaStream = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "KAFKA_URL:9092") \
    .option("subscribe", "user_feedbacks") \
    .load()

# Assuming the feedback messages are plain text; you can adjust for other formats.
rawValues = kafkaStream.selectExpr("CAST(value AS STRING)")

# Add date columns, since partitionBy expects column names rather than
# expressions like date_format(...).
datedValues = rawValues \
    .withColumn("year", date_format(current_date(), "yyyy")) \
    .withColumn("month", date_format(current_date(), "MM")) \
    .withColumn("day", date_format(current_date(), "dd"))

# Write to S3, partitioned in year/month/day format
query = datedValues.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "s3a://real-time-project-vengat/") \
    .option("checkpointLocation", "s3a://your_bucket_name/checkpoints/") \
    .partitionBy("year", "month", "day") \
    .start()

query.awaitTermination()

  • Match the Kafka connector's version (3.4.1 in the spark-submit command below) to your Spark version.
  • Replace your_bucket_name with your S3 bucket name.
  • To access S3, you'll need appropriate AWS credentials set up, either through a configuration file, environment variables, or Spark configurations (see the sketch after this list).
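
As one hedged sketch of the Spark-configuration route (the key values are placeholders; on an EC2 instance with the IAM role from the prerequisites attached, the default credential provider chain usually works without these options), you could build the session like this. Note that the s3a:// scheme also needs the hadoop-aws artifact on the classpath, matched to your Hadoop version:

from pyspark.sql import SparkSession

# Sketch: supplying S3A credentials explicitly via Hadoop options.
# With an IAM instance profile attached, the two .config() lines for
# keys can be omitted and the default credential chain takes over.
spark = SparkSession.builder \
    .appName("Kafka-S3-user-feedback-Streaming") \
    .config("spark.hadoop.fs.s3a.access.key", "YOUR_ACCESS_KEY") \
    .config("spark.hadoop.fs.s3a.secret.key", "YOUR_SECRET_KEY") \
    .getOrCreate()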

Run the Spark Streaming Application

Because the job uses the Structured Streaming Kafka source (readStream.format("kafka")), it needs the spark-sql-kafka package rather than the DStream-based spark-streaming-kafka one:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1 kafka_spark_stream_to_s3.py

Bonus: Triggering Kafka Producer in the Background Without Celery

While Celery is a popular choice for background tasks in Django, it can be overkill for simple use cases or may not fit a project's requirements. For those situations, Python offers several ways to run background tasks; one is plain threads.

Here’s how you can produce to Kafka in the background when a user submits feedback through Django, without the overhead of a task queue like Celery:

1. Update Django View with Python Threading:

Python’s standard library offers the threading module which allows us to run functions as background threads:

from django.shortcuts import render, redirect
from .forms import FeedbackForm
from .kafka_producer import produce_feedback
import threading

def kafka_producer_background(feedback_data):
    produce_feedback(feedback_data)

def feedback_view(request):
    if request.method == 'POST':
        form = FeedbackForm(request.POST)
        if form.is_valid():
            feedback_instance = form.save()
            # Build a dict from the feedback instance; produce_feedback serializes it to JSON
            feedback_data = {
                "id": feedback_instance.id,
                "content": feedback_instance.content
            }

            # Trigger Kafka producer as a background task
            thread = threading.Thread(target=kafka_producer_background, args=(feedback_data,))
            thread.start()
            
            return redirect('feedback_form')
    else:
        form = FeedbackForm()
    return render(request, 'feedback_realtime/feedback_form.html', {'form': form})

2. Considerations:

  • Scalability: While threading is lightweight, remember that it’s not the most scalable solution for a vast number of simultaneous background tasks. If your application scales up in terms of user interactions, you might eventually need to consider process-based background workers or distributed task queues.
  • Error Handling: The above code is a simple demonstration. In a production scenario, you'll want to implement error handling, especially around the Kafka producer (see the sketch after this list).
  • Database Connections: Be cautious when using threads with Django, especially regarding database connections. If the background task interacts with the database, you might run into issues if the main thread closes the database connection before the background thread finishes.
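
As a minimal sketch of the error-handling point (the logging setup is an illustrative assumption, not part of the original code), the background function could catch and log producer failures instead of letting the thread die silently:

import logging

from .kafka_producer import produce_feedback

logger = logging.getLogger(__name__)

def kafka_producer_background(feedback_data):
    try:
        produce_feedback(feedback_data)
    except Exception:
        # Log the full traceback; the user's request has already returned.
        logger.exception("Failed to produce feedback %s to Kafka", feedback_data.get("id"))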

Conclusion

In this article, we explored a complete real-time data processing pipeline: a user interaction on a Django-based web application, real-time message streaming with Kafka, distributed processing with Spark, and finally storage of the processed data in S3, organized by date.

  • Django: A robust Python web framework that allows us to quickly create web applications. In our case, we used it to gather user feedback in real-time.
  • Kafka: A distributed streaming platform suited to building real-time data pipelines and streaming apps. It let us handle real-time user feedback without data loss, processed in order of arrival (within each partition).
  • Spark Streaming: An extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. We integrated this to process data from Kafka and execute necessary transformations and actions.
  • Amazon S3: An object storage service from AWS, providing scalability, data availability, security, and performance. Here, the processed data from Spark was stored in an organized year/month/day format.

The integration of these technologies offers a scalable and robust solution for real-time data processing, suitable for various business needs — from simple web applications to complex ETL processes and analytics.

If you found this article insightful and wish to delve deeper into full-stack development or data engineering projects, I’d be thrilled to guide and collaborate further. Feel free to reach out through the mentioned channels below, and let’s make technology work for your unique needs.

Contact Channels:

Thank you for embarking on this journey with me through the realms of real-time data processing. Looking forward to our future collaborations.

