
Setting up a Celery task scheduler in Flask

The first thing that comes to mind when considering a task scheduler is a cron job. Since most of today’s servers run on Linux, setting up a cron job for a periodic task might seem like a good option. In production, however, a crontab can be nothing but a pain: it is tricky to configure time zones correctly when servers are deployed in different locations.

The biggest problem with this approach arises when the application is scaled across multiple web servers. Instead of one cron job, we end up running one per server, which can lead to race conditions. It is also hard to debug when something goes wrong with the task.

With Flask, there are multiple ways to address this problem, and Celery is one of the most popular. Celery handles the above issues quite gracefully: it relies on pytz for time zone handling, which makes it easy to set schedule timings accurately across time zones.

Celery uses a message broker (Redis or RabbitMQ) to pass messages between the web application and the Celery workers, which acts as a central point for workers running on different web servers. Because a single beat scheduler enqueues each periodic task and the broker delivers it to exactly one worker, the task runs only once per schedule, eliminating race conditions.

Celery also supports monitoring real-time events and ships with a neat built-in terminal interface that shows all current events. A nice standalone project, Flower, provides a web-based tool for administering Celery workers and tasks. On top of scheduling, Celery supports asynchronous task execution, which is handy for long-running tasks.
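
As a taste of that, any function decorated as a Celery task can be queued with .delay() and picked up by a worker in the background. A minimal sketch (the send_report task and the broker URL are assumptions for illustration; we wire Celery into Flask properly below):

from celery import Celery

# Hypothetical standalone Celery app for this sketch
celery = Celery("sketch", broker="redis://localhost:6379/0")

@celery.task
def send_report(user_id):
    # Long-running work executes in a worker process,
    # not in the web request
    ...

# From a view: enqueue the task and return immediately
result = send_report.delay(42)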

Let’s go hacking

Here, we will use a Dockerized environment. Installing Redis and Celery differs from system to system, and Docker lets us run the whole exercise without worrying much about local development infrastructure.

flask-celery
│  app.py
│  docker-compose.yml
│  Dockerfile
│  entrypoint.sh
│  requirements.txt
└────────────────────────

Let’s start with the Dockerfile.

FROM python:3.7

# Create a directory named flask
RUN mkdir flask

# Copy everything to flask folder
COPY . /flask/

# Make flask the working directory
WORKDIR /flask

# Install the Python libraries
RUN pip3 install --no-cache-dir -r requirements.txt

EXPOSE 5000

# Run the entrypoint script
CMD ["bash", "entrypoint.sh"]

The packages required for this application are listed in the requirements.txt file.

Flask==1.0.2
celery==4.3.0
redis==3.3.11

And here is the entrypoint script, entrypoint.sh.

#!/bin/sh

flask run --host=0.0.0.0 --port 5000

Celery uses a message broker to pass messages between the web app and the Celery workers. Here, we will set up a Redis container to serve as the message broker; everything is wired together in docker-compose.yml.

version: "3.7"

services:

  redis:
    container_name: redis_dev_container
    image: redis
    ports:
      - "6379:6379"

  flask_service:
    container_name: flask_dev_container
    restart: always
    image: flask
    build:
      context: ./
      dockerfile: Dockerfile
    depends_on:
      - redis
    ports:
      - "5000:5000"
    volumes:
      - ./:/flask
    environment:
      - FLASK_DEBUG=1
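
With the Dockerfile and compose file in place, the whole stack comes up with a single command (assuming Docker and docker-compose are installed):

docker-compose up --build -d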

Now we are all set to start our little experiment. We have a Redis container running on port 6379 and a Flask container running on localhost:5000. Let’s add a simple API to test whether our tiny web application works.

from flask import Flask

app = Flask(__name__)

@app.route("/")
def index_view():
    return "Flask-celery task scheduler!"

if __name__ == "__main__":
    app.run()
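
A quick request from the host machine confirms that the app responds (sample output):

curl http://localhost:5000/
# Flask-celery task scheduler!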

And voila!

Now, we will build a simple timer application that shows the elapsed time since the application started. We need to configure Celery with the Redis server URL, and we will use a second Redis database (db 1) to store the counters.

from flask import Flask
from celery import Celery
import redis

app = Flask(__name__)

# Add Redis URL configurations
app.config["CELERY_BROKER_URL"] = "redis://redis:6379/0"
app.config["CELERY_RESULT_BACKEND"] = "redis://redis:6379/0"

# Connect to the Redis db that stores the counters
redis_db = redis.Redis(
    host="redis", port=6379, db=1, charset="utf-8", decode_responses=True
)

# Initialize the timer in Redis
redis_db.mset({"minute": 0, "second": 0})

# Add periodic tasks
celery_beat_schedule = {
    "time_scheduler": {
        "task": "app.timer",
        # Run every second
        "schedule": 1.0,
    }
}

# Initialize Celery and update its config
celery = Celery(app.name)
celery.conf.update(
    result_backend=app.config["CELERY_RESULT_BACKEND"],
    broker_url=app.config["CELERY_BROKER_URL"],
    timezone="UTC",
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    beat_schedule=celery_beat_schedule,
)


@app.route("/")
def index_view():
    return "Flask-celery task scheduler!"


@app.route("/timer")
def timer_view():
    time_counter = redis_db.mget(["minute", "second"])
    return f"Minute: {time_counter[0]}, Second: {time_counter[1]}"


@celery.task
def timer():
    second_counter = int(redis_db.get("second")) + 1
    if second_counter >= 60:
        # Reset the counter
        redis_db.set("second", 0)
        # Increment the minute
        redis_db.set("minute", int(redis_db.get("minute")) + 1)
    else:
        # Increment the second
        redis_db.set("second", second_counter)


if __name__ == "__main__":
    app.run()

Let’s update entrypoint.sh to run both the Celery worker and the beat server as background processes.

#!/bin/sh

# Run Celery worker
celery -A app.celery worker --loglevel=INFO --detach --pidfile=''

# Run Celery Beat
celery -A app.celery beat --loglevel=INFO --detach --pidfile=''

flask run --host=0.0.0.0 --port 5000
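
After rebuilding and restarting the containers, the /timer endpoint reports the elapsed time (sample output; the numbers depend on how long the stack has been running):

curl http://localhost:5000/timer
# Minute: 0, Second: 42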

Our very own timer

This application is only for demonstration purposes: the counter will drift, as the task processing time is not taken into account when calculating the time.
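
If accuracy mattered, one option would be to drop the per-second task entirely: record a start timestamp once and derive the elapsed time on each request. A minimal sketch reusing the redis_db connection from above (the start_time key and the /elapsed route are assumptions for illustration):

import time

# Record the start time once at startup (hypothetical key)
redis_db.set("start_time", time.time())

@app.route("/elapsed")
def elapsed_view():
    # Compute elapsed time on demand instead of counting ticks
    elapsed = int(time.time() - float(redis_db.get("start_time")))
    return f"Minute: {elapsed // 60}, Second: {elapsed % 60}"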

Monitoring events

Celery has rich support for monitoring various statistics for tasks, workers, and events. We need to log into the container to enable and monitor events.

docker exec -it flask_dev_container bash

Then enable events and launch the live event viewer.

celery -A app.celery control enable_events

celery -A app.celery events

This spins up a nice interactive terminal UI listing all the details of the scheduled tasks.
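
For a web-based view, Flower (mentioned earlier) can be pointed at the same app. A sketch, assuming Flower is installed in the image and port 5555 is published in docker-compose.yml:

pip install flower
celery -A app.celery flower --port=5555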

Conclusion

In this post, I have used Celery as a better alternative to crontabs, even though Celery’s primary purpose is processing task queues. The Celery worker and the beat server are best run in separate containers, since running background processes on the web container is not considered good practice; a sketch of that setup follows below.

Unless you are creating a stupid timer application.
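
As a sketch of that setup, docker-compose.yml could grow two extra services that reuse the flask image (service names here are illustrative), with entrypoint.sh trimmed back to just the flask run line:

  celery_worker:
    image: flask
    command: celery -A app.celery worker --loglevel=INFO
    depends_on:
      - redis
    volumes:
      - ./:/flask

  celery_beat:
    image: flask
    command: celery -A app.celery beat --loglevel=INFO
    depends_on:
      - redis
    volumes:
      - ./:/flask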

The code from this post can be found here: repo

Adios!