Advanced Python Programming - Implement a Queue with Redis
Redis, with its blazing speed and versatile data structures, is an excellent choice for implementing message queues.
This blog will explore two primary approaches:
- Using Redis Lists for simpler queues.
- Redis Streams for more robust, modern messageing patterns.
1.Simple Queue with Redis Lists and Hashes
This approach is good for basic FIFO queues where you need to store job details separately from the queue itself.
Data Objects:
- Queue: A Redis List (LPUSH to add, BRPOP to consume). This list holds only unique identifiers (e.g., job_id)
- Job Data: For each job_id, a separate Redis Hash (HSET, HGETALL) stores the associated payload/details.
Example - Email Sending queue
Producer (Enqueueing Jobs)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32import redis, json, uuid, time
# Connect to Redis
# Assuming Redis is running on localhost:6379
r = redis.Redis(host = 'localhost', port = 6379, db=0, decode_responses = True)
def enqueue_email_job(recipient, subject, body):
job_id = str(uuid.uuid4()) # Generate a unique ID for the job
# 1.Store job details in a Redis Hash
job_details = {
"recipient": recipient,
"subject":subject,
"body":body,
"status":"pending",
"created_at":time.time()
}
r.hmset(f"job:{job_id}", job_details) # Use HMSET for setting multiple fields in a hash
# 2. Add job_id to the queue (right push, so LPUSH by consumer is FIFO)
# Using LPUSH FOR THE PRODUCER AND brpop FOR THE CONSUMER MAKES IT A fifp QUEUE
r.lpush("email_queue",job_id)
print(f"Enqueued job {job_id} for {recipient}. Details stored in 'job:{job_id}'")
return job_id
if __name__ == "__main__":
print("--- Email Producer ---")
enqueue_email_job("alice@email.com","Welcome to Our service", "Thanks for signing up!")
time.sleep(0.1)
enqueue_email_job("bob@email.com", "Your order confirmation","Order #12345 has been placed.")
time.sleep(0.1)
enqueue_email_job("charlie@mail.com","Password Reset", "Click here to reset your password.")
time.sleep(0.1)
print("Producer finished.")Consumer(Dequeuing and Processing Jobs)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41import redis, json, time
# Connect to Redis
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
def process_email_job(job_id):
#1. Retrieve job details from the Hash
job_details = r.hgetall(f"job:{job_id}")
if not job_details:
print(f"Error: Job{job_id} details not found. Skipping.")
return
print("\n")
print(f"Processing job {job_id}:")
print(f" Recipient:{job_details.get("recipent")}")
print(f" Subject:{job_details.get("subject")}")
print(f" Body:{job_details.get("body")}")
print(f" Status:{job_details.get("status")}")
# Simulate email sending process
time.sleep(2)
# 2. Update job status
r.hset(f"job:{job_id}", "status", "sent")
print(f" Job {job_id} status updated to 'sent'.")
# 3. Clean up: Remove job details
r.delete(f"job:{job_id}")
print(f"Cleaned up job details for {job_id}.")
def start_consumer():
print("--- Email Consumer Started ---")
while True:
# BRPOP blocks untile an element is available in 'email_queue'
# The '0' timeout means block indefinitely
# Returen a tuple: (lie_name, element)
queue_name, job_id = r.brpop("email_queue", timeout = 0)
if job_id: # If a job_id was retrieved
process_email_job(job_id)
else:
print("No jobs in queue. Waiting ...") # this line won't be triggered as timeout is 0, the r.brpop will wait untill a job_id comes
if __name__ = "__main__":
start_consumer()
2. Advanced Queue Features Implemented with Redis
There are some common advanced queue features using Redis.
2.1 Reliable Delivery / At-Least-Once Delivery
This is a critical feature, ensuring a message is processed even if a consumer crashes mid-processing.
Implementation with Redis Lists (using RPOPLPUSH/LMOVE):
- Main Queue: email_queue
- Processing Queue (Temporary): email_queue:processing
- Producer is same with above
- Consumer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29import redis, json, uuid, time
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
def process_email_job_reliable(job_id):
job_details = r.hgetall(f"job:{job_id}")
if not job_details:
print(f"Error: Job{job_id} details not found for reprocessing. Skipping.")
return False # Indicate failure
print("\n")
print(f"Processing reliable job {job_id}:")
print(f" Recipient: {job_details.get('recipient')}")
# Simulate email sending process
time.sleep(2)
# Atomically remove from processing queue and delete job details
r.delete(f"job:{job_id}")
print(f" Reliable job {job_id} procesed and cleaned up.")
return True # Indicate success
def start_reliable_consumer():
print("--- Reliable Email Consumer Started ---")
while True:
# Atomically move a job_id from 'email_queue' to 'email_queue:processing'
# If 'email_queue' is empty, BRPOPLPUSH blocks until an element is availble.
# This ensures that if the consumer crashes after getting the job, it's still in 'processing'.