Tutorial: Processing requests with AMQP

Bovine uses faststream to wrap the details of AMQP. The implementation is distributed via bovine_process in the bovine_propan package. In addition, bovine_process has to be installed with the extra rabbit.

Note

The details above are a result of the architecture evolving over time and are likely to change, depending on how some aspects of bovine evolve. The same is true for the details of the routing keys discussed below.

Using different inbox and outbox tasks

By passing appropriate configuration variables to BovineHerd, one can change the asynchronous tasks that are executed. The two tasks inserted below simply publish a message under an AMQP routing key. These messages go to the processing topic exchange with the routing keys inbox and outbox. The message format is

{
    "bovine_name": "unique name for each actor in the database",
    "data": {"message": "submitted, usually an ActivityPub object"},
    "submitter": "requester determined through HTTP signature"
}

By using the processor from bovine_process, one can achieve processing behavior similar to that of the default configuration.
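To make the message format above concrete, here is a minimal sketch that builds and checks such a message using only the standard library. The helper names build_processing_message and parse_processing_message are hypothetical and not part of bovine, and the assumption that the body is UTF-8 encoded JSON is ours; the actual enqueuers may serialize messages differently.

```python
import json

# The three keys named in the message format above.
REQUIRED_KEYS = {"bovine_name", "data", "submitter"}


def build_processing_message(bovine_name: str, data: dict, submitter: str) -> bytes:
    """Hypothetical helper: serialize a message in the documented format.

    Assumption: the message body is JSON encoded as UTF-8 bytes.
    """
    message = {
        "bovine_name": bovine_name,  # unique name of the actor in the database
        "data": data,                # usually an ActivityPub object
        "submitter": submitter,      # requester determined through HTTP signature
    }
    return json.dumps(message).encode("utf-8")


def parse_processing_message(body: bytes) -> dict:
    """Hypothetical helper: decode a message and check the expected keys."""
    message = json.loads(body)
    missing = REQUIRED_KEYS - message.keys()
    if missing:
        raise ValueError(f"message is missing keys: {sorted(missing)}")
    return message


# Example values are made up for illustration.
body = build_processing_message(
    "alice_1234",
    {"type": "Follow", "object": "https://example.com/actor"},
    "https://remote.example/actor",
)
message = parse_processing_message(body)
```

A consumer written along these lines can reject malformed messages early instead of failing somewhere inside the processing step. With the format in mind, the two enqueuing tasks are wired into BovineHerd as follows.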

from quart import Quart
from bovine_herd import BovineHerd
from bovine_propan.enqueuers import enqueue_to_inbox, enqueue_to_outbox

app = Quart(__name__)

BovineHerd(
    app,
    start_background_task_inbox=enqueue_to_inbox,
    start_background_task_outbox=enqueue_to_outbox,
)

We note here that BovineHerd takes another parameter of a similar nature, handle_outbox_item. This function does the synchronous processing of an incoming request.

We also note that the bovine_propan processor takes the result of processing a message from the routing key inbox in the processing exchange and publishes it to the processed exchange under the routing key given by the eventSource endpoint of the actor. This is the right routing key to consume if one wants to handle the results for a specific actor.

Note

Routing keys and exchanges might change, e.g. use bovine_name,inbox instead of a plain inbox to allow actor based differentiation right on arrival.
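Since the exchanges above are topic exchanges, it may help to see how AMQP topic matching works in general. The following is a small, self-contained sketch of the matching rule (words separated by dots, * matches exactly one word, # matches zero or more words). The function topic_matches is hypothetical and only illustrates the rule; the broker does the real matching. The key alice.inbox is an assumed example of an actor-specific routing key, not something bovine currently uses.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if an AMQP topic binding pattern matches a routing key.

    Illustration only: "*" stands for exactly one word,
    "#" stands for zero or more words, words are separated by ".".
    """
    pattern_words = pattern.split(".")
    key_words = routing_key.split(".") if routing_key else []

    def match(i: int, j: int) -> bool:
        # All pattern words consumed: match only if the key is consumed too.
        if i == len(pattern_words):
            return j == len(key_words)
        if pattern_words[i] == "#":
            # "#" may absorb any number of words, including none.
            return any(match(i + 1, k) for k in range(j, len(key_words) + 1))
        if j == len(key_words):
            return False
        if pattern_words[i] == "*" or pattern_words[i] == key_words[j]:
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


# Plain routing keys as used today: only an exact match applies.
plain_inbox = topic_matches("inbox", "inbox")
plain_outbox = topic_matches("inbox", "outbox")

# Assumed actor-specific keys: one binding can select every inbox.
any_actor_inbox = topic_matches("*.inbox", "alice.inbox")
```

With plain keys such as inbox, a consumer cannot distinguish actors through the binding alone, which is the motivation for the possible change mentioned in the note.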

Starting the broker

We will also need to start the faststream broker used by the two enqueue methods. This can be done by adding the following snippet.

from bovine_propan.enqueuers import enqueue_broker

@app.before_serving
async def start_broker():
    await enqueue_broker.start()


@app.after_serving
async def stop_broker():
    await enqueue_broker.close()

Systemd units

We now need two processes to run bovine_herd. One is the server, the other is the processor.

The systemd unit could look as follows for the processor

/etc/systemd/system/bovine_processor.service
[Unit]
Description=Bovine Processor
After=network.target

[Service]
User=bovine
Group=bovine
Restart=always
Type=simple
WorkingDirectory=/opt/bovine/
Environment="BOVINE_AMQP=amqp://localhost"
ExecStart=/usr/local/bin/poetry run faststream run bovine_propan/__init__:app

[Install]
WantedBy=multi-user.target

and for the server

/etc/systemd/system/bovine.service
[Unit]
Description=Bovine
After=network.target

[Service]
User=bovine
Group=bovine
Restart=always
Type=simple
WorkingDirectory=/opt/bovine/
Environment="BOVINE_AMQP=amqp://localhost"
ExecStart=/usr/local/bin/poetry run hypercorn app:app --bind unix:/tmp/bovine.sock

[Install]
WantedBy=multi-user.target

We note that the details may vary due to deployment considerations.

Complete server

The following code represents a minimal app running bovine_herd with processing through AMQP. The AMQP server can be specified via the environment variable BOVINE_AMQP. We note that bovine has only been tested with RabbitMQ so far.

/opt/bovine/app.py
from quart import Quart
from bovine_herd import BovineHerd
from bovine_propan.enqueuers import enqueue_broker, enqueue_to_inbox, enqueue_to_outbox

app = Quart(__name__)

BovineHerd(
    app,
    start_background_task_inbox=enqueue_to_inbox,
    start_background_task_outbox=enqueue_to_outbox,
)


@app.before_serving
async def start_broker():
    await enqueue_broker.start()


@app.after_serving
async def stop_broker():
    await enqueue_broker.close()
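Because both the server and the processor depend on BOVINE_AMQP, it can be useful to check the value before starting either process. The sketch below reads the variable and splits it into its parts with the standard library. The helper amqp_settings is hypothetical, and the fallback to amqp://localhost when the variable is unset is an assumption made for this example; how bovine itself behaves in that case is not stated here. The ports 5672 and 5671 are the standard AMQP defaults without and with TLS.

```python
import os
from urllib.parse import urlsplit

# Assumption for this sketch: fall back to a local broker if unset.
DEFAULT_AMQP_URL = "amqp://localhost"


def amqp_settings(environ=None) -> dict:
    """Hypothetical helper: read BOVINE_AMQP and return its components."""
    environ = os.environ if environ is None else environ
    url = environ.get("BOVINE_AMQP", DEFAULT_AMQP_URL)
    parts = urlsplit(url)
    if parts.scheme not in ("amqp", "amqps"):
        raise ValueError(f"BOVINE_AMQP must be an amqp:// or amqps:// URL, got {url!r}")
    uses_tls = parts.scheme == "amqps"
    return {
        "url": url,
        "host": parts.hostname or "localhost",
        # Standard AMQP ports: 5672 without TLS, 5671 with TLS.
        "port": parts.port or (5671 if uses_tls else 5672),
        "tls": uses_tls,
    }


# Passing a dict instead of os.environ keeps the example reproducible.
settings = amqp_settings({"BOVINE_AMQP": "amqp://localhost"})
```

Running such a check in a deployment script gives a clear error for a mistyped URL before the units are started.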