Implementing a Simple, Practical Queue with Redis

Advanced Python Programming - Implement a Queue with Redis

Redis keeps its data in memory and offers versatile data structures, which makes it a good fit for implementing message queues.
This post explores two primary approaches:

  1. Redis Lists for simpler queues.
  2. Redis Streams for more robust, modern messaging patterns.

1. Simple Queue with Redis Lists and Hashes

This approach is good for basic FIFO queues where you need to store job details separately from the queue itself.
Data Objects:

  • Queue: A Redis List (LPUSH to add, BRPOP to consume). This list holds only unique identifiers (e.g., job_id).
  • Job Data: For each job_id, a separate Redis Hash (HSET, HGETALL) stores the associated payload/details.
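
To make the layout concrete, here is a minimal in-memory sketch of the two data objects, with no Redis server involved. A deque stands in for the Redis List and a plain dict stands in for the per-job Hashes. The names queue, jobs, enqueue, and dequeue are illustrative assumptions, not part of any Redis API.

```python
from collections import deque

# Stand-ins for the Redis data objects (illustrative only, not Redis itself).
queue = deque()   # plays the role of the List "email_queue"
jobs = {}         # plays the role of the Hashes "job:<job_id>"

def enqueue(job_id, payload):
    """Store the payload under its key, then push only the ID (like HSET + LPUSH)."""
    jobs[f"job:{job_id}"] = dict(payload)
    queue.appendleft(job_id)      # LPUSH adds at the head (left)

def dequeue():
    """Pop the oldest ID from the tail (like RPOP) and look up its payload."""
    job_id = queue.pop()          # RPOP removes from the tail (right)
    return job_id, jobs[f"job:{job_id}"]

enqueue("a1", {"recipient": "alice@email.com"})
enqueue("b2", {"recipient": "bob@email.com"})
first_id, first_payload = dequeue()
print(first_id, first_payload["recipient"])  # a1 alice@email.com
```

Because IDs enter at the head and leave from the tail, the first job enqueued is the first one consumed, which is the FIFO behavior this section relies on.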

Example - Email Sending Queue

  1. Producer (Enqueueing Jobs)

    import time
    import uuid

    import redis

    # Connect to Redis (assumes a server is running on localhost:6379)
    r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)

    def enqueue_email_job(recipient, subject, body):
        job_id = str(uuid.uuid4())  # Generate a unique ID for the job

        # 1. Store job details in a Redis Hash
        job_details = {
            "recipient": recipient,
            "subject": subject,
            "body": body,
            "status": "pending",
            "created_at": time.time(),
        }
        # hmset() is deprecated in redis-py; hset() with mapping= sets multiple fields
        r.hset(f"job:{job_id}", mapping=job_details)

        # 2. Add job_id to the queue (left push).
        # LPUSH by the producer plus BRPOP by the consumer makes it a FIFO queue.
        r.lpush("email_queue", job_id)
        print(f"Enqueued job {job_id} for {recipient}. Details stored in 'job:{job_id}'")
        return job_id

    if __name__ == "__main__":
        print("--- Email Producer ---")
        enqueue_email_job("alice@email.com", "Welcome to Our service", "Thanks for signing up!")
        time.sleep(0.1)
        enqueue_email_job("bob@email.com", "Your order confirmation", "Order #12345 has been placed.")
        time.sleep(0.1)
        enqueue_email_job("charlie@mail.com", "Password Reset", "Click here to reset your password.")
        time.sleep(0.1)
        print("Producer finished.")

  2. Consumer (Dequeuing and Processing Jobs)

    import time

    import redis

    # Connect to Redis
    r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)

    def process_email_job(job_id):
        # 1. Retrieve job details from the Hash
        job_details = r.hgetall(f"job:{job_id}")
        if not job_details:
            print(f"Error: Job {job_id} details not found. Skipping.")
            return
        print("\n")
        print(f"Processing job {job_id}:")
        print(f" Recipient: {job_details.get('recipient')}")
        print(f" Subject: {job_details.get('subject')}")
        print(f" Body: {job_details.get('body')}")
        print(f" Status: {job_details.get('status')}")

        # Simulate email sending process
        time.sleep(2)

        # 2. Update job status
        r.hset(f"job:{job_id}", "status", "sent")
        print(f" Job {job_id} status updated to 'sent'.")

        # 3. Clean up: Remove job details
        r.delete(f"job:{job_id}")
        print(f"Cleaned up job details for {job_id}.")

    def start_consumer():
        print("--- Email Consumer Started ---")
        while True:
            # BRPOP blocks until an element is available in 'email_queue'.
            # A timeout of 0 means block indefinitely.
            # Returns a tuple: (list_name, element)
            # With timeout=0, brpop never returns None, so the result
            # can be unpacked directly and no "queue empty" branch is needed.
            queue_name, job_id = r.brpop("email_queue", timeout=0)
            process_email_job(job_id)

    if __name__ == "__main__":
        start_consumer()

2. Advanced Queue Features Implemented with Redis

This section covers some common advanced queue features that can be built on Redis.

2.1 Reliable Delivery / At-Least-Once Delivery

This is a critical feature, ensuring a message is processed even if a consumer crashes mid-processing.

Implementation with Redis Lists (using RPOPLPUSH/LMOVE):

  • Main Queue: email_queue
  • Processing Queue (Temporary): email_queue:processing
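
The effect of the two lists can be illustrated without a running Redis server. The sketch below mimics the semantics of RPOPLPUSH with plain Python deques: a job ID is moved to the processing list before any work is done, removed from it only on success, and pushed back to the main queue by a recovery step if it was left behind. The function names claim, ack, and requeue_stuck are hypothetical, and a real implementation would rely on Redis commands (such as RPOPLPUSH or LMOVE, plus LREM) for atomicity.

```python
from collections import deque

main_queue = deque()        # stands in for the List "email_queue"
processing = deque()        # stands in for "email_queue:processing"

def claim():
    """Move the oldest job ID from the main queue to the processing list."""
    if not main_queue:
        return None
    job_id = main_queue.pop()          # take from the tail, like RPOP
    processing.appendleft(job_id)      # push to the head, like LPUSH
    return job_id

def ack(job_id):
    """Remove a finished job ID from the processing list (like LREM)."""
    processing.remove(job_id)

def requeue_stuck():
    """Return unacknowledged job IDs to the main queue for another attempt."""
    moved = 0
    while processing:
        # Take the newest stuck ID first so the oldest ends up at the tail,
        # which is the end consumers pop from.
        main_queue.append(processing.popleft())
        moved += 1
    return moved

main_queue.appendleft("job-1")   # producer side, like LPUSH
main_queue.appendleft("job-2")

done = claim()                   # "job-1": processed successfully
ack(done)
crashed = claim()                # "job-2": consumer "crashes" before ack
print(list(processing))          # ['job-2']
print(requeue_stuck())           # 1
print(list(main_queue))          # ['job-2']
```

Since a job can be handed out again after a crash, it may be processed more than once. That is the "at least once" part of the guarantee, so job handlers should tolerate repeats.
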
  1. Producer: the same as above.
  2. Consumer

    import redis, json, uuid, time
    r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)

    def process_email_job_reliable(job_id):
        job_details = r.hgetall(f"job:{job_id}")
        if not job_details:
            print(f"Error: Job {job_id} details not found for reprocessing. Skipping.")
            return False  # Indicate failure

        print("\n")
        print(f"Processing reliable job {job_id}:")
        print(f" Recipient: {job_details.get('recipient')}")

        # Simulate email sending process
        time.sleep(2)

        # Delete the job details once the email has been sent
        # (removal from the processing queue is not done in this function)
        r.delete(f"job:{job_id}")
        print(f" Reliable job {job_id} processed and cleaned up.")
        return True  # Indicate success

    def start_reliable_consumer():
        print("--- Reliable Email Consumer Started ---")
        while True:
            # Atomically move a job_id from 'email_queue' to 'email_queue:processing'.
            # If 'email_queue' is empty, BRPOPLPUSH blocks until an element is available.
            # This ensures that if the consumer crashes after getting the job, it's still in 'processing'.