Flask is a micro web framework written in Python. It is known for its simplicity and flexibility, making it a popular choice for building small to medium-sized web applications. Flask provides a simple way to handle HTTP requests and responses, but any long-running work done inside a request handler blocks the worker serving that request, so the client waits and the application can feel unresponsive.
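To make the problem concrete, here is a minimal sketch of a hypothetical /report route that does slow work inline (the route name and the time.sleep delay are just placeholders for a real expensive operation); the client receives nothing until the work finishes:

import time
from flask import Flask

app = Flask(__name__)

@app.route('/report')
def generate_report():
    # Simulate an expensive job (e.g., building a large report) inside the request handler.
    # The client gets no response until this call returns.
    time.sleep(30)
    return "Report ready"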
Celery is an asynchronous task queue/job queue based on distributed message passing. It allows you to offload time-consuming tasks from your main application to a separate worker process. Celery uses a message broker (such as Redis or RabbitMQ) to communicate between the main application and the worker processes. When a task is sent to Celery, it is added to the message queue, and the worker processes pick up tasks from the queue and execute them asynchronously.
Task scheduling in Celery refers to the ability to run tasks at specific times or intervals. You can schedule tasks to run periodically (e.g., every hour, every day) or at a specific future time (one-off tasks).
First, you need to install Flask, Celery, and a message broker. In this example, we will use Redis as the message broker.
pip install flask celery redis
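Note that the Redis server itself must also be running. If you have Docker available, one quick way to start a local instance on the default port is:

docker run -d -p 6379:6379 redis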
# app.py
from flask import Flask
from celery import Celery

# Initialize Flask app
app = Flask(__name__)

# Initialize Celery, using Redis as the message broker
celery = Celery(app.name, broker='redis://localhost:6379/0')
celery.conf.update(app.config)

@celery.task
def example_task():
    """Example Celery task"""
    print("Task executed successfully!")
    return "Task completed"

@app.route('/trigger_task')
def trigger_task():
    """Trigger the Celery task"""
    example_task.delay()
    return "Task has been sent to the queue"

if __name__ == '__main__':
    app.run(debug=True)
In this code, we first initialize a Flask application and a Celery instance. The example_task function is a simple Celery task that prints a message and returns a string. The /trigger_task route in the Flask application triggers the task by calling example_task.delay(), which sends the task to the message queue.
To start the Celery worker, run the following command in the terminal:
celery -A app.celery worker --loglevel=info
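With the Flask app running (python app.py) and the worker started, you can trigger the task from another terminal, for example with curl (assuming the default development server address):

curl http://localhost:5000/trigger_task

The route returns immediately, and the task's print output appears in the worker's terminal rather than in the Flask process.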
To schedule a periodic task, you need to configure Celery’s beat scheduler.
# app.py (updated)
from flask import Flask
from celery import Celery
from datetime import timedelta

# Initialize Flask app
app = Flask(__name__)

# Initialize Celery, using Redis as the message broker
celery = Celery(app.name, broker='redis://localhost:6379/0')
celery.conf.update(app.config)

# Configure the Celery beat schedule
celery.conf.beat_schedule = {
    'periodic-task': {
        'task': 'app.example_task',
        'schedule': timedelta(minutes=1)
    }
}

@celery.task
def example_task():
    """Example Celery task"""
    print("Periodic task executed successfully!")
    return "Periodic task completed"

@app.route('/trigger_task')
def trigger_task():
    """Trigger the Celery task"""
    example_task.delay()
    return "Task has been sent to the queue"

if __name__ == '__main__':
    app.run(debug=True)
In this code, we added a beat_schedule configuration to Celery. The periodic-task entry runs example_task every minute.
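If you need calendar-style schedules rather than fixed intervals, Celery also ships a crontab schedule that can replace the timedelta. A minimal sketch, reusing the same periodic-task entry (the 07:30 time is just an example):

from celery.schedules import crontab

celery.conf.beat_schedule = {
    'periodic-task': {
        'task': 'app.example_task',
        # Run every day at 07:30 in Celery's configured timezone (UTC by default)
        'schedule': crontab(hour=7, minute=30)
    }
}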
To start the Celery beat scheduler, run the following command in the terminal:
celery -A app.celery beat --loglevel=info
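For local development you can also embed the beat scheduler inside the worker with the -B flag, so a single command runs both (Celery's documentation advises against this in production):

celery -A app.celery worker -B --loglevel=info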
To schedule a one-off task to run at a specific future time, you can use the eta (estimated time of arrival) argument of apply_async().
from datetime import datetime, timedelta, timezone

# Calculate the execution time (e.g., 5 minutes from now) as a timezone-aware datetime
execution_time = datetime.now(timezone.utc) + timedelta(minutes=5)

# Schedule the task to run once at that time
example_task.apply_async(eta=execution_time)
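If you only need a relative delay rather than an absolute time, the countdown argument of apply_async() is a simpler alternative:

# Equivalent one-off scheduling: run the task roughly 300 seconds (5 minutes) from now
example_task.apply_async(countdown=300)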
Scheduling tasks in Flask with Celery is a powerful technique that allows you to handle time-consuming and resource-intensive tasks efficiently. By offloading these tasks to a separate worker process, you can improve the performance and responsiveness of your Flask application. Understanding the core concepts, typical usage scenarios, common pitfalls, and best practices will help you use Celery effectively in your real-world projects.
Remember to start the Celery worker and beat scheduler when running your application, and always monitor the task execution to ensure everything is working as expected.
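If you want more than log output for monitoring, one common option (not required for this tutorial) is Flower, a web-based dashboard for Celery:

pip install flower
celery -A app.celery flower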