Building Event-Driven Architectures with Kafka, Django and Microservices - Part 2
In Part 1 of this series, we explored the role of Kafka producers in a Django-based microservices environment. We discussed how producers publish messages to Kafka topics, laying the groundwork for efficient event-driven communication between services.
Part 2 shifts our focus to Kafka consumers, the components responsible for receiving and processing these messages. As mentioned earlier, a consumer automatically receives messages from the topic it subscribes to and acts on them according to its business logic. It is worth noting that while one consumer can subscribe to a topic, more consumers can be created to subscribe to the same topic and process event data with the same business logic. These consumers, acting as one, are known as a consumer group. Note that even a single consumer running on its own is still a member of a consumer group.
Question: Why do we need a consumer group?
It is perfectly fine to have a single consumer listen to and process events from a topic. However, a consumer group helps balance the workload of receiving and processing events across several consumers, ensuring that no single consumer is overwhelmed by the rate at which events are published.
A consumer group is identified by a group ID. With this ID, we know which group a consumer belongs to. During the demo, we will see why this ID is important.
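To make this concrete, here is a minimal sketch, assuming kafka-python and the local broker used later in this demo; the group ID demo_group is just an illustrative name. Running this script in two separate terminals starts two consumers in the same group, and Kafka shares the topic's partitions between them so each message is handled by only one of the two.
## consumer_group_sketch.py (illustrative only)
from kafka import KafkaConsumer

# Both running copies of this script use the same group_id, so they form one
# consumer group and split the partitions of the "orders" topic between them.
consumer = KafkaConsumer(
    "orders",
    bootstrap_servers="localhost:29092",
    group_id="demo_group",
)

for message in consumer:
    print(f"partition={message.partition} offset={message.offset} value={message.value}")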
Currently, not much work is done in our inventory microservice, as there is very little user interaction here. However, a predefined inventory of food items is available. These are the same items, with the same names, as the items ordered through the orders microservice from the previous part of this series.
How we configure the consumer depends on the importance of the messages it will be receiving. As an enterprise, every published message is very important. We want the inventory table's quantity data to match the physical quantity kept in actual storage; in other words, zero loss of event data is our top priority.
In a scenario where our inventory microservice is down but our orders service is still publishing messages, some of those messages would not be processed to update the food item inventory quantities, even after the inventory microservice is restored. To resolve this issue, offsets, commits, and the consumer group ID all play important roles.
Think of offsets as page numbers in a book. Each number is attached to the writing on that page, so we can quickly refer back to a piece of information without reading through the whole book again. In this context, offsets are the page numbers of the messages published to a topic.
Commits are simply the process of the consumer keeping track of offsets. Let's assume you were reading a novel and, for some reason, stopped reading it for a while. By the time you return to the book, you will probably have forgotten the last page you read and have a hard time finding where you left off, because you didn't keep track of the page number. Keeping track may mean marking the page or folding its corner, even if some people consider that bad behaviour. Committing offsets ensures that when a consumer resumes reading (or consuming messages), it can pick up exactly where it left off without missing or repeating any data.
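As a quick, standalone illustration (a sketch assuming kafka-python; it is not part of the demo code, and demo_group is just an example group ID), we can ask a consumer where it currently is in a partition and what its group last committed:
## offset_sketch.py (illustrative only)
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers="localhost:29092", group_id="demo_group")
tp = TopicPartition("orders", 0)   # partition 0 of the orders topic
consumer.assign([tp])              # attach directly to that partition

print("next offset to read:", consumer.position(tp))     # the "page" we are on
print("last committed offset:", consumer.committed(tp))  # the bookmark saved for the group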
Configuring a consumer requires several arguments:
- bootstrap_servers: This argument holds the brokers' connection strings, connecting the consumer to the Kafka cluster.
- key_deserializer and value_deserializer: These work in the opposite direction to the key and value serializers. They take in byte arrays and convert them back into data structures.
- enable_auto_commit: We can commit offsets manually, but with this argument set to True (its default), the consumer's offsets are committed automatically at periodic intervals. A sketch of what manual committing would look like follows this list.
- group_id: This is an optional property used to identify the consumer group that the consumer belongs to. If no group ID is given, offset commits are disabled for the consumer. In our case, we want it present.
- auto_commit_interval_ms: The number of milliseconds between the consumer's periodic offset commits. By default this is 5000 ms, equivalent to 5 seconds. Setting it lower commits offsets more frequently, shrinking the window of uncommitted messages that could be lost or re-processed if the consumer fails.
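For contrast with the auto-commit behaviour described above, here is a minimal sketch, assuming kafka-python, of what manual committing would look like with enable_auto_commit turned off; process_order is a hypothetical stand-in for the business logic:
## manual_commit_sketch.py (illustrative only)
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    "orders",
    bootstrap_servers="localhost:29092",
    group_id="inventory_relation",
    enable_auto_commit=False,   # no periodic auto commits
    value_deserializer=lambda m: json.loads(m.decode("ascii")),
)

for message in consumer:
    process_order(message.value)   # hypothetical business-logic function
    consumer.commit()              # commit the offset only after processing succeeds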
With our understanding of these arguments, we can now set up the consumer for our inventory microservice. We also install kafka-python:
pip install kafka-python
After this step, we set things up a little differently. Unlike the producer, which only publishes messages to the broker when the need arises, the consumer cannot operate in such a manner. It must always listen for new messages, whether or not a new message has been published, and at the same time it must continue with its existing processes. In that sense, the consumer carries roughly twice the workload of the producer: listening for events and handling its regular responsibilities.
Knowing this, how do we ensure that the inventory service runs as a consumer while executing other processes? Although Django is a single-threaded framework by default, it is capable of multi-threading. This means that while it was originally designed to execute tasks in a single sequence, Python’s multi-threading capability enables it to execute multiple parts of a program at the same time. In other words, we can set up a consumer to run on a separate thread from the framework’s main thread.
You may wonder why a separate thread should be set up for the consumer. This is because running a consumer on the main thread blocks the process of running Django’s server. Once the consumer starts listening for messages, it will prevent the server from executing any other instructions, effectively halting the entire application. By running the consumer on a separate thread, we ensure that the server remains responsive and can continue to handle requests, while the consumer processes messages in the background without interference.
Configuring the consumer:
## utils/kafka/consumer.py
from kafka import KafkaConsumer
import json
from utils.update_inventory import update_inventory
topics = ['orders']
consumer = KafkaConsumer(
    bootstrap_servers="localhost:29092",
    value_deserializer=lambda m: json.loads(m.decode('ascii')),
    group_id="inventory_relation",
    auto_commit_interval_ms=1000,
    api_version=(2, 0, 0)
)
consumer.subscribe(topics)

while True:
    all_records = consumer.poll(timeout_ms=100, max_records=100)
    for topic_partition, messages in all_records.items():
        if topic_partition.topic == "orders":
            update_inventory(messages)
From the beginning, we see a list of Kafka topics that the consumer will subscribe to. For our demo, this list only contains the orders
topic. Our consumer is configured to connect to our local Kafka broker with the bootstrap_servers
argument. With the value_deserializer
, our message values will be deserialized from byte arrays to Python objects. The consumer belongs to the consumer group having the group_id
of inventory_relation
. We then set the periodic auto commit interval time of the consumer to 1000 ms (1 second), and we make it subscribe to the Kafka topics.
Inside an infinite loop, the consumer polls its subscribed topics with a timeout of 100 ms (0.1 seconds), fetching at most 100 records per poll. The fetched records are packaged into a dictionary and returned for processing. This dictionary has Kafka topic partitions as keys and, for each topic partition, a list of messages as the value.
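For reference, and reusing the consumer configured above, each entry of that dictionary can be unpacked like this (a short sketch; the attribute names are those of kafka-python's ConsumerRecord):
# poll() returns roughly:
# {TopicPartition(topic='orders', partition=0): [ConsumerRecord, ConsumerRecord, ...]}
all_records = consumer.poll(timeout_ms=100, max_records=100)
for topic_partition, messages in all_records.items():
    for record in messages:
        # topic, partition and offset locate the message; value holds the payload
        # already converted by value_deserializer.
        print(record.topic, record.partition, record.offset, record.value)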
For our business logic, we call a function update_inventory
on every message that belongs to the orders
topic. We see how this function is defined below:
## utils/update_inventory.py
from inventories.models import FoodItemInventory
def update_inventory(messages):
    for message in messages:
        ordered_items = message.value
        for item in ordered_items:
            food_item = FoodItemInventory.objects.get(name=item['name'])
            current_food_item_quantity = food_item.quantity
            new_food_item_quantity = current_food_item_quantity - item['quantity']
            food_item.quantity = new_food_item_quantity
            food_item.save()
            print(f"Quantity of {food_item.name} has now reduced from {current_food_item_quantity} to {food_item.quantity}.")
We can see that this function acts on a list of messages, and for each message, it acts on the list of items that the message carries. For each item, the FoodItemInventory record with the same name as the item is fetched. The quantity value of this record is reduced by the quantity of the food item that was ordered from the orders microservice. The inventory record is then updated and saved, and a message is printed showing the updated food item quantity.
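For context, each message value is expected to carry the list of ordered items published by the orders microservice in Part 1. The exact payload is defined by the producer; the shape below is only a hypothetical illustration consistent with the name and quantity fields that update_inventory relies on:
# Hypothetical example of one message value consumed from the orders topic.
# Only the 'name' and 'quantity' fields are used by update_inventory().
ordered_items = [
    {"name": "Chicken Wings", "quantity": 6},
    {"name": "Apples", "quantity": 4},
    {"name": "Bananas", "quantity": 10},
]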
With our consumer configured and business logic applied, we are only halfway there. The next task is to add configuration that keeps our consumer running while the Django server is also running. We can accomplish this with Python's threading module, which helps spin up additional threads in a program, and Django's custom management commands, which help in creating a command that activates our thread.
Defining a thread for our Kafka consumer:
## threads/kafka_consumer_thread.py
from threading import Thread
from utils.kafka.consumer import consume_kafka_messages
class ConsumeKafkaMessageThread(Thread):
    def __init__(self):
        Thread.__init__(self)

    def run(self):
        print("Kafka consumer thread running")
        consume_kafka_messages()
In the above snippet, we import the Thread class from the threading module, then import our consume_kafka_messages function from our utils/kafka/consumer.py file. A ConsumeKafkaMessageThread class is defined with Thread as its superclass, and its run method executes our function for consuming Kafka messages. Going back to our consumer.py file, we make changes to it so that it matches the requirements of the thread class.
## utils/kafka/consumer.py
# modified
from kafka import KafkaConsumer
import json
from utils.update_inventory import update_inventory
def consume_kafka_messages():
    topics = ['orders']
    consumer = KafkaConsumer(
        bootstrap_servers="localhost:29092",
        value_deserializer=lambda m: json.loads(m.decode('ascii')),
        group_id="inventory_relation",
        auto_commit_interval_ms=1000,
        api_version=(2, 0, 0)
    )
    consumer.subscribe(topics)

    while True:
        all_records = consumer.poll(timeout_ms=100, max_records=100)
        for topic_partition, messages in all_records.items():
            if topic_partition.topic == "orders":
                update_inventory(messages)
Now that we have defined our consumer thread, let's define a command for activating it. We want our consumer thread to start running as soon as the command python3 manage.py runserver is executed. In our Django inventories app, create a folder structure of management/commands/, where management is the parent folder and commands is the child. In the commands folder, create two files: __init__.py and kafka_consumer.py.
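The resulting layout inside the inventories app looks like this:
inventories/
    management/
        commands/
            __init__.py
            kafka_consumer.py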
In the kafka_consumer.py file, we have the code:
from typing import Any
from django.core.management.base import BaseCommand
from threads import kafka_consumer_thread
class Command(BaseCommand):
    help = "Start Kafka consumer"

    def handle(self, *args: Any, **options: Any) -> str | None:
        kafka_consumer = kafka_consumer_thread.ConsumeKafkaMessageThread()
        kafka_consumer.start()
This creates a command that runs our ConsumeKafkaMessageThread class. The command has a description saying, "Start Kafka consumer". To run the command automatically on the server's start-up, we make one little change to our inventories app configuration.
## inventories/apps.py
from django.apps import AppConfig
import os
from django.core.management import call_command
class InventoriesConfig(AppConfig):
    default_auto_field = 'django.db.models.BigAutoField'
    name = 'inventories'

    def ready(self) -> None:
        if os.environ.get('RUN_MAIN'):
            print("hello kafka...")
            call_command('kafka_consumer')
This snippet defines the ready method. When the application is ready, Django checks the RUN_MAIN environment variable; the development server's auto-reloader sets this variable only in the process that actually serves requests, so the check ensures the kafka_consumer command is started just once instead of twice. Calling the command here is equivalent to writing python3 manage.py kafka_consumer in the terminal.
Running our microservices on different terminals:
To test our services, we run our Kafka service with docker compose up
, and order food items. Let’s test with three items.
You can see that before we place an order, we have 300 chicken wings, 50 apples, and 683 bananas.
Now, we add 6 chicken wings to the cart:
Add 4 apples:
And 10 bananas:
We then place an order:
Our ordered items are published as a message to the Kafka broker. We should see that our inventory is updated accordingly:
To test how effective our consumer's configuration is during downtime, stop the inventory microservice, place an order for 6 apples while it is down, and spin the microservice back up after a few seconds.
Run the microservice again, and you should see the updated value. Apples are now down to 40.
In Conclusion…
In this article, we delved into the crucial role of Kafka consumers in ensuring reliable event-driven communication within a Django-based microservices architecture. We explored how consumers receive and process messages from Kafka topics, and the importance of offset management to ensure data consistency even in the face of service downtime.
We also walked through the practical steps of configuring a Kafka consumer in Django, including threading considerations to maintain application responsiveness. By implementing these practices, you can enhance the reliability of your microservices, ensuring that your system can handle events with minimal risk of data loss.
Till we meet again, dear reader, keep building...