Inter-service communication with Celery

When working with microservices, inter-service communication can get quite tricky. Celery is a distributed task queue, and we tend to use it primarily for processing workloads within a single service. However, it is also a neat way of passing messages between your services, with a lot less overhead than something like HTTP and with reliability built in. It's also a lot simpler than something like Thrift or gRPC.

An example use-case


Let's say we have a few services, and whenever we save an item in any of them, we would also like to push the serialised version of that model to S3 as a backup. So let's say we have this:

  • Service A
  • Service B
  • Pusher - the service that will push serialised models to S3

First, Pusher needs to share its bindings with the other services. The easiest way to do this is to distribute them as a library. So, let's say that Pusher has a task called `send_to_s3(key, content)`.

In pusher, we would implement the task:

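from celery import Celery

app = Celery('pusher')  # broker configuration omitted

@app.task(name='pusher.send_to_s3')
def send_to_s3(key, content):
    ...  # the actual serialise-and-upload logic, which lives only in Pusher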

Then, in bindings.py (for use in other services), we only need to provide the task's name and signature:

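from celery import Celery

app = Celery('pusher_bindings')  # hypothetical name; must use the same broker as Pusher

@app.task(name='pusher.send_to_s3')
def send_to_s3(key, content):
    pass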

Notes:

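  1. This body never runs. Calling `apply_async` on the stub only sends a message, so all we need to export is the task's name and signature in order to delegate to Pusher's real `send_to_s3` task.
  2. The name is important: Celery uses it as the key to find the task on the worker, so it must match exactly on both sides.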

Now we start the pusher worker, telling it to consume from a queue named `pusher` (`-Q pusher`):

celery -A pusher worker -Q pusher --loglevel=info

Delegating to pusher from another service

First, we need to get the bindings into our service. We recommend doing this via a pip package, so that the bindings are managed (and versioned) from a central place. So, assuming you have a bindings project set up, you might bring them into your project with a simple `pip install pusher-bindings`.

Now, once you have the bindings imported into your project, you can delegate actions to Pusher from the service like so:

from bindings import send_to_s3

# serialized_object: your model, already serialised to a JSON-friendly value
args = ('path/to/file.json', serialized_object)
send_to_s3.apply_async(args, queue='pusher', serializer='json')

This will be picked up by the pusher Celery worker and executed there, in Pusher's context rather than the caller's.
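The round trip can be modelled in a few lines of plain Python. This is a toy illustration of the idea, not Celery's real message format or worker code, and `REGISTRY`, `register`, `publish` and `consume` are hypothetical names. The caller puts only a task name and JSON-encoded arguments on the queue, and the worker resolves that name against its own registry, which is why the stub's body in bindings.py never runs.

```python
import json
from collections import deque

# Worker-side registry: task name -> real implementation (toy model).
REGISTRY = {}

# Stand-in for the 'pusher' queue on the broker.
pusher_queue = deque()


def register(name):
    """Decorator that files a function under an explicit task name."""
    def wrap(fn):
        REGISTRY[name] = fn
        return fn
    return wrap


@register('pusher.send_to_s3')
def send_to_s3(key, content):
    # The real task would upload to S3; here we just report what it received.
    return f'uploaded {len(content)} bytes to {key}'


def publish(name, args):
    """Caller side: only the name and JSON-encoded args cross the wire."""
    pusher_queue.append(json.dumps({'task': name, 'args': list(args)}))


def consume():
    """Worker side: decode the next message and dispatch by name."""
    message = json.loads(pusher_queue.popleft())
    task = REGISTRY.get(message['task'])
    if task is None:
        raise KeyError(f"unregistered task {message['task']!r}")
    return task(*message['args'])


publish('pusher.send_to_s3', ('path/to/file.json', '{"id": 1}'))
print(consume())  # uploaded 9 bytes to path/to/file.json
```

In real Celery a name with no match fails in a similar spirit: the worker reports the message as an unregistered task rather than running anything.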

Delegation methods and versioning

To make our implementation a little easier to use, and a little less brittle, we might write a wrapper method for our function which handles delegation. For example:

def delegate_send_to_s3(key, content):
    args = (key, content)
    send_to_s3.apply_async(args, queue='pusher', serializer='json')

This hides the delegation details (e.g. the name of the queue and the serializer), so calling the task in Pusher from our app now feels just like calling a local function.

Finally, we can also handle versioning. That way, if we make a breaking change to the `send_to_s3` task in Pusher, it doesn't need to immediately break every service that calls it.

We could do something like this:

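@app.task(name='pusher.send_to_s3')
def send_to_s3(key, content):
    ...

@app.task(name='pusher.send_to_s3_v2')
def send_to_s3_v2(key, content, meta):
    ...

def delegate_send_to_s3(key, content, meta=None, version="1"):
    # Each version maps to its task stub and the arguments that version expects
    version_map = {
        "1": (send_to_s3, (key, content)),
        "2": (send_to_s3_v2, (key, content, meta)),
    }
    if version not in version_map:
        raise ValueError(f"Unknown send_to_s3 version: {version!r}")
    task, args = version_map[version]
    task.apply_async(
        args,
        queue='pusher',
        serializer='json'
    )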
    
bindings.py with a versioned delegation method

Now, changing a task's signature in the pusher app doesn't have to break dependent apps: we add a new versioned task alongside the old one, and callers opt in by passing a different `version`. Nor does it change the implementation a great deal.