Skip to main content
Orchestrate complex workflows by chaining automations using callbacks instead of polling.

Architecture

Your Server                    Optexity
    │                              │
    ├──POST /inference────────────>│
    │                              ├── Execute automation
    │<─────────POST /callback──────┤
    │                              │
    └──Process result, continue────>

Minimal Example

import asyncio
import httpx
from fastapi import FastAPI, Request
from contextlib import asynccontextmanager

callbacks: dict[str, dict] = {}
CALLBACK_TIMEOUT = 600

async def run_workflow(items: list[str]):
    async with httpx.AsyncClient() as client:
        for item in items:
            resp = await client.post(
                "https://inference.optexity.com/api/v1/inference",
                json={
                    "endpoint_name": "my-automation",
                    "input_parameters": {"email": [item]},
                    "unique_parameter_names": ["email"]
                },
            )
            task_id = resp.json()["task_id"]

            callbacks[task_id] = {"event": asyncio.Event(), "data": None}

            await asyncio.wait_for(
                callbacks[task_id]["event"].wait(),
                timeout=CALLBACK_TIMEOUT,
            )

            result = callbacks[task_id]["data"]
            print("Completed:", result)
            callbacks.pop(task_id, None)

@asynccontextmanager
async def lifespan(app: FastAPI):
    task = asyncio.create_task(run_workflow(["a@test.com", "b@test.com"]))
    yield
    task.cancel()

app = FastAPI(lifespan=lifespan)

@app.post("/receive_callback")
async def receive_callback(req: Request):
    payload = await req.json()
    task_id = payload.get("task_id")
    entry = callbacks.get(task_id)
    if entry:
        entry["data"] = payload
        entry["event"].set()
    return {"ok": True}

Dependencies

pip install fastapi uvicorn httpx asyncio

Running

uvicorn main:app --reload --port 4000

How It Works

ComponentPurpose
callbacks dictStores pending tasks with asyncio Events
run_workflowStarts tasks, waits for callbacks
/receive_callbackReceives Optexity notifications, signals completion

Flow

  1. Start task: POST to Optexity inference API
  2. Register callback: Create asyncio Event for task_id
  3. Wait: Event.wait() suspends until callback received
  4. Callback received: Event.set() wakes workflow
  5. Process: Use result, continue to next task

Benefits

BenefitDescription
No pollingTasks complete asynchronously
EfficientLightweight asyncio coordination
ScalableExtend for parallel tasks
ReliableTimeout handling included
Your callback URL must be publicly accessible from Optexity’s servers.