Skip to content

bovine_herd.server.endpoints

build_endpoints_blueprint(handle_outbox_item, start_background_task_inbox, start_background_task_outbox)

Creates a blueprint that handles the HTTP requests for ActivityPub

Parameters:

Name Type Description Default
handle_outbox_item Callable[[ProcessingItem, BovineStoreActor], Awaitable]

Invoke synchronously on a call to the outbox endpoint

required
start_background_task_inbox Callable[[ProcessingItem, BovineStoreActor], Awaitable]

Schedules asynchronous inbox processing

required
start_background_task_outbox Callable[[ProcessingItem, BovineStoreActor], Awaitable]

Schedules asynchronous outbox processing

required
Source code in bovine_herd/bovine_herd/server/endpoints.py
def build_endpoints_blueprint(
    handle_outbox_item: Callable[[ProcessingItem, BovineStoreActor], Awaitable],
    start_background_task_inbox: Callable[
        [ProcessingItem, BovineStoreActor], Awaitable
    ],
    start_background_task_outbox: Callable[
        [ProcessingItem, BovineStoreActor], Awaitable
    ],
):
    """Creates a blueprint that handles the HTTP requests for ActivityPub

    :param handle_outbox_item: Invoke synchronously on a call to the outbox endpoint
    :param start_background_task_inbox: Schedules asynchronous inbox processing
    :param start_background_task_outbox: Schedules asynchronous outbox processing
    """

    endpoints = Blueprint("endpoint", __name__)

    @endpoints.get("/<identifier>")
    async def endpoints_get(identifier):
        endpoint_path = path_from_request(request)
        object_store = current_app.config["bovine_store"]

        endpoint_type, actor = await object_store.resolve_endpoint_no_cache(
            endpoint_path
        )

        if endpoint_type is None:
            return (
                {
                    "@context": "https://www.w3.org/ns/activitystreams",
                    "type": "Object",
                    "id": endpoint_path,
                    "name": "Ceci n'est pas un object",
                },
                404,
                {"content-type": "application/activity+json"},
            )

        if endpoint_type == EndpointType.ACTOR:
            logger.info("Getting %s for %s", endpoint_path, g.retriever)

            if endpoint_path == g.retriever:
                return (
                    actor.actor_object.build(visibility=Visibility.OWNER),
                    200,
                    {"content-type": "application/activity+json"},
                )

            if g.retriever is None or g.retriever == "NONE":
                if "text" in request.headers.get("accept", ""):
                    if actor.actor_object.url is not None:
                        return redirect(actor.actor_object.url)
                return {"status": "unauthorized"}, 401

            return (
                actor.actor_object.build(visibility=Visibility.PUBLIC),
                200,
                {"content-type": "application/activity+json"},
            )

        if endpoint_type == EndpointType.EVENT_SOURCE:
            actor = actor.actor_object.build(visibility=Visibility.OWNER)
            if g.retriever != actor["id"]:
                return {"status": "unauthorized"}, 401
            return await handle_event_source(endpoint_path, actor)

        return await collection_response(endpoint_path)

    @endpoints.post("/<identifier>")
    async def endpoints_post(identifier):
        if not g.retriever:
            return {"status": "unauthorized"}, 401

        endpoint_path = path_from_request(request)
        object_store = current_app.config["bovine_store"]

        endpoint_type, actor = await object_store.resolve_endpoint(endpoint_path)

        if endpoint_type not in [
            EndpointType.INBOX,
            EndpointType.OUTBOX,
            EndpointType.PROXY_URL,
        ]:
            return {"status": "method not allowed"}, 405

        if endpoint_type == EndpointType.INBOX:
            item = ProcessingItem(g.retriever, await request.get_json())
            await start_background_task_inbox(item, actor)

            return {"status": "processing"}, 202

        if g.retriever != actor.actor_object.id:
            return {"status": "unauthorized"}, 401

        if endpoint_type == EndpointType.OUTBOX:
            item = ProcessingItem(g.retriever, await request.get_json())
            item = await handle_outbox_item(item, actor)
            await start_background_task_outbox(item, actor)

            return (
                {"status": "created"},
                201,
                {"location": item.meta.get("object_location")},
            )

        if endpoint_type == EndpointType.PROXY_URL:
            return await proxy_url_response(actor)

    return endpoints