FastAPI with Celery: A Beginner’s Guide
Hey guys! Ever wondered how to supercharge your FastAPI applications by handling tasks in the background? Well, that’s where Celery comes in! This guide will walk you through setting up Celery with FastAPI, enabling you to manage asynchronous tasks and keep your app responsive. We’ll dive into the nitty-gritty, from configuration to running workers, and even touch upon scheduling tasks. Let’s get started!
Table of Contents
- Understanding the Power of Celery and FastAPI
- Setting Up Your Development Environment
- Configuring Celery for FastAPI
- Defining and Running Celery Tasks
- Integrating Celery Tasks with FastAPI
- Monitoring and Managing Celery Tasks
- Flower
- Scheduling Tasks with Celery Beat
- Handling Task Results
- Error Handling and Best Practices
- Error Handling
- Best Practices
- Conclusion
Understanding the Power of Celery and FastAPI
So, what’s the deal with Celery and FastAPI? Simply put, Celery is a powerful distributed task queue system that allows you to execute tasks asynchronously. This means your FastAPI app can offload time-consuming operations to a separate worker process, preventing them from blocking your main application thread. This keeps your app snappy and responsive, which is especially crucial for handling user requests efficiently.
Think of it this way: imagine you have a task that takes a while, like sending out a bunch of emails or processing large files. If you do this directly in your FastAPI app, your users will have to wait until the task is complete before they get a response. That’s a no-go! With Celery, you can delegate these tasks to a background worker. Your FastAPI app immediately sends a response to the user, and the worker handles the task in the background. It’s like magic, right?
FastAPI, on the other hand, is a modern, high-performance web framework for building APIs with Python. Its asynchronous capabilities make it a perfect match for Celery, and together they make it easy to build robust, scalable applications. Using Celery with FastAPI lets you build highly performant apps that handle heavy traffic without slowing down. It’s a game changer for building responsive and scalable web applications.
Setting Up Your Development Environment
Alright, let’s get our hands dirty and set up our development environment. First, you’ll need Python installed. Make sure you have Python 3.7 or higher. Then, we’ll create a virtual environment to keep our project dependencies isolated. This is always a good practice, guys!
# Create a virtual environment
python -m venv .venv
# Activate the virtual environment (Linux/macOS)
source .venv/bin/activate
# Activate the virtual environment (Windows)
.venv\Scripts\activate
Next, we’ll install the necessary packages using pip. You’ll need FastAPI, Uvicorn (an ASGI server), Celery, and a message broker. We’ll use Redis for this example, because it’s super easy to set up. If you don’t have Redis installed already, you’ll need to install it; check out the Redis website.
# Install the required packages
pip install fastapi uvicorn celery redis
Now, let’s make sure Redis is up and running. You can run Redis locally using Docker, or install it directly on your system; the Redis website has installation instructions for each operating system.
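If you go the Docker route, a one-liner like this (assuming you already have Docker installed) starts a local Redis on the default port:
# Start a throwaway Redis container on the default port 6379
docker run -d --name redis -p 6379:6379 redis
# Quick sanity check -- this should print PONG
docker exec redis redis-cli ping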
With our environment set up, let’s move on to the code!
Configuring Celery for FastAPI
Now, for the core stuff: configuring Celery to work with FastAPI. We’ll start by creating a celery_app.py file, which will contain our Celery application instance and related configurations. This file will be the heart of our Celery setup.
# celery_app.py
from celery import Celery

# Configure Celery
CELERY_BROKER_URL = 'redis://localhost:6379/0'  # Replace with your Redis URL if different
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'  # Replace with your Redis URL if different

# Create a Celery instance
celery_app = Celery(
    'fastapi_celery',
    broker=CELERY_BROKER_URL,
    backend=CELERY_RESULT_BACKEND,
    include=['app.tasks']  # Import your tasks
)

# Optional: configure Celery to use JSON serialization
# (note the lowercase setting names, which are the modern Celery style)
celery_app.conf.update(
    task_serializer='json',
    result_serializer='json',
    accept_content=['json'],
    timezone='UTC',
    enable_utc=True,
)
In the code above, we define the CELERY_BROKER_URL and CELERY_RESULT_BACKEND. We’re using Redis for both the broker and the result backend. The broker is where Celery sends the tasks, and the result backend is where Celery stores the results of completed tasks. Make sure your Redis server is running before you start your Celery worker. The include=['app.tasks'] line tells Celery to look for tasks in the app.tasks module, so make sure the app directory is a Python package (an empty app/__init__.py does the trick). We’ve also included some optional settings for JSON serialization, which is a good practice for compatibility. If you’re new to this stuff, just copy this code and follow along. With this setup, Celery and FastAPI will communicate smoothly.
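One small improvement you might make right away, since the best practices section below recommends externalizing configuration: read the URLs from environment variables instead of hard-coding them. A minimal sketch, assuming you export CELERY_BROKER_URL and CELERY_RESULT_BACKEND in your shell:
# celery_app.py (variant that reads its config from the environment)
import os
from celery import Celery

# Fall back to a local Redis if the variables aren't set
CELERY_BROKER_URL = os.getenv('CELERY_BROKER_URL', 'redis://localhost:6379/0')
CELERY_RESULT_BACKEND = os.getenv('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0')

celery_app = Celery(
    'fastapi_celery',
    broker=CELERY_BROKER_URL,
    backend=CELERY_RESULT_BACKEND,
    include=['app.tasks'],
)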
Defining and Running Celery Tasks
Now, let’s get to the fun part: defining and running Celery tasks! Create a new file called app/tasks.py. This file will hold all of our task definitions; it’s where we define the tasks we want Celery to execute asynchronously.
# app/tasks.py
from celery_app import celery_app
import time

@celery_app.task
def long_running_task(seconds: int):
    print(f"Starting long-running task for {seconds} seconds...")
    time.sleep(seconds)
    print("Task completed!")
    return f"Task finished after {seconds} seconds"
In this example, we define a simple task called long_running_task. This task takes an integer seconds as an argument and simulates a long-running operation by sleeping for that many seconds. The @celery_app.task decorator turns the function into a Celery task; decorators are a really handy way of adding functionality to a function. Inside the task, we print some messages to the console to show that it’s running, and once the task completes, it returns a message. You can define any number of tasks in this file, each decorated with @celery_app.task.
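For instance, here’s what a second task might look like. The send_welcome_email name is purely hypothetical, standing in for whatever slow work (emails, file processing, report generation) your app needs to offload:
# app/tasks.py (continued) -- a hypothetical second task
@celery_app.task
def send_welcome_email(email: str):
    # Pretend to call an email service; swap in your real integration here
    print(f"Sending welcome email to {email}...")
    time.sleep(2)  # Simulate network latency
    return f"Welcome email sent to {email}"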
Now, we need to run a Celery worker. The worker is the process that actually executes the tasks. Open a new terminal and navigate to your project directory. Then, run the following command to start the worker:
celery -A celery_app worker -l info
Here, the -A celery_app flag specifies the Celery app instance we created earlier, and the -l info flag sets the log level to info. You should see the worker connect to your Redis broker and wait for tasks. Now your worker is running! Any tasks sent to the broker will be picked up by your worker and executed. You’re ready to start using Celery with FastAPI!
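Before wiring anything into FastAPI, you can sanity-check the setup from a Python shell in your project directory (with Redis and the worker running):
# Quick test from a Python shell in the project root
>>> from app.tasks import long_running_task
>>> result = long_running_task.delay(3)  # Send the task to the broker
>>> result.id  # The task ID Celery assigned
'...'
>>> result.get(timeout=10)  # Blocks until the worker finishes
'Task finished after 3 seconds'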
Integrating Celery Tasks with FastAPI
Let’s integrate our Celery tasks with our FastAPI application. Create a file called main.py in the root of your project directory. This is where we’ll define our API endpoints and trigger our Celery tasks; it’s where our FastAPI app interacts with the tasks we defined earlier.
# main.py
from fastapi import FastAPI
from app.tasks import long_running_task

app = FastAPI()

@app.get("/run-task/{seconds}")
def run_task(seconds: int):
    task = long_running_task.delay(seconds)  # Trigger the Celery task
    return {"message": f"Task triggered. Task ID: {task.id}"}
In this code, we create a FastAPI app and define a single endpoint, /run-task/{seconds}. When this endpoint is called, it triggers the long_running_task we defined earlier using the .delay() method. The .delay() method sends the task to the Celery queue; it doesn’t block the main thread. Instead, it immediately returns, along with the task ID, so the user gets an instant response while the task runs in the background. It’s a great example of how to use Celery to make your API super responsive.
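By the way, .delay() is just a shortcut for the more flexible .apply_async(), which accepts extra options. For example, to start the same task after a ten-second delay:
# .delay(5) is shorthand for .apply_async(args=(5,))
task = long_running_task.apply_async(args=(5,), countdown=10)  # Start in ~10 seconds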
To run the FastAPI app, use Uvicorn:
uvicorn main:app --reload
Now, open your web browser or use a tool like curl to call the /run-task/{seconds} endpoint, replacing {seconds} with the number of seconds you want the task to run. For example:
curl http://127.0.0.1:8000/run-task/5
You’ll get a response immediately, with a task ID. Meanwhile, the Celery worker will start executing the long_running_task in the background.
Monitoring and Managing Celery Tasks
So, how do you keep tabs on your tasks? Celery provides a couple of ways to monitor and manage tasks. For basic monitoring, you can check the logs of your Celery worker. The logs will show you when tasks are received, started, and completed. But for more advanced monitoring, you’ll want to use a Celery monitoring tool.
Flower
One popular option is Flower, a web-based monitoring tool for Celery. You can install it using pip:
pip install flower
Then, run Flower in a separate terminal:
celery -A celery_app flower
This will start the Flower web interface. Open your browser and go to http://localhost:5555 to see the dashboard. Flower lets you view task details, monitor worker activity, and even manage tasks, like retrying or revoking them.
Flower is super handy! It gives you real-time information about your tasks. From the Flower interface, you can see all sorts of details, like the task’s status (pending, running, success, or failure), the arguments it was called with, and even the results of the task.
Scheduling Tasks with Celery Beat
Want to schedule tasks to run at certain times? Celery Beat is the answer! Celery Beat is the Celery scheduler. It periodically sends tasks to the broker. Let’s see how to set it up.
First, configure your Celery app to use a scheduler. In your celery_app.py, you can add a beat_schedule configuration. This is where you define the tasks you want to schedule and when they should run.
# celery_app.py
from celery import Celery

# Configure Celery
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

# Create a Celery instance
celery_app = Celery(
    'fastapi_celery',
    broker=CELERY_BROKER_URL,
    backend=CELERY_RESULT_BACKEND,
    include=['app.tasks']  # Import your tasks
)

# Configure Celery Beat
celery_app.conf.beat_schedule = {
    'run-every-minute': {
        'task': 'app.tasks.long_running_task',
        'schedule': 60.0,  # Run every 60 seconds (1 minute)
        'args': (10,),
    },
}
celery_app.conf.timezone = 'UTC'
In this example, we configure Celery Beat to run long_running_task every minute. We define the task, the schedule (in seconds), and the arguments to pass to it. This is just for demonstration purposes; Celery Beat supports much more complex schedules, and it’s great for regularly scheduled jobs.
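For instance, Celery’s crontab schedules let you express cron-style timing. A minimal sketch, reusing our long_running_task:
# celery_app.py (an additional schedule entry using crontab)
from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    'weekday-morning-task': {
        'task': 'app.tasks.long_running_task',
        # Every weekday at 07:30 (in the configured timezone)
        'schedule': crontab(hour=7, minute=30, day_of_week='mon-fri'),
        'args': (10,),
    },
}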
To run Celery Beat, you’ll need to run the Celery worker and Celery Beat in separate terminals.
# In one terminal: Run the Celery worker
celery -A celery_app worker -l info
# In another terminal: Run Celery Beat
celery -A celery_app beat -l info
Now, Celery Beat will periodically send the long_running_task to the worker according to the schedule you defined. Make sure your worker is up and running: the Celery Beat process handles scheduling your tasks, while the worker executes them. Both are long-running foreground processes, so each needs its own terminal (or you can combine them, as shown below).
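For local development, you can also embed the scheduler in the worker with the -B flag, so you only need one terminal. Don’t do this in production, where you should run exactly one dedicated beat process:
# Development shortcut: run the beat scheduler inside the worker process
celery -A celery_app worker -B -l info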
Handling Task Results
Sometimes, you’ll need to get the results of your Celery tasks. Here’s how you can retrieve task results.
When you call long_running_task.delay(seconds), it returns an AsyncResult object. This object has methods to get the status and the result of the task.
# main.py
from fastapi import FastAPI
from celery.result import AsyncResult

from celery_app import celery_app
from app.tasks import long_running_task

app = FastAPI()

@app.get("/run-task/{seconds}")
def run_task(seconds: int):
    task = long_running_task.delay(seconds)  # Trigger the Celery task
    return {"message": f"Task triggered. Task ID: {task.id}"}

@app.get("/task-status/{task_id}")
def task_status(task_id: str):
    result = AsyncResult(task_id, app=celery_app)  # Look the task up by its ID
    return {"task_id": task_id, "task_status": result.status, "task_result": result.result}
In the task_status endpoint, we use the AsyncResult class to look up a task by its ID. The result.status attribute tells you the status of the task (e.g., PENDING, STARTED, SUCCESS, FAILURE), and result.result gives you the return value of the task if it completed successfully, or the exception that was raised if it failed. This is how you can use Celery in FastAPI to work with and access the results of tasks.
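Putting it together, you can poll the status endpoint with the task ID you got back from the first call (the ID below is just a placeholder):
# Trigger a task, then poll its status with the returned task ID
curl http://127.0.0.1:8000/run-task/5
curl http://127.0.0.1:8000/task-status/<task-id-from-previous-response>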
Error Handling and Best Practices
Let’s talk about error handling and some best practices for using Celery and FastAPI. Good error handling is critical for any production application.
Error Handling
Implement proper error handling in your tasks. Wrap your task logic in try...except blocks to catch and handle exceptions, and log errors using a logging library. Celery also offers retry capabilities, so you can retry tasks that fail due to transient errors.
# app/tasks.py
from celery_app import celery_app
import time
import logging

logger = logging.getLogger(__name__)

@celery_app.task(bind=True, max_retries=3)
def long_running_task(self, seconds: int):
    try:
        print(f"Starting long-running task for {seconds} seconds...")
        time.sleep(seconds)
        print("Task completed!")
        return f"Task finished after {seconds} seconds"
    except Exception as exc:
        logger.error(f"Task failed: {exc}")
        raise self.retry(exc=exc, countdown=5)  # Retry after 5 seconds
Here, we use the @celery_app.task(bind=True, max_retries=3) decorator to enable retries; bind=True gives the task access to self, which is what self.retry() needs. If the task fails, Celery will retry it up to three times, waiting five seconds between attempts (the countdown). This is a very basic example of error handling; in a real application you’d add more sophisticated logging and monitoring.
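If you prefer declarative retries, Celery can also retry automatically for specific exception types, with exponential backoff between attempts. A minimal sketch (the ConnectionError choice and the task itself are just illustrative):
# app/tasks.py -- declarative retries with exponential backoff
@celery_app.task(autoretry_for=(ConnectionError,), retry_backoff=True, max_retries=3)
def flaky_network_task(url: str):
    # Hypothetical task that calls an unreliable service;
    # Celery retries it automatically if ConnectionError is raised
    print(f"Calling {url}...")
    return f"Called {url}"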
Best Practices
- Keep Tasks Small: Break down complex tasks into smaller, more manageable units. This makes them easier to debug, test, and scale.
- Idempotency: Make your tasks idempotent (meaning they can be safely executed multiple times without unintended side effects). This is important for handling retries; see the sketch after this list.
- Serialization: Ensure that your task arguments and results are properly serialized and deserialized. Consider using JSON serialization for simplicity and compatibility.
- Monitoring: Use a monitoring tool like Flower to monitor your tasks and workers. This helps you identify and resolve issues quickly.
- Configuration: Externalize your Celery configuration (broker URL, result backend, etc.) using environment variables. This makes it easier to manage your application in different environments.
- Logging: Use a logging library to log important events, errors, and warnings. This helps you understand what’s happening in your application.
- Testing: Write tests for your tasks and your FastAPI endpoints that interact with Celery. This helps ensure that your application works as expected.
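As a minimal sketch of the idempotency point, assuming the same local Redis, you could use a SET NX marker key so a retried task skips work it has already finished (the key naming and the process_order task are just illustrative):
# app/tasks.py -- an idempotency sketch using a Redis marker key
import redis

redis_client = redis.Redis(host='localhost', port=6379, db=1)

@celery_app.task(bind=True, max_retries=3)
def process_order(self, order_id: int):
    # SET ... NX succeeds only the first time the key is created,
    # so a retry of an already-processed order becomes a no-op
    if not redis_client.set(f"order-processed:{order_id}", 1, nx=True):
        return f"Order {order_id} already processed, skipping"
    print(f"Processing order {order_id}...")
    return f"Order {order_id} processed"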
Conclusion
That’s it, guys! You’ve now set up Celery to run asynchronous tasks with your FastAPI application. You’ve learned how to configure Celery, define tasks, run workers, integrate tasks with FastAPI, monitor tasks with Flower, and schedule tasks with Celery Beat. That’s a solid foundation for building more complex systems, and it opens up a world of possibilities for scalable, responsive web applications. Keep experimenting and building amazing stuff. Have fun, and happy coding!