Fostering Real-Time Interactivity: A Guide to GraphQL Subscriptions in Django for Web APIs.
Most modern applications have one or two features that promote real-time interactivity with the user. These interactions can be high or low priority. High-priority real-time interactions are time-sensitive. A messaging app falls into this category: we expect a quick response after sending a message. Low-priority real-time interactions, on the other hand, involve less critical updates or notifications that enhance the user experience but do not require immediate attention. An example is the game-update notifications from a sports app.
These interactions can be implemented with asynchronous communication among application components with the help of events or with popular communication protocols like WebSockets and MQTT. Since we are talking about GraphQL, subscriptions are suitable for incorporating real-time interactivity into applications.
In my previous article, I introduced GraphQL and demonstrated its fundamental operations, query and mutation. That article also gave a brief explanation of subscriptions.
Think of the subscription operation as the WebSocket connection for GraphQL APIs. While the query operation receives one API response for a single request, the subscription keeps receiving data in real time. This is because the operation typically runs over a WebSocket connection, which lets the server push updates to a client application that is listening for real-time data.
Assume we are building a feature where the user receives goal updates for an ongoing football match. On the server side, we would define a Goal type containing the necessary data about a scored goal, and then create a resolver function, say goalScored, that works with the Subscription schema type to publish goals in real time.
type Goal {
  team: String!
  scorer: String!
  minute: Int!
}

type Subscription {
  goalScored: Goal
}
On the client side of our application, listening for real-time goal updates would be done by sending the query:
subscription {
  goalScored {
    team
    scorer
    minute
  }
}
This would give the response:
{
  "data": {
    "goalScored": {
      "team": "Barcelona FC",
      "scorer": "Lionel Messi",
      "minute": 45
    }
  }
}
Once the first subscription query has been sent out, the client app doesn’t need to send another request to see if another goal was scored; it just updates in real-time.
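To make the "subscribe once, keep receiving" idea concrete, here is a minimal sketch in plain Python with asyncio. It is not GraphQL code: the names goal_scored and watch_match, the queue and the None sentinel are all assumptions made for illustration, and the second goal is a made-up sample value.

```python
import asyncio


async def goal_scored(events):
    """Hypothetical event source: yield each goal as soon as it is published."""
    while True:
        goal = await events.get()
        if goal is None:  # sentinel used in this sketch to end the stream
            return
        yield {"data": {"goalScored": goal}}


async def watch_match(goals):
    """Publish some goals and collect what a single subscriber receives."""
    events = asyncio.Queue()
    for goal in goals:
        events.put_nowait(goal)
    events.put_nowait(None)

    received = []
    # One "subscription": the client iterates once, the source keeps pushing.
    async for message in goal_scored(events):
        received.append(message)
    return received


if __name__ == "__main__":
    sample = [
        {"team": "Barcelona FC", "scorer": "Lionel Messi", "minute": 45},
        {"team": "Barcelona FC", "scorer": "Lionel Messi", "minute": 78},  # made-up value
    ]
    for message in asyncio.run(watch_match(sample)):
        print(message)
```

The client never asks "was there another goal?". It opens the stream once and handles each message as it arrives, which is the behaviour a subscription gives you.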
With the introduction out of the way: implementing subscriptions in your Django API requires the server to run asynchronously. Django's development server is synchronous by default, so you have to run your project on a Python ASGI server such as Daphne.
Demo…
In the demo from the previous article, we built a blog-author GraphQL API, where we created Author ‘accounts’ and published blogs. Let’s assume that a reader has subscribed to get their authors’ latest articles on our API. How do we implement a real-time notification feature where this reader gets these updates? Come with me.
Django Channels is a Django library that helps you write code efficiently on an ASGI server. While the default Django package enables you to write code for HTTP connections, Django Channels enables you to write code beyond the HTTP request-response cycle.
To get started, we will have to install Django Channels with the optional Daphne add-on to install both Channels and Daphne at once.
pip install "channels[daphne]"
Place the Daphne package at the beginning of your INSTALLED_APPS setting so that Django’s runserver command starts the ASGI server.
INSTALLED_APPS = [
    'daphne',  # here, before the other apps
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    # 3rd party apps
    "ariadne_django",
    # local apps
    "blog.apps.BlogConfig",
]
Edit the asgi.py file in your core folder to look like this:
import os

from django.core.asgi import get_asgi_application

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'core.settings')
# Initialise Django before importing anything that may touch the ORM
# (the schema pulls in the resolvers, which use the models).
django_asgi_app = get_asgi_application()

from ariadne.asgi import GraphQL
from ariadne.asgi.handlers import GraphQLTransportWSHandler
from channels.routing import URLRouter
from django.urls import path, re_path

from .schema import schema

application = URLRouter([
    # GraphQL endpoint, including subscription WebSocket connections
    path("graphql/", GraphQL(schema=schema, websocket_handler=GraphQLTransportWSHandler())),
    # everything else goes to the regular Django application
    re_path(r"", django_asgi_app),
])
In the code above, we serve our GraphQL API on the ASGI server with Ariadne’s ASGI GraphQL class. By passing websocket_handler=GraphQLTransportWSHandler(), we choose the graphql-transport-ws sub-protocol, the one defined by the GraphQL over WebSocket protocol and implemented by the graphql-ws library, to handle subscription WebSocket connections. The other option, and the default, is the subscriptions-transport-ws protocol written by Apollo, but it is no longer maintained.
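For a feel of what travels over the socket, here is a rough sketch of the graphql-transport-ws message shapes, built with nothing but the json module. The helper names subscribe_messages and handle_server_frame are hypothetical, and a real client would wait for the server's connection_ack before sending subscribe; in practice a client library handles all of this for you.

```python
import json


def subscribe_messages(operation_id, query):
    """Frames a client sends to start one subscription (hypothetical helper)."""
    return [
        # 1. Open the GraphQL session on the WebSocket.
        json.dumps({"type": "connection_init"}),
        # 2. After the server answers with connection_ack, start the operation.
        json.dumps({"id": operation_id, "type": "subscribe", "payload": {"query": query}}),
    ]


def handle_server_frame(frame):
    """Return the GraphQL result carried by a `next` frame, otherwise None."""
    message = json.loads(frame)
    if message["type"] == "next":
        return message["payload"]
    return None  # e.g. connection_ack, complete, error


if __name__ == "__main__":
    for frame in subscribe_messages("1", "subscription { getLatestBlog { title } }"):
        print(frame)
```

Each pushed result arrives as its own next frame carrying the same id, which is how one socket can serve several subscriptions at once.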
Edit the settings.py file to register our ASGI application as the root application.
# WSGI_APPLICATION = 'core.wsgi.application'
ASGI_APPLICATION = "core.asgi.application"
Define the Subscription schema type for our subscription operation.
type Blog {
  pk: Int
  title: String
  content: String
  author: Author
}

type Subscription {
  getLatestBlog: Blog
}
Now, because we are operating on an ASGI server with Channels, we have to change how we interact with Django’s synchronous ORM so that it can be called from asynchronous code. This can be done with the help of Channels’ database_sync_to_async wrapper.
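The idea behind the wrapper can be sketched without Django at all. In the snippet below, asyncio.to_thread stands in for database_sync_to_async: both hand a blocking call to a worker thread so the event loop stays free, although the Channels version additionally cleans up database connections. fetch_latest_title and heartbeat are made-up stand-ins, not part of the project.

```python
import asyncio
import time


def fetch_latest_title():
    """Stand-in for a blocking ORM call such as Blog.objects.latest(...)."""
    time.sleep(0.05)  # pretend to wait on the database
    return "My third blog"


async def heartbeat(ticks):
    """Shows that the event loop keeps running during the blocking call."""
    for _ in range(3):
        ticks.append("tick")
        await asyncio.sleep(0.01)


async def main():
    ticks = []
    title, _ = await asyncio.gather(
        asyncio.to_thread(fetch_latest_title),  # runs in a worker thread
        heartbeat(ticks),                       # runs on the event loop meanwhile
    )
    return title, ticks


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Calling the blocking function directly inside a coroutine would freeze every other connection for the duration of the query, which is why Django refuses to run ORM calls straight from async code.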
Edit our resolvers.py to include:
import asyncio

from channels.db import database_sync_to_async


async def latest_article_generator(*_):
    while True:
        await asyncio.sleep(1)
        # Wrap the synchronous ORM call with database_sync_to_async so it can
        # be awaited. Ordering by "pk" treats the most recently created row as
        # the latest one (the original ordered by "title", i.e. alphabetically).
        blog = await database_sync_to_async(Blog.objects.latest)("pk")
        # Apply the same wrapper to fetch the related author
        author = await database_sync_to_async(lambda: blog.author)()
        # Build a plain dict instead of mutating blog.__dict__
        response = {
            "pk": blog.pk,
            "title": blog.title,
            "content": blog.content,
            "author": {
                "username": author.username,
                "age": author.age
            }
        }
        yield response


async def resolve_get_latest_blog(response, info):
    # `response` is the value yielded by latest_article_generator
    return response
Edit our schema.py to include:
from ariadne import QueryType, make_executable_schema, load_schema_from_path, MutationType, SubscriptionType
subscription = SubscriptionType()
subscription.set_source('getLatestBlog', blog_resolvers.latest_article_generator)
subscription.set_field('getLatestBlog', blog_resolvers.resolve_get_latest_blog)
schema = make_executable_schema(type_defs, mutation, query, subscription)
In the snippets above, we wrote an asynchronous generator function, latest_article_generator, that fetches the latest blog article from the database at a one-second interval. Each time it fetches one, it hands the data over with the yield keyword. The data is sent to the resolver function, resolve_get_latest_blog, where the response parameter is the yielded data to be delivered to the API client.
The wiring is done with the set_source() method, which sets the data source of the subscription operation, and the set_field() method, which sets the resolver function of the getLatestBlog field of the Subscription schema type.
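The division of labour between the source and the resolver can be simulated in a few lines. This is a simplified sketch, not Ariadne's implementation: source, resolver and run_subscription are hypothetical names, and the blog titles are placeholder values.

```python
import asyncio


async def source(*_):
    """Plays the role of the function registered with set_source()."""
    for title in ("My first blog", "My second blog"):  # placeholder data
        yield {"title": title}


def resolver(message, info):
    """Plays the role of the function registered with set_field()."""
    return message


async def run_subscription(field_name):
    """Hypothetical driver: roughly what happens for every yielded event."""
    results = []
    async for message in source(None, None):
        # Each yielded value becomes the resolver's first argument, and the
        # resolver's return value becomes the field's data in the response.
        results.append({"data": {field_name: resolver(message, None)}})
    return results


if __name__ == "__main__":
    for result in asyncio.run(run_subscription("getLatestBlog")):
        print(result)
```

The resolver is the place to reshape or filter an event before it reaches the client. In our case it simply passes the yielded data through.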
Start the Django server with python3 manage.py runserver. You should now see that you are running on the Daphne ASGI server.
In your browser, go to the graphql/ endpoint and send the query:
subscription LatestBlog {
  getLatestBlog {
    title
    content
    author {
      username
      age
    }
  }
}
You should get the latest blog back as the response, and it keeps arriving.
The stop button you see on the page tells you that you'll continuously get the latest blog response until you click it.
To test this out, we will make a little change to the resolver responsible for creating blogs, since we are now working in an asynchronous environment.
@database_sync_to_async  # new: run the synchronous ORM calls in a worker thread
def resolve_post_blog(*_, input: dict):
    try:
        author = Author.objects.get(username=input['authorUsername'])
        # create() already saves the new row, so a separate save() is not needed
        blog = Blog.objects.create(title=input['title'], content=input['content'], author=author)
        return {
            "message": "Blog posted",
            "blog": blog
        }
    except Author.DoesNotExist:
        raise Exception("Author does not exist")
Send the mutation for our “third” blog article:
mutation {
  postBlog(input: {
    authorUsername: "TheAlchemist",
    title: "My third blog",
    content: "This is my third blog, man"
  }) {
    message
    blog {
      pk
      title
      content
      author {
        username
      }
    }
  }
}
And you should automatically see that our subscription fetches the latest blog.
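One thing to notice is that the generator re-sends the same blog every second, even when nothing new was posted. A possible refinement, sketched here under assumptions, is to yield only when the newest item's pk changes. latest_changes, its fetch_latest callable and the max_polls limit are hypothetical and exist only to keep the example self-contained; in the project, fetch_latest would be the wrapped ORM call.

```python
import asyncio


async def latest_changes(fetch_latest, interval=1.0, max_polls=None):
    """Poll fetch_latest() and yield only when the newest item's pk changes."""
    last_pk = None
    polls = 0
    while max_polls is None or polls < max_polls:
        item = await fetch_latest()
        polls += 1
        if item is not None and item["pk"] != last_pk:
            last_pk = item["pk"]
            yield item
        await asyncio.sleep(interval)


async def demo():
    """Feed the poller four fake database reads containing two distinct blogs."""
    rows = iter([
        {"pk": 1, "title": "My second blog"},
        {"pk": 1, "title": "My second blog"},
        {"pk": 2, "title": "My third blog"},
        {"pk": 2, "title": "My third blog"},
    ])

    async def fake_fetch():
        return next(rows)

    return [item async for item in latest_changes(fake_fetch, interval=0, max_polls=4)]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

This is still polling under the hood. It only avoids pushing duplicates to the client.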
In conclusion…
Subscriptions in GraphQL provide a valuable mechanism for real-time interactivity, and with the right tools and techniques, you can leverage this feature to create dynamic and engaging applications.
So whenever you see those Twitter posts’ like count increasing automatically, just keep in mind that it could be the work of a GraphQL subscription.