Building Event-Driven Architectures with Kafka, Django and Microservices - Part 1

Prince Igwenagha
11 min read · Aug 19, 2024


Photo by rivage on Unsplash

Events are occurrences or actions that have happened. These actions can trigger other actions, creating a chain of reactions within a system. In an event-driven architecture (EDA), these events serve as the primary means of communication between decoupled components, enabling systems to respond to changes and process data in real-time without being tightly coupled.

Prerequisites: Before delving into the rest of the article, knowledge of Django is required. Also, ensure you have the following tools installed, as they may be required if you plan to install Kafka on your local machine:

  • Docker
  • Docker Compose

Regardless of the architecture an application is built with, data must be transferred among its software components for the whole application to operate effectively. In a monolithic application, data is passed through direct function calls: ModuleA.function1 is imported and called in ModuleB.function2, so whatever data function1 has access to can be utilized in function2. There are few or no complications in this form of transfer, because the components know each other and act together as though they were a single unit.
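To make this concrete, here is a minimal sketch of in-process data sharing (the module and function names are illustrative, not from any real project):

## module_a.py
def function1():
    # produce some data within the same process
    return {"order_id": 1, "total": 25.50}

## module_b.py
from module_a import function1

def function2():
    order = function1()  # a direct, in-process call; no network is involved
    print(f"Processing order {order['order_id']} worth {order['total']}")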

In an application made up of decoupled units (microservices), these units have very little direct interaction with each other, almost like total strangers. Yet these isolated environments still manage to work together effectively by transferring data among themselves.

Monolith and Microservices

One may ask: how is this possible?

Event-driven architecture

To understand what event-driven architectures are, we must first have a basic understanding of the ways in which microservices communicate with each other. There are two major forms of communication in this scenario:

  • Synchronous communication
  • Asynchronous communication

With synchronous communication, a microservice sends an HTTP request to another service, asking for a piece of data through an API (Application Programming Interface). Operations and processes running in the requesting service are put on hold until a response is received from the other microservice.

With asynchronous communication, a microservice “dumps” a piece of data that it thinks will be needed by another microservice. Whether or not the latter receives the data, the former does not care, and life goes on.
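As a rough sketch of the difference (the endpoint URL, topic name, and payload below are made up for illustration; the producer setup mirrors what we build later with kafka-python):

import json
import requests
from kafka import KafkaProducer

# Synchronous: the caller blocks until the other service responds.
response = requests.get("http://inventory-service/api/stock/42")  # hypothetical endpoint
stock = response.json()

# Asynchronous: the caller publishes an event and immediately moves on.
producer = KafkaProducer(
    bootstrap_servers="localhost:29092",  # the broker address used later in this demo
    value_serializer=lambda m: json.dumps(m).encode("ascii"),
)
producer.send("orders", {"food_id": 42, "quantity": 3})  # hypothetical payload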

Communication types across microservices

Event-driven architecture belongs to the category of asynchronous communication.

Event-driven architecture (EDA) is simply a design pattern that helps decoupled systems (microservices) communicate with each other with the help of triggered events. This architecture is made up of four components:

  • Publisher: This component is the system that publishes events to the event broker. It sends these events, which are the results of actions that were carried out in its system.
  • Subscriber: Just as the name implies, this component subscribes to one or more “topics” in the event broker. By subscribing, it automatically listens and fetches events that were sent to these topics for processing. In this context, think of a topic as a folder that is used to contain related files (events).
  • Event: This is the result of an action containing data that was worked on by the publisher.
  • Event Broker: This is middleware software between publishers and subscribers. It is responsible for distributing events among decoupled systems. Popular event broker technologies include Google Cloud Pub/Sub, Azure Event Grid, RabbitMQ, and Apache Kafka.

Apache Kafka

Kafka is a streaming platform that’s used to collect, store, and process streams of data in real-time. Think of Kafka like a mail service. You prepare your letter and take it to the post office so that it can be processed and delivered to your friend. During this process, the recipient is unaware of what is going on, continues with his life, and suddenly has a letter from you, his dear friend. The difference here is that you and your friend are familiar with each other; microservices using Kafka aren’t. In this scenario, we can all agree that the operations of the mail service revolve around people’s mail. In the same way, one of the most important components of Kafka is the message.

Messages are made up of several components, including keys (optional for most use cases) and values (which hold your event data). These messages are sent by applications known as producers to a server known as a broker, where they are stored in an append-only structure known as the commit log.

This broker keeps the operations of reading, writing, and storing records functional. The consumer, on the other end, automatically receives these messages from the topics it subscribes to and acts on them according to its business logic. The Kafka cluster itself is coordinated by ZooKeeper, software that keeps track of brokers and manages the configuration of topics and partitions.
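To give a sense of the consuming side (a minimal sketch using kafka-python; building the actual consumer is the subject of Part 2):

import json
from kafka import KafkaConsumer

# subscribe to the "orders" topic and deserialize each message value from JSON
consumer = KafkaConsumer(
    "orders",
    bootstrap_servers="localhost:29092",  # the broker address used later in this demo
    value_deserializer=lambda m: json.loads(m.decode("ascii")),
)

for message in consumer:
    # message.key is optional; message.value holds the event data
    print(message.key, message.value)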

With Kafka in the picture, EDAs are implemented, allowing application systems to share data effectively among themselves without interfering with each other’s running operations.

Given this understanding, how do we implement EDAs in our Python microservices?

Demo

Since Kafka is such an important component here, you could either configure your own Kafka cluster and deploy it to a remote environment, or use a managed Kafka instance from one of the various cloud providers for production; both options are outside the scope of this article.

Here, we will be configuring and running Zookeeper and our Kafka cluster locally with Docker Compose for a food supermarket enterprise.

If you need a step-by-step guide to setting them up, refer to this detailed Medium article.

You can find the GitHub repo for this demo here.

So a food supermarket company from some alternate universe came to us with a problem. The issue is that they often run out of food on business days, leaving customers angry and unsatisfied, and that’s bad business.

The solution? We build software that takes customers’ orders and, at the same time, keeps track of the quantity of food items left in store after they are ordered. In this scenario, we would have two microservices: one for taking orders and the other for inventory management, both running against our local Kafka broker.

Demo folder structure

Kafka does not ship with a GUI, so most operations are carried out through its command-line tools. For this demo, we have the Kafka setup in our compose.yml file:

version: "3.7"

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 # makes Kafka accessible outside the container to clients on the Docker host
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  kafka_cli:
    image: confluentinc/cp-kafka:latest
    container_name: kafka_cli
    depends_on:
      - kafka
    entrypoint: ["/bin/sh", "-c"] # run a shell in the container; the command block below is passed to it as a single argument
    command: |
      "
      sleep 15
      kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic orders --replication-factor 1 --partitions 1 --config retention.ms=1800000
      echo -e 'Topic created'
      kafka-topics --bootstrap-server kafka:9092 --list
      kafka-topics --describe --bootstrap-server kafka:9092 --topic orders
      "

Here we have the Confluent Docker images of Apache ZooKeeper and Kafka. Without paying much attention to the zookeeper service, we see that kafka builds and runs right after the ZooKeeper container is up and running. The kafka service connects to ZooKeeper through the zookeeper:2181 address. This service exposes Kafka to clients both within the Docker network (kafka:9092) and on the host machine (localhost:29092).

For the kafka_cli service, we see that it is set to run right after the kafka container is ready. The sole purpose of this particular service is to run commands on the kafka environment. At the beginning, a delay of 15 seconds is set. This is to ensure that the Kafka broker is fully operational before we run any Kafka commands.

Next, we run a command that executes the kafka-topics script for creating Kafka topics. This script is executed with several arguments to create a topic named orders, only if that topic does not already exist, with 1 partition, a replication factor of 1, and a custom message retention period of 30 minutes (1,800,000 milliseconds) instead of the default 7-day retention period.

The next command simply lists the created topics, after which we can see the configuration of the orders topic. Running these services with docker compose up gives the following results:

Docker logs
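As a quick sanity check, you could also confirm from Python that the topic exists once the broker is up (a sketch using kafka-python, the client we install later in this article):

from kafka import KafkaConsumer

# connect to the broker exposed on the Docker host and list the available topics
consumer = KafkaConsumer(bootstrap_servers="localhost:29092")
print(consumer.topics())  # the returned set should include 'orders'
consumer.close()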

With a Kafka broker ready, we can now integrate it into our microservices. It is good to know that microservices are language-agnostic, meaning that a group of them can be built and function efficiently across several programming languages and frameworks, unlike a monolithic application, where a single programming language/framework is used from start to finish. For the scope of this article, we will build both microservices with Django.

In the orders-microservice folder, we already have our business logic, starting with the data models, down to the URL endpoints:

## orders/models.py

from django.db import models
from . import status


class FoodItem(models.Model):
    name = models.CharField(max_length=50)
    price = models.DecimalField(max_digits=10, decimal_places=2)

    def __str__(self) -> str:
        return self.name


class Order(models.Model):
    customer_name = models.CharField(max_length=50)
    status = models.CharField(max_length=20, choices=status.ORDER_STATUS, default="PLACED")
    created = models.DateTimeField(auto_now_add=True)

    def __str__(self) -> str:
        return self.customer_name


class OrderItem(models.Model):
    name = models.CharField(max_length=50)
    quantity = models.PositiveIntegerField()
    unit_price = models.DecimalField(max_digits=10, decimal_places=2)
    price = models.DecimalField(max_digits=10, decimal_places=2)
    order = models.ForeignKey(Order, on_delete=models.CASCADE)

    def __str__(self) -> str:
        return self.name

## orders/serializers.py

from rest_framework import serializers


class CartSerializer(serializers.Serializer):
    quantity = serializers.IntegerField()

## orders/views.py

from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework import status
from .models import FoodItem, Order as OrderModel, OrderItem
from .serializers import CartSerializer
from django.http import Http404
from django.core.cache import cache


cart_owner = "prince_igwe_cart"


class CartItems(APIView):
    def post(self, request, food_id):
        try:
            serializer = CartSerializer(data=request.data)
            if serializer.is_valid():
                item_quantity = serializer.validated_data['quantity']
                food_item = FoodItem.objects.get(id=food_id)

                ## set a new cart for Prince only if none exists yet (20-minute expiry)
                cache.add(key=cart_owner, value={"customer_name": "prince igwe", "items": []}, timeout=1200)

                ## add the food item to the cart
                my_cart_items = cache.get(cart_owner)['items']
                cart_item = {"name": food_item.name, "price": int(food_item.price), "quantity": item_quantity}
                my_cart_items.append(cart_item)
                cache.set(key=cart_owner, value={"customer_name": "prince igwe", "items": my_cart_items}, timeout=1200)

                updated_cart = cache.get(key=cart_owner)
                print(updated_cart)

                return Response({"message": "Added to cart"}, status=status.HTTP_200_OK)
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        except FoodItem.DoesNotExist:
            raise Http404


class Order(APIView):
    # place an order
    def post(self, request):
        my_cart = cache.get(key=cart_owner)
        order = OrderModel.objects.create(customer_name=my_cart['customer_name'])

        my_cart_items = my_cart['items']
        for item in my_cart_items:
            item_name = item['name']
            item_quantity = item['quantity']
            item_unit_price = item['price']
            price = item_unit_price * item_quantity

            OrderItem.objects.create(name=item_name, quantity=item_quantity, unit_price=item_unit_price, price=price, order=order)

        cache.delete(cart_owner)

        return Response({"message": "Order has been placed for delivery."}, status=status.HTTP_200_OK)

## orders/urls.py

from django.urls import path
from .views import CartItems, Order

urlpatterns = [
    path("cart/<int:food_id>", CartItems.as_view()),
    path("", Order.as_view()),
]

From the first set of code, we have models that work hand-in-hand to keep up with the details of the customers’ needs. FoodItem holds the data on the delicious foodstuffs displayed on the company’s e-commerce website: their names and prices. The Order model represents the customers’ orders, holding the customer’s name, the order’s status, and the time the order was placed. The OrderItem model holds the data for the food items that belong to a placed order.

In the views.py file, we have simple logic for our cart and order features. Here we see a POST action that is responsible for adding food items to a cached cart for a customer named prince igwe. This customer can specify the quantity of each food item of his choice, validated by the CartSerializer. Once Prince is satisfied with his choices, he can place an order for those items, which then clears his cart.

In the urls.py file, we attach endpoints to the views defined in the previous file.
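For completeness, these app-level routes would be wired into the project-level urls.py with something like the following (the project module name and the orders/ prefix are assumptions, not taken from the repo):

## config/urls.py (assumed project module name)

from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    path("admin/", admin.site.urls),
    path("orders/", include("orders.urls")),  # assumed prefix for the orders app
]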

After running the server for this API, we can see on Django’s admin page that there are predefined foodstuffs for customers, along with their prices.

Food Items on Django Admin page

Testing our endpoints, we see that items are added to the cart, orders are placed, and the data is stored.

Add item to cart
Place an order
Orders
Ordered Items

With almost half of the problem solved, we will integrate Kafka into this API. The orders microservice will be working as a producer, publishing events to the Kafka broker.

Configuring a producer involves several properties, three of which are the most commonly used:

  • bootstrap_servers: This holds the broker connection string(s) that connect the producer to the Kafka cluster.
  • key_serializer: This serializes an optionally specified key into a byte array before the message is published. The key determines which partition of the topic the message goes to; if we don’t provide a key, messages are distributed evenly across the topic’s partitions.
  • value_serializer: This serializes our message (the event data) into a byte array for transmission over the network.

Of these arguments, only the value(s) for bootstrap_servers must be provided.
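Put together, a producer configured with all three properties might look like this (the broker address, key, and payload are illustrative; the demo’s actual producer follows below):

from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=["localhost:29092"],                      # broker connection string(s)
    key_serializer=lambda k: k.encode("utf-8") if k else None,  # optional key -> bytes
    value_serializer=lambda v: json.dumps(v).encode("ascii"),   # event data -> bytes
)

producer.send("orders", key="order-1", value={"item": "rice", "quantity": 2})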

With this basic knowledge, let’s set up our producer. We will be installing a Kafka client for Python called kafka-python.

pip install kafka-python

After installation, we can create a utilities folder where we set up the Kafka configuration for the producer.

## utils/kafka/producer.py

from kafka import KafkaProducer
from kafka.errors import KafkaError
import json

producer = KafkaProducer(
    bootstrap_servers="localhost:29092",
    value_serializer=lambda m: json.dumps(m).encode('ascii'),
    api_version=(2, 0, 0)
)

topic = "orders"


def update_inventory(ordered_items):
    # publish the ordered items to the "orders" topic
    future = producer.send(topic, ordered_items)
    try:
        # block for up to 10 seconds until the broker acknowledges and returns the RecordMetadata
        metadata = future.get(timeout=10)
        print(metadata)
    except KafkaError as e:
        print(e)

This snippet of code sets up a producer client that connects to the Kafka broker from our compose.yml file at the address localhost:29092. The value_serializer converts our message value to JSON before encoding it into a byte array. The api_version argument selects the Kafka API version we want to use, in this case 2.0.0; if it is left unset, kafka-python tries to detect the broker version automatically.

We define a function that publishes the details of the ordered items to our orders topic in a synchronous manner. This means the producer publishes our message to the broker and waits for a response object containing details of the successfully published message, called the RecordMetadata. If an error is encountered, it is printed as well.
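One possible way to wire this producer into the orders API (a sketch, assuming we publish right after the order and its items are saved in the Order view; this exact integration is not shown above):

## orders/views.py (modified Order view, sketch)

from utils.kafka.producer import update_inventory


class Order(APIView):
    # place an order and publish it for the inventory microservice
    def post(self, request):
        my_cart = cache.get(key=cart_owner)
        order = OrderModel.objects.create(customer_name=my_cart['customer_name'])

        ordered_items = []
        for item in my_cart['items']:
            OrderItem.objects.create(
                name=item['name'],
                quantity=item['quantity'],
                unit_price=item['price'],
                price=item['price'] * item['quantity'],
                order=order,
            )
            ordered_items.append({"name": item['name'], "quantity": item['quantity']})

        # publish the ordered items so the inventory service can update its stock
        update_inventory(ordered_items)

        cache.delete(cart_owner)

        return Response({"message": "Order has been placed for delivery."}, status=status.HTTP_200_OK)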

In Conclusion…

In this article, we’ve explored how to set up a Kafka producer in a Django microservice to publish events. By configuring Kafka to act as a communication bridge, we’ve taken the first step toward building an event-driven architecture that allows for scalable and decoupled services.

But publishing events is only half of the equation. In the next part of this series, we’ll dive into how another microservice can consume these events and take action based on them. We’ll cover how to set up a Kafka consumer in Django, handle event processing, and ensure that our inventory management microservice stays in sync with the orders placed by customers.

Stay tuned for Part 2, where we’ll continue building out this real-time, event-driven system with Apache Kafka.
