Using Python RQ for Task Queues

This is a getting-started tutorial on python-rq, in which I will demonstrate how to work with asynchronous tasks using Python Redis Queue (python-rq).

What will we be doing

We want a client to submit thousands of jobs in a non-blocking, asynchronous fashion. Workers will then consume these jobs from our redis queue and process the tasks at whatever rate our consumers can handle.

The nice thing about this approach is that if our consumer is unavailable, the tasks remain in the queue and are executed once the consumer is ready to consume them. It's also asynchronous, so the client doesn't have to wait until the task has finished.

We will run a redis server using docker, which will be used to queue all our jobs. Then we will go through the basics of python and python-rq, such as:

  • Writing a Task
  • Enqueueing a Job
  • Getting information from our queue, listing jobs, job statuses
  • Running our workers to consume from the queue and action our tasks
  • Basic application which queues jobs to the queue, consumes and actions them, and monitors the queue

Redis Server

You will require docker for this next step. To start the redis server:

docker run --rm -itd --name redis -p 6379:6379 redis:alpine

Python RQ

Install python-rq:

pip install rq
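
Before we continue, it's worth confirming that Python can reach the redis container we started earlier. A quick sanity check (the redis client library is installed as a dependency of rq):

>>> from redis import Redis
>>> Redis(host='localhost', port=6379, db=0).ping()
True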

Create the task which will be actioned by our workers. In our case it will just be a simple function that collects all the digits from a given string into a list, then adds them up and returns the total value. This is a very basic task, but it's just for demonstration.

Our tasks.py:

def sum_numbers_from_string(string):
    # Collect every digit character from the string as an integer
    numbers = []
    for each_character in string:
        if each_character.isdigit():
            numbers.append(int(each_character))

    # Add the collected digits together and return the total
    total = 0
    for each_number in numbers:
        total = total + each_number

    return total

To test this locally:

>>> from tasks import sum_numbers_from_string
>>> sum_numbers_from_string('adje-fje5-sjfdu1s-gdj9-asd1fg')
16
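
As an aside, the same logic can be written more compactly with a generator expression; the explicit version above is just easier to follow for a tutorial:

def sum_numbers_from_string(string):
    # Sum every digit character found in the string
    return sum(int(c) for c in string if c.isdigit())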

Now, let's import redis and rq along with our task, and instantiate a queue object:

>>> from redis import Redis
>>> from rq import Queue, Worker
>>> from tasks import sum_numbers_from_string
>>> redis_connection = Redis(host='localhost', port=6379, db=0)
>>> q = Queue(connection=redis_connection)

Submit a Task to the Queue

Let's submit a task to the queue:

>>> result = q.enqueue(sum_numbers_from_string, 'hbj2-plg5-2xf4r1s-f2lf-9sx4ff')
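
The enqueue call also accepts job options as keyword arguments. For example, python-rq's job_timeout and result_ttl options control how long the job may run and how long its result is kept (the values below are illustrative):

>>> result = q.enqueue(
...     sum_numbers_from_string,          # the function to run
...     'hbj2-plg5-2xf4r1s-f2lf-9sx4ff',  # its argument
...     job_timeout=180,                  # kill the job if it runs longer than 180s
...     result_ttl=500                    # keep the result in redis for 500s
... )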

The job object returned by enqueue has a couple of properties we can inspect. First, let's have a look at the id that we got back when we submitted our task to the queue:

>>> result.get_id()
'5a607474-cf1b-4fa5-9adb-f8437555a7e7'

We can also get the status from our task:

>>> result.get_status()
'queued'

We can also view our job's details in JSON format:

>>> import json
>>> print(json.dumps(result.to_dict(), indent=2, default=str))
{
  "created_at": "2020-05-16T11:56:49.892713Z",
  "data": "b'..\\x00\\x99\\xa0\\x16\\xfe..'",
  "origin": "default",
  "description": "tasks.sum_numbers_from_string('hbj2-plg5-2xf4r1s-f2lf-9sx4ff')",
  "enqueued_at": "2020-05-16T11:56:49.893252Z",
  "started_at": "",
  "ended_at": "",
  "timeout": 180,
  "status": "queued"
}
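
The individual fields are also available as attributes on the job object, for example:

>>> result.origin
'default'
>>> result.func_name
'tasks.sum_numbers_from_string'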

If we don't have the job id at hand, we can use get_jobs to get all the jobs which are queued:

>>> q.get_jobs()
[Job('5a607474-cf1b-4fa5-9adb-f8437555a7e7', enqueued_at=datetime.datetime(2020, 5, 16, 12, 30, 22, 699609))]

Then we can loop over the jobs and get each job's id, like below:

>>> for j in q.get_jobs():
...     j.id
...
'5a607474-cf1b-4fa5-9adb-f8437555a7e7'

Or, to get the job ids as a list:

>>> list_job_ids = q.get_job_ids()
>>> list_job_ids
['5a607474-cf1b-4fa5-9adb-f8437555a7e7']

Since we received the job id, we can use fetch_job to get more info about the job:

>>> fetched_job = q.fetch_job('5a607474-cf1b-4fa5-9adb-f8437555a7e7')
>>> fetched_job
Job('5a607474-cf1b-4fa5-9adb-f8437555a7e7', enqueued_at=datetime.datetime(2020, 5, 16, 12, 30, 22, 699609))

And as before we can view it in json format:

>>> fetched_job.to_dict()
{'created_at': '2020-05-16T12:30:22.698728Z', 'data': b'..x\x9c6\xfe..', 'origin': 'default', 'description': "tasks.sum_numbers_from_string('hbj2-plg5-2xf4r1s-f2lf-9sx4ff')", 'enqueued_at': '2020-05-16T12:30:22.699609Z', 'started_at': '', 'ended_at': '', 'timeout': 180, 'status': 'queued'}
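
If we only have a job id and a connection, without a reference to the queue object, we can also fetch the job directly through the Job class:

>>> from rq.job import Job
>>> job = Job.fetch('5a607474-cf1b-4fa5-9adb-f8437555a7e7', connection=redis_connection)
>>> job.get_status()
'queued'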

We can also get the job's key in redis by passing the job_id:

>>> result.key_for(job_id='5a607474-cf1b-4fa5-9adb-f8437555a7e7')
b'rq:job:5a607474-cf1b-4fa5-9adb-f8437555a7e7'
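
Since python-rq stores each job as an ordinary redis hash under that key, we can also peek at the raw fields directly with the redis client (output abbreviated here):

>>> redis_connection.hgetall(b'rq:job:5a607474-cf1b-4fa5-9adb-f8437555a7e7')
{b'status': b'queued', b'created_at': b'2020-05-16T12:30:22.698728Z', ...}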

To view how many jobs are in our queue, we can either do:

>>> len(q)
1

or:

>>> q.get_job_ids()
['5a607474-cf1b-4fa5-9adb-f8437555a7e7']

Consuming from the Queue

Now that our task is queued, let's fire off our worker to consume the job from the queue and action the task:

>>> w = Worker([q], connection=redis_connection)
>>> w.work()
14:05:35 Worker rq:worker:49658973741d4085961e34e9641227dd: started, version 1.4.1
14:05:35 Listening on default...
14:05:35 Cleaning registries for queue: default
14:05:35 default: tasks.sum_numbers_from_string('hbj2-plg5-2xf4r1s-f2lf-9sx4ff') (5a607474-cf1b-4fa5-9adb-f8437555a7e7)
14:05:40 default: Job OK (5a607474-cf1b-4fa5-9adb-f8437555a7e7)
14:05:40 Result is kept for 500 seconds
14:05:59 Warm shut down requested
True
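
Note that work() blocks and keeps listening for new jobs until the worker is shut down (the "Warm shut down requested" line above is the result of a Ctrl+C). For scripts, it can be handy to run the worker in burst mode instead, where it processes everything currently queued and then exits:

>>> w = Worker([q], connection=redis_connection)
>>> w.work(burst=True)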

Now when we get the status of our job, we will see that it has finished:

>>> result.get_status()
'finished'

And to get the result from our worker:

>>> result.result
29

And like before, if you don't have the job id at hand, you can look it up, fetch the job, and then read the result:

>>> fetched_job = q.fetch_job('5a607474-cf1b-4fa5-9adb-f8437555a7e7')
>>> fetched_job.result
29

Naming Queues

We can namespace our tasks into specific queues. For example, to create queue1:

>>> q1 = Queue('queue1', connection=redis_connection)

To verify the queue name:

>>> q1
Queue('queue1')

As we can see, our new queue is empty:

>>> q1.get_job_ids()
[]

Let's submit 10 jobs to our queue:

>>> from uuid import uuid4
>>> for attempt in range(10):
...     random_string = uuid4().hex
...     q1.enqueue(sum_numbers_from_string, random_string)
...
Job('c3f2369d-5b27-40e0-97be-8fe26989a78e', enqueued_at=datetime.datetime(2020, 5, 16, 13, 1, 14, 472508))
Job('06b93517-5dae-4133-8131-e8d35b8dd780', enqueued_at=datetime.datetime(2020, 5, 16, 13, 1, 14, 475604))
Job('81f05aef-4bd6-421b-912d-78b5d419b10a', enqueued_at=datetime.datetime(2020, 5, 16, 13, 1, 14, 478071))
Job('8f14e81f-74fa-44d9-9fc7-e8e7b8c7b76f', enqueued_at=datetime.datetime(2020, 5, 16, 13, 1, 14, 480438))
Job('e8552750-89d2-4538-8c3e-a48c4c3e9a51', enqueued_at=datetime.datetime(2020, 5, 16, 13, 1, 14, 483106))
Job('bf19a0a3-eb0c-4692-b452-67c5ad954094', enqueued_at=datetime.datetime(2020, 5, 16, 13, 1, 14, 486193))
Job('0da3688a-cffa-4ba6-a272-b6cc90942ef6', enqueued_at=datetime.datetime(2020, 5, 16, 13, 1, 14, 488545))
Job('717bd147-615c-458d-8386-9ea6a198e137', enqueued_at=datetime.datetime(2020, 5, 16, 13, 1, 14, 491074))
Job('7cdac5aa-8dc3-40be-a8fc-b273ce61b03b', enqueued_at=datetime.datetime(2020, 5, 16, 13, 1, 14, 493618))
Job('4f7ea527-0695-4e2b-bc8b-3d8807a86390', enqueued_at=datetime.datetime(2020, 5, 16, 13, 1, 14, 496930))

To list the job ids in our queue:

>>> q1.get_job_ids()
['c3f2369d-5b27-40e0-97be-8fe26989a78e', '06b93517-5dae-4133-8131-e8d35b8dd780', '81f05aef-4bd6-421b-912d-78b5d419b10a', '8f14e81f-74fa-44d9-9fc7-e8e7b8c7b76f', 'e8552750-89d2-4538-8c3e-a48c4c3e9a51', 'bf19a0a3-eb0c-4692-b452-67c5ad954094', '0da3688a-cffa-4ba6-a272-b6cc90942ef6', '717bd147-615c-458d-8386-9ea6a198e137', '7cdac5aa-8dc3-40be-a8fc-b273ce61b03b', '4f7ea527-0695-4e2b-bc8b-3d8807a86390']

And to count them:

>>> len(q1)
10

Cleaning the Queue

Cleaning the queue can either be done by emptying it, which removes all of its jobs and returns the number of jobs removed:

>>> q1.empty()
10

or by deleting the queue itself, along with its jobs:

>>> q1.delete(delete_jobs=True)

Then, to verify that our queue is clean:

>>> q1.get_job_ids()
[]

Naming Workers

In the same way that we defined a name for our queue, we can define a name for our worker:

>>> worker = Worker([q1], connection=redis_connection, name='worker1')
>>> worker.work()

This means you can have different workers consuming jobs from specific queues.
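
Workers can also listen on more than one queue at a time; queues earlier in the list are drained first, which gives a simple priority scheme. A short sketch, using illustrative queue names:

>>> high = Queue('high', connection=redis_connection)
>>> low = Queue('low', connection=redis_connection)
>>> priority_worker = Worker([high, low], connection=redis_connection, name='worker2')
>>> priority_worker.work()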

Resources

  • Documentation: https://python-rq.org

Thank You

Thanks for reading! Feel free to check out my website, subscribe to my newsletter, or follow me at @ruanbekker on Twitter.
