Distributed Computing & Concurrency with Python & Redis.

Distributed Computing, Multithreading & Asyncio with Python & Redis

Zach Wolpe
Nov 10, 2023 · 11 min read

Simple architectures work. Unless you work for a huge corporation — where you have zero decision power and play a very well-defined role — it’s almost always better to favour simple, reliable & predictable system design. Software design needs to be maintainable and encourage rapid CI/CD.

So much of Data Science & Machine learning is built in Python. Python is awesome, but slow.

How do you scale your Python workloads?

A common approach is to focus on developing concurrent algorithms. Python is not built for concurrency, and in my opinion, it’s mostly a waste of time unless you have a very specific I/O-bound workload.

I find a much better approach is to deploy a simple distributed system that lets you spin up workers as needed.

This approach is also very useful in ML engineering: batch processing jobs; distributing independent inference calls; & independent feature engineering pipelines can all scale up and down dynamically.

The end result is a simple, scalable, maintainable, modular system.

Here I detail common mistakes made when implementing concurrent algorithms in Python, & offer a better (simple) distributed computing architecture.

The Concurrent Fallacy

After meticulously writing your concurrent Python algorithm, you’re devastated to see it now runs slower than before. Why?

Concurrency in Python is easy to get wrong, and even when handled correctly the benefits are limited.

1. Concurrency is easy to implement, wrongly.

It’s easy to set up multiprocessing, multithreading or asyncio in Python. The flow of execution can, however, quickly become unpredictable without careful attention to when daemon threads/processes are launched, when event loops and threads/processes are terminated, and what remains in cache/RAM.

It’s very easy to leave dangling threads/processes: failing to shut down CPU processes and clean up RAM/CPU resources as you go.

Python developers are used to the garbage collector doing all the dirty work for them, so it’s easy to miss memory or CPU leakage. To make matters worse, leakage is usually only visible on bigger workloads, which can be missed in dev/test environments (see point 6).
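
To make the cleanup point concrete, here is a minimal sketch (the worker function is purely illustrative) showing why explicitly joining threads keeps shutdown predictable:

import threading
import time


def worker():
    time.sleep(2)
    print('worker finished')


# daemon=True means the interpreter will NOT wait for this thread at exit;
# if the main thread returns before the sleep completes, the work is silently dropped.
t = threading.Thread(target=worker, daemon=True)
t.start()

# explicitly joining makes shutdown predictable: the work completes and the
# thread is cleaned up before the process exits.
t.join()
print('main exiting')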

2. The GIL prevents Parallelism in Python.

CPython’s Global Interpreter Lock (GIL) prevents real thread-level parallelism: multiple threads can be launched, but only one can execute Python bytecode at any instant. Threading is instead simulated by rapid context switching via time-sliced execution, performed by the interpreter (we have no control over the timing of context switches).

3. Concurrency in Python only benefits I/O-bound workloads, not CPU-bound work.

Context switching isn’t free. It requires stashing the state of an existing task and loading the state of the next.

Because of the limitations of parallelism enforced by the GIL, and the cost of context switching, it’s very likely that CPU-bound algorithms will become slower after threading.

Only use threading or asynchronous programming on IO-bound workloads.

CPU-Bound

Attempting to use multithreading to speed up a CPU-bound workload will not speed up the runtime as the GIL prevents parallelism. The additional overhead of context switching renders the time-sliced execution slower than a single-threaded execution.
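
As a rough illustration (the cpu_task function and the thread count are placeholders, and exact timings will vary by machine), the threaded version of a CPU-bound loop typically runs no faster, and often slower, than the single-threaded version:

import time
from concurrent.futures import ThreadPoolExecutor


def cpu_task(n: int = 5_000_000) -> int:
    # pure-Python CPU-bound work: the GIL serialises this across threads
    total = 0
    for i in range(n):
        total += i * i
    return total


start = time.perf_counter()
for _ in range(4):
    cpu_task()
print(f'single thread: {time.perf_counter() - start:.2f}s')

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(lambda _: cpu_task(), range(4)))
print(f'4 threads:     {time.perf_counter() - start:.2f}s')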

I/O Bound

If your workload is I/O-bound you will benefit from multithreading if implemented properly; however, in this case it is recommended to instead use asyncio, which provides the same performance benefit with less complexity.
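
A minimal asyncio sketch, using asyncio.sleep as a stand-in for real I/O (a network call, a disk read): ten tasks wait concurrently, so the total runtime is roughly one second rather than ten.

import asyncio


async def fetch(i: int) -> str:
    # stand-in for an I/O-bound call; awaiting yields control to other tasks
    await asyncio.sleep(1)
    return f'result {i}'


async def main():
    # schedule all ten coroutines concurrently and gather their results
    results = await asyncio.gather(*(fetch(i) for i in range(10)))
    print(results)


asyncio.run(main())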

4. Thread-safety

Some resources are not thread-safe, meaning they cannot safely be accessed from multiple threads at the same time because they are not protected by a mutex. Writing to the console with `print` is one example of a non-thread-safe resource: one thread’s output may be interrupted by (interleaved with) another’s.

  • A race condition occurs when multiple threads access the same resource at the same time and the result depends on the unpredictable order of execution.
  • A lock can be used to protect a resource from being accessed by multiple threads at the same time.
  • A threading lock is a synchronization primitive that provides exclusive access to a shared resource in a multithreaded application. A thread lock is also known as a mutex, short for mutual exclusion (a minimal sketch follows this list).
  • See [threading-5-lock.py].
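
A minimal sketch of a threading lock protecting a shared counter (the increment function is illustrative): `counter += 1` is not atomic, so without the lock concurrent updates can be lost.

import threading

counter = 0
lock = threading.Lock()


def increment(n: int = 100_000) -> None:
    global counter
    for _ in range(n):
        with lock:          # only one thread may mutate `counter` at a time
            counter += 1


threads = [threading.Thread(target=increment) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 400000 every run; without the lock the total could come up short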

5. Shared resources & order of operations matters

Multithreading/processing is dangerous if resources are shared/global or if the order of operations matters. Be sure to isolate shared state before writing concurrent code.

6. Monitoring and testing are more involved and easy to neglect.

Extensive testing & monitoring are required to catch leaks. As processes grow in complexity, leaks and process/thread runtimes can become opaque. Leakage can also creep up over time: a small amount of leakage compounds, which may avoid detection in dev/test but becomes apparent when demand scales in prod.

Concurrency Implementation Tips

  1. Favour async over multithreading. Predictable workloads can be explicitly managed with asynchronous programming. An IO-bound workload can benefit from threading, but it's simpler to use async.
  2. Only use concurrency for IO-bound workloads: A single thread is favoured for CPU-bound workloads.
  3. Write code for horizontal scaling from the get-go. See below.
  4. Ensure testing/staging matches production. If you implement multithreading/processing, ensure that the batch size and workload are rigorously tested to catch memory or CPU leaks. The throughput in prod can dwarf that in standard testing/dev environments. Use tools like docker stats to monitor runtime and possibly even write scripts to flag memory/CPU leakage.
  5. “ThreadPoolExecutor” and “ProcessPoolExecutor” (futures): It is recommended to use “ThreadPoolExecutor” or “ProcessPoolExecutor” instead of “threading.Thread” or “multiprocessing.Process” directly, as they are higher-level abstractions that provide a simpler interface to multithreading and multiprocessing (a minimal sketch follows this list).
  6. Fuzzing: It is recommended to use “fuzzing” during development/testing — adding random noise as time delays during execution. Dev/test/stage are often more stable, leading to falsely predictable results.
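
A minimal sketch of tip 5, using the executors from concurrent.futures (the square function and worker counts are placeholders). The context manager shuts the pool down cleanly, which avoids the dangling threads/processes discussed earlier.

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def square(x: int) -> int:
    return x * x


if __name__ == '__main__':
    # threads: suited to I/O-bound tasks (all threads share one GIL)
    with ThreadPoolExecutor(max_workers=4) as pool:
        print(list(pool.map(square, range(10))))

    # processes: suited to CPU-bound tasks (each process has its own interpreter & GIL)
    with ProcessPoolExecutor(max_workers=4) as pool:
        print(list(pool.map(square, range(10))))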

Distributed Computing with Redis

A better solution is to skip trying to optimize runtime on one machine and build to scale horizontally. Here is an example of a simple scalable architecture:

Using a message queue (Redis) to distribute computationally intensive (CPU bound) tasks to multiple workers (Python).

Architecture

  1. Decouple your code into several microservices.

In a machine learning context, we perform inference at multiple stages when deploying an algorithm (feature engineering, exclusions, business logic etc). Decoupling these logically isolated units means we can cache results when runtime matters and spin up different module stages simultaneously. These microservices are also called “workers” in this context.

2. Build a Queue to handle communication between microservices.

Microservices need a way to communicate. We can use a queue to decentralize this communication. We’ll use Redis (an in-memory database) to build our queue.

3. Build a main app to handle requests.

You’ll need some front-end logic to handle requests. Normally this handles incoming web app traffic, but the same approach can be used to handle requests that launch ML training/inference modules. This becomes your API gateway to either:

  • Interact with the front end (usually through a load balancer).
  • Launch training/inference jobs.

This main app does not process requests; it only receives and routes them.

4. Connect the main app to the microservices.

  • Receive Requests: The core app receives requests (API endpoints, launching training jobs etc.), filters them, and channels them to the correct queue.
  • Push to Redis: The core app then pushes the requests to the Redis queue/s.
  • Workers listen to the Queue: These workers listen to the queue/s and fetch and process requests as they become available.
Architecture Schematic: Elastic Load Balancer is used to distribute requests to various apps. The apps filter requests and push them to Redis. Workers listen to Redis and process jobs as they become available.

Workers can then be spun up and shut down as required.

Build

A complete build is available here.

  1. Main App: pushes tasks/messages to queue.
  2. Workers: pull tasks from queue and process them.

NB: It is important that the order of tasks is irrelevant.

The queue should be thread-safe and atomic (Redis is both).

  • atomic: the queue will not be corrupted if multiple workers try to pull a task at the same time.
  • thread-safe: the queue will not be corrupted if multiple workers try to push a task at the same time.

Redis

Redis is an in-memory data structure store, used as a database, cache and message broker. It supports data structures such as strings, hashes, lists, sets, sorted sets with range queries, bitmaps, hyperloglogs, geospatial indexes with radius queries and streams.

Redis guarantees that multiple clients will not be able to corrupt the queue when pushing or pulling tasks.

Resilience to runtime crashes

If we need resilience to runtime crashes, Redis requires additional configuration. Using RabbitMQ or SQS solves this problem.

Getting Started

This example’s code is available here.

1. Setup

Set up a directory distributed-computation/ which contains:

  • app/: subdirectory.
  • worker/: subdirectory.
  • docker-compose.yml: a docker-compose file used to build a Redis docker image and launch a container.
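
One possible layout (the file names inside app/ and worker/ are inferred from the snippets below; the repo’s exact naming may differ):

distributed-computation/
├── app/
│   ├── config.py
│   ├── main.py
│   └── redis_module.py
├── worker/
│   ├── config.py
│   ├── main.py
│   └── redis_module.py
└── docker-compose.yml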

2. Build and launch the Redis Image

Install and launch Redis. Here I use the official Redis docker image. Here's an example docker-compose.yml:

version: '3'

services:
  redis:
    image: redis:latest
    container_name: redis_distributed_queue
    restart: always
    ports:
      - '6379:6379'
    command: redis-server --save 30 1 --loglevel warning --requirepass dummy-pass # dummy-pass is our password
    volumes:
      - data-volume:/data

volumes:
  data-volume:

3. Write a module to interact with Redis

The module below pushes to and pops from our Redis queue using the Python Redis API.

"""
------------------------------------------------------------------------
Redis module

Helper module to
- Access redis db
- pull from a redis queue
- push to a redis queue

: 08.11.23
: zach wolpe
: zach.wolpe@medibio.com.au
------------------------------------------------------------------------
"""
import redis

class redis_access:

    @staticmethod
    def redis_db(config):
        db = redis.Redis(
            host=config.redis_host,
            port=config.redis_port,
            db=config.redis_db,
            password=config.redis_password
        )
        # query access
        db.ping()
        return db

    @staticmethod
    def redis_queue_push(config, db, message):
        """push to tail of the queue (left of list)"""
        db.lpush(config.redis_queue_name, message)

    @staticmethod
    def redis_queue_pop(config, db):
        """
        pop from head of the queue (right of list)
        the `b` in `brpop` indicates this is a blocking call (waits until an item becomes available).
        """
        _, message_json = db.brpop(config.redis_queue_name)
        return message_json

4. Config

Set up a config management system. Use secrets and env variables in production, but here I’ll use a config.py file for demonstrative purposes.

A config.py file is added to both directories. These config files are identical for simplicity.

"""
------------------------------------------------------------------------
App/Worker Config

For demonstrative purposes only. Do not store sensitive information in this file.
In practice you would probably use a safer mechanism (such as ENV variables).

: 08.11.23
: zach wolpe
: zach.wolpe@medibio.com.au
------------------------------------------------------------------------
"""


redis_host = 'localhost'
redis_port = 6379
redis_db = 0
redis_password = 'dummy-pass'
redis_queue_name = 'distributed-queue'

5. Build the main web app API

This represents our app frontend and training/inference job API. This file is responsible for:

  • Compiling messages to push to the Redis queue.
  • Pushing the messages to Redis.
"""
------------------------------------------------------------------------
Main application file.

Push messages into the redis queue.


: 08.11.23
: zach wolpe
: zach.wolpe@medibio.com.au
------------------------------------------------------------------------
"""

import datetime
import random
import config
import json
import time
import uuid

from redis_module import redis_access

def main(num_messages: int, delay: float = 1):
    """
    Generate random messages and push to redis queue.

    Arguments:
        num_messages: number of messages to generate

    Return: None
    """

    db = redis_access.redis_db(config=config)

    for i in range(num_messages):
        # generate random message
        id = str(i) + '_xv1'
        message = {
            'id': str(uuid.uuid4()),
            'timestamp': str(datetime.datetime.now()),
            'value': random.randint(0, 100),
            'data': {
                'message_no': id,
                'message': f'Hello world {id}:{random.randint(0, 100)}'
            }
        }

        message_json = json.dumps(message)

        # push to redis queue
        _msg_sample = message['data']['message'][:4] + '...'
        print(f'Pushing message number {i} (id: {id}): message={_msg_sample}')
        redis_access.redis_queue_push(config, db, message_json)

        time.sleep(delay)


if __name__ == '__main__':
    print('Launching main application...')
    main(num_messages=20, delay=1)
    print('Main application completed.')

6. Build Worker/s

Build workers that:

  • Listen to the Redis Queue.
  • If a message is available, pop and process the request.
"""
------------------------------------------------------------------------
Worker Instance.

Workers listen to the redis queue and process messages.


: 08.11.23
: zach wolpe
: zach.wolpe@medibio.com.au
------------------------------------------------------------------------
"""

import config
import random
import json


from redis_module import redis_access


def process_message(db, message_json: str):
    message = json.loads(message_json)
    print(f"Message received: id={message['id']}, message_number={message['data']['message_no']}")

    # mimic potential processing errors
    processed_ok = random.choices((True, False), weights=(5, 1), k=1)[0]
    if processed_ok:
        print("\t>> Processed successfully.")
    else:
        print("\tProcessing failed - requeuing...")
        redis_access.redis_queue_push(config, db, message_json)


def main():
    """
    Consumes items from the Redis queue.
    """

    # connect to Redis
    db = redis_access.redis_db(config)

    while True:
        message_json = redis_access.redis_queue_pop(config, db)  # this blocks until an item is received
        process_message(db, message_json)


if __name__ == '__main__':
    print('Launching worker...')
    main()
    print('Worker terminated successfully.')

That’s it! You’re ready to scale your workers across multiple instances.

Runtime Example

Start a Redis container

Build the Redis Image and launch a container.

docker-compose up -d

Redis is now running in a docker container.

Launch the main app

Launch the main app. This connects to our Redis database via the config and pushes data to the queue.

python app/main.py

Launch the Redis CLI

In a new shell, launch the Redis CLI to interact with the Redis server.

docker compose run redis redis-cli -h redis -a dummy-pass -n 0

Note: dummy-pass is the password specified in the config.

Some useful CLI calls include:

  • KEYS * : lists all keys (including our queues).
  • DEL <QUEUE> : deletes a given queue.
  • GET <KEY> : fetches a key.
  • LRANGE <QUEUE> 0 -1 : lists all items in a queue.

These will be shown once data is pushed to a queue.

Launch Workers

Launch worker apps. The workers listen to the Redis queue and process requests as they become available.

Here I launch 3 worker instances; notice how they each pull different messages from the queue.

Production Example

Putting it all together, re-run app/main.py and launch 3 workers. We also simulate failure to demonstrate how Redis pushes failed requests back to the queue.

Improvements

  • Resilience: Use RabbitMQ or SQS for resilience to runtime crashes.
  • de-duplicate messages: Use another Redis database to de-duplicate messages (as a safety measure). Store successfully processed message IDs (with some TTL), and double-check that a message was not already handled before processing it. TTL (Time To Live) sets an expiration time on a key; after the expiration time, the key is automatically deleted. (A sketch of this follows the list.)
  • monitoring: Use another Redis queue as a DLQ (dead letter queue) to store messages that the messaging system cannot or should not deliver. Monitor the DLQ for messages that cannot be handled.
  • monitoring: Use another Redis queue to store "in-process" items while they are being processed; see the Redis documentation for the Pattern: Reliable Queue for more details. This prevents the loss of a message if a worker is killed after popping from the queue but before completing its work. A more complete solution like RabbitMQ or SQS/ElasticMQ is better suited for this.
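
To make the last two bullets concrete, here is a hypothetical sketch that extends the redis_access helper from step 3: a TTL-based de-duplication check and the Reliable Queue pattern (pop onto a processing list, acknowledge when done). The method names and the ':processing' suffix are illustrative, not part of the original build.

from redis_module import redis_access


class redis_access_reliable(redis_access):

    @staticmethod
    def is_new_message(db, message_id, ttl_seconds=3600):
        """De-duplication: SET NX records the ID only if it is unseen; the TTL cleans it up later."""
        return db.set(f'processed:{message_id}', 1, ex=ttl_seconds, nx=True) is not None

    @staticmethod
    def redis_queue_pop_reliable(config, db):
        """Atomically move the next message onto a processing list, so it survives a worker crash."""
        return db.brpoplpush(config.redis_queue_name, config.redis_queue_name + ':processing', timeout=0)

    @staticmethod
    def redis_queue_ack(config, db, message_json):
        """Remove the message from the processing list once it has been handled."""
        db.lrem(config.redis_queue_name + ':processing', 1, message_json)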

Code & Background

For a detailed explanation of concurrency, the distributed architecture & all the code, visit this repo.
