Distributed tasks 101: updating the client on Celery task progress
Hello,
First, please allow me to introduce myself a bit.
I'm a Technical Manager who has worked at a game studio in Vietnam for 10 years, and I'm passionate about working in the 3D industry.
OK, without further ado, let's get straight to the point.
This is the first time I'm writing about technology I researched for a project at my studio. In our studio, users have to upload their assets to the server to run time-consuming, computation-heavy tasks on a high-performance machine. That's one portion of our back-end. To run a system like that, we utilize a few technologies:
- Celery: a distributed task queue for Python.
- Redis: an in-memory database, used here for caching and Pub/Sub.
- RabbitMQ: an open-source message broker.
- FastAPI: an ASGI web framework for Python.
I won't dive deeply into those frameworks, because you can certainly find a ton of tutorials covering them in extreme detail. Instead, I'll talk about how to connect them so they support each other and make our system more robust.
In the beginning, the requirement was quite simple: our artists just need to know when a task they submit to the server completes successfully. That's easy enough, isn't it?
I would say "yes". Celery is exactly the tool we're looking for.
But the story never ends there. It always gets more complicated, at least once we build a web application for monitoring tasks that users interact with. We needed a way to monitor tasks and watch their progress in real time.
Then we came up with an idea: combine Redis Pub/Sub with WebSockets. Here is the breakdown.
Celery
The most widely used distributed task queue in Python.
First, we need to set up a task library. We simply wrap our function with the @task decorator. At every step in the task, we publish a message to Redis with the task's current status, using a channel name that combines the session ID (from the JWT, JSON Web Token) and the task ID (the Celery task ID), so we can tell which task goes with which WebSocket connection.
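A minimal sketch of such a task, assuming it lives in services/queue/tasks.py alongside the app object shown in the next snippet; the process_asset name, the step names, and the Redis address are placeholders:

```python
# services/queue/tasks.py (continued; "app" is the Celery instance defined in the next snippet)
import json

import redis

# Synchronous Redis client, used only to publish progress messages.
redis_client = redis.Redis(host="localhost", port=6379, db=0)

@app.task(bind=True)
def process_asset(self, session_id: str, asset_path: str):
    # Channel = session ID + task ID, so each WebSocket connection
    # can subscribe to exactly the task it cares about.
    channel = f"{session_id}:{self.request.id}"
    steps = ["validating", "computing", "exporting"]  # placeholder steps
    for i, step in enumerate(steps, start=1):
        # ... run the heavy computation for this step ...
        redis_client.publish(channel, json.dumps({
            "task_id": self.request.id,
            "status": step,
            "progress": int(i / len(steps) * 100),
        }))
    return {"status": "done"}
```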
With the app defined:
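A sketch of that definition, assuming RabbitMQ as the broker and Redis as the result backend on local default ports:

```python
# services/queue/tasks.py (top of the same module)
from celery import Celery

class Config:
    # Assumed local defaults; point these at your own RabbitMQ and Redis.
    broker_url = "amqp://guest:guest@localhost:5672//"  # RabbitMQ as the message broker
    result_backend = "redis://localhost:6379/0"         # Redis stores task results

app = Celery("tasks")
app.config_from_object(Config)
```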
Here we configure the Celery instance from a Config object, instantiated from the Config class.
Last but not least is how to run a Celery worker. It's pretty simple:
```bash
celery -A services.queue.tasks worker --pool=eventlet --concurrency=4 --loglevel=info
```
With that, the Celery setup is mostly complete.
FastAPI
Now we turn to FastAPI. Whenever the server receives an HTTP POST from a client, it triggers a Celery task with the token (JWT) that accompanies the request:
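A sketch of such an endpoint; the /tasks route, the request shape, and the decode_session_id helper are hypothetical:

```python
from fastapi import FastAPI, Header

from services.queue.tasks import process_asset  # the task sketched above

api = FastAPI()

def decode_session_id(token: str) -> str:
    # Placeholder: verify and decode the JWT here (e.g. with python-jose)
    # and return the session claim it carries.
    return "demo-session"  # stand-in value for this sketch

@api.post("/tasks")
async def submit_task(asset_path: str, authorization: str = Header(...)):
    session_id = decode_session_id(authorization)
    result = process_asset.delay(session_id, asset_path)
    # The client combines session_id and task_id to open the matching WebSocket.
    return {"task_id": result.id}
```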
A WebSocket is then connected, ready to send messages from the task back to the client.
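A sketch of that WebSocket endpoint, assuming the channel naming from the task above and a local Redis (note that aioredis's API now also ships inside redis-py as redis.asyncio):

```python
import aioredis
from fastapi import WebSocket, WebSocketDisconnect

@api.websocket("/ws/{session_id}/{task_id}")
async def task_progress(websocket: WebSocket, session_id: str, task_id: str):
    await websocket.accept()
    redis = aioredis.from_url("redis://localhost:6379")
    pubsub = redis.pubsub()
    # Subscribe to the channel the Celery task publishes to.
    await pubsub.subscribe(f"{session_id}:{task_id}")
    try:
        # Forward every published progress message to the client.
        async for message in pubsub.listen():
            if message["type"] == "message":
                await websocket.send_text(message["data"].decode())
    except WebSocketDisconnect:
        pass
    finally:
        await pubsub.unsubscribe()
        await redis.close()
```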
It's worth noting that we use aioredis instead of the regular redis client here, because a blocking SUBSCRIBE would stall the synchronous process. aioredis supports asyncio, so the server keeps listening even as other users connect.
Final words
With everything set up as described, our system can run tasks in the background while keeping users updated on each task's status.
Any feedback and discussion to expand our knowledge is welcome.
Thank you for reading.