Tutorial: Processing requests with AMQP
Bovine uses faststream as a wrapper around the AMQP details. The implementation
details are distributed via bovine_process in the bovine_propan package.
Furthermore, one has to install bovine_process with the extra rabbit.
Note
The details above are a result of the architecture evolving over time and are likely to change, depending on how some aspects of bovine evolve. The same is true for the details of the routing keys discussed below.
Using different inbox and outbox tasks
By passing appropriate configuration variables to BovineHerd, one can change
the asynchronous tasks executed. The two tasks inserted below just publish a
message under an AMQP routing key. These messages go to the processing topic
exchange with the routing keys inbox and outbox. The message format is
{
"bovine_name": "unique name for each actor in the database",
"data": {"message": "submitted, usually an ActivityPub object"},
"submitter": "requester determined through HTTP signature"
}
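As a rough illustration of the format above, the following sketch builds and checks such a message using only the standard library. The helper names build_processing_message and parse_processing_message are hypothetical and are not part of bovine; the actual enqueuers may serialize messages differently.

```python
import json

# The three top-level keys named in the message format above.
REQUIRED_KEYS = {"bovine_name", "data", "submitter"}


def build_processing_message(bovine_name: str, data: dict, submitter: str) -> str:
    """Serialize a message in the documented shape (hypothetical helper)."""
    return json.dumps(
        {"bovine_name": bovine_name, "data": data, "submitter": submitter}
    )


def parse_processing_message(body: str) -> dict:
    """Decode a message body and check that all documented keys are present."""
    message = json.loads(body)
    missing = REQUIRED_KEYS - message.keys()
    if missing:
        raise ValueError(f"message is missing keys: {sorted(missing)}")
    return message


if __name__ == "__main__":
    body = build_processing_message(
        "alice_unique_name",  # placeholder actor name
        {"type": "Create", "object": {"type": "Note", "content": "moo"}},
        "https://remote.example/actor",  # placeholder submitter
    )
    print(parse_processing_message(body)["bovine_name"])
```

A consumer on the inbox or outbox routing key could run a check like parse_processing_message before handing the data field to further processing.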
By using the processor from bovine_process, one can achieve processing
behavior similar to that of the default configuration.
from quart import Quart
from bovine_herd import BovineHerd
from bovine_propan.enqueuers import enqueue_to_inbox, enqueue_to_outbox

app = Quart(__name__)

BovineHerd(
    app,
    start_background_task_inbox=enqueue_to_inbox,
    start_background_task_outbox=enqueue_to_outbox,
)
We note here that BovineHerd takes another parameter of a similar nature,
handle_outbox_item. This function does the synchronous processing of an
incoming request.
We also note here that the bovine_propan processor takes the result of
processing a message received under the routing key inbox in the processing
exchange and publishes it to the processed exchange under the routing key given
by the eventSource endpoint of the actor. This is the correct thing to
consume if one wants to process the result for a specific actor.
Note
Routing keys and exchanges might change, e.g. use bovine_name,inbox
instead of a plain inbox to allow actor-based differentiation right
on arrival.
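To make the idea of actor-based routing keys on a topic exchange more concrete, here is a small sketch of AMQP topic matching in pure Python. In AMQP, routing keys on a topic exchange are dot-separated words, * matches exactly one word, and # matches zero or more words. The keys used here (alice.inbox and similar) are assumptions for illustration only; bovine currently uses plain inbox and outbox, and the function topic_matches is hypothetical.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if an AMQP topic binding pattern matches a routing key.

    '*' stands for exactly one word, '#' for zero or more words;
    words are separated by dots.
    """

    def match(p: list, k: list) -> bool:
        if not p:
            # Pattern exhausted: match only if the key is exhausted too.
            return not k
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' may absorb any number of words, including none.
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        if head == "*" or head == k[0]:
            return match(rest, k[1:])
        return False

    return match(pattern.split("."), routing_key.split("."))


if __name__ == "__main__":
    # A binding for every actor's inbox versus one actor's messages.
    print(topic_matches("*.inbox", "alice.inbox"))
    print(topic_matches("alice.#", "alice.outbox"))
    print(topic_matches("*.inbox", "inbox"))
```

With keys of this shape, a consumer interested in a single actor could bind with a pattern like alice.#, while a general processor could bind with *.inbox.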
Starting the broker
We will also need to start the faststream broker used by the two enqueue methods. This can be done by adding the following snippet.
from bovine_propan.enqueuers import enqueue_broker


@app.before_serving
async def start_broker():
    await enqueue_broker.start()


@app.after_serving
async def stop_broker():
    await enqueue_broker.close()
Systemd units
We now need two processes to run bovine_herd: one is the server, the other is the processor.
The systemd unit for the processor could look as follows
[Unit]
Description=Bovine Processor
After=network.target
[Service]
User=bovine
Group=bovine
Restart=always
Type=simple
WorkingDirectory=/opt/bovine/
Environment="BOVINE_AMQP=amqp://localhost"
ExecStart=/usr/local/bin/poetry run faststream run bovine_propan/__init__:app
[Install]
WantedBy=multi-user.target
and for the server
[Unit]
Description=Bovine
After=network.target
[Service]
User=bovine
Group=bovine
Restart=always
Type=simple
WorkingDirectory=/opt/bovine/
Environment="BOVINE_AMQP=amqp://localhost"
ExecStart=/usr/local/bin/poetry run hypercorn app:app --bind unix:/tmp/bovine.sock
[Install]
WantedBy=multi-user.target
We note that the details may vary depending on deployment considerations.
Complete server
The following code represents a minimal app running bovine_herd with
processing through AMQP. The AMQP server can be specified via
the environment variable BOVINE_AMQP. We note that bovine has only
been tested with RabbitMQ so far.
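For orientation, the sketch below shows one way a deployment script could read and sanity-check BOVINE_AMQP before starting the server. How bovine itself reads the variable is not covered here; the helper amqp_url_from_env and the fallback amqp://localhost (taken from the systemd units in this tutorial) are assumptions.

```python
import os
from urllib.parse import urlparse


def amqp_url_from_env(environ=os.environ, default="amqp://localhost") -> str:
    """Return the AMQP URL from BOVINE_AMQP, falling back to an assumed default."""
    url = environ.get("BOVINE_AMQP", default)
    # Only the plain and TLS AMQP URL schemes are accepted in this sketch.
    if urlparse(url).scheme not in ("amqp", "amqps"):
        raise ValueError(f"BOVINE_AMQP is not an AMQP URL: {url!r}")
    return url


if __name__ == "__main__":
    print(amqp_url_from_env())
```

Failing early on a malformed URL tends to give a clearer error than a connection failure once the broker is started.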
from quart import Quart
from bovine_herd import BovineHerd
from bovine_propan.enqueuers import enqueue_broker, enqueue_to_inbox, enqueue_to_outbox

app = Quart(__name__)

BovineHerd(
    app,
    start_background_task_inbox=enqueue_to_inbox,
    start_background_task_outbox=enqueue_to_outbox,
)


@app.before_serving
async def start_broker():
    await enqueue_broker.start()


@app.after_serving
async def stop_broker():
    await enqueue_broker.close()