Celery engine
The Celery engine dispatches every task in a graph as an individual Celery task while preserving provenance in the active AiiDA profile. Use it when you already operate a Celery deployment and want to scale workers independently of the workflow orchestrator.
Installation
Install the engine extra and make sure an AiiDA profile is available. The extra pulls in
Celery, kombu, and the redis client so the adaptor can register and submit tasks using the
Redis broker referenced throughout this guide:
pip install "node_graph_engine[celery]"
If you plan to use a different transport (for example RabbitMQ), install the matching client library for that broker instead of Redis.
Install and start a Redis server so the broker and result backend URLs below resolve to a
running service. The exact command depends on your platform (for example
sudo apt install redis-server on Debian-based Linux or brew install redis on
macOS) and you may need to enable the service after installation.
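Before pointing the engine at Redis, it can help to confirm that the server actually answers. The sketch below is one way to do that using only the Python standard library: it opens a TCP connection and sends an inline PING, which a Redis server without authentication answers with +PONG. The helper name redis_is_reachable and its defaults are illustrative and not part of node_graph_engine; a server that requires a password replies with an error and is reported as unreachable here.

```python
import socket


def redis_is_reachable(host="localhost", port=6379, timeout=1.0):
    """Return True if a Redis server at host:port answers PING with +PONG."""
    try:
        with socket.create_connection((host, port), timeout=timeout) as conn:
            conn.sendall(b"PING\r\n")  # inline command form of the Redis protocol
            reply = conn.recv(64)
    except OSError:
        # Connection refused, timed out, or host name could not be resolved.
        return False
    return reply.startswith(b"+PONG")


if __name__ == "__main__":
    print("Redis reachable:", redis_is_reachable())
```

If the check prints False, start or enable the Redis service before launching a worker.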
Create (or reuse) an AiiDA profile for provenance storage before running any graphs. The verdi presto helper remains the fastest way to bootstrap a local profile.
Configuration
By default the engine creates a lightweight Celery application that uses the
memory:// broker and rpc:// result backend. This configuration keeps execution
local to the current process and puts Celery into eager mode, so graphs complete without
an external worker. Eager execution is useful for development and unit tests, but tasks
then run one at a time inside the current Python interpreter.
Provide a broker URL, result backend, or a pre-configured Celery
instance to connect to an existing deployment:
from node_graph_engine.engines.celery import CeleryEngine

engine = CeleryEngine(
    "materials-queue",
    broker_url="redis://localhost:6379/0",
    backend_url="redis://localhost:6379/1",
)
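The two Redis URLs above differ only in the trailing path segment, which selects the Redis database index: 0 for the broker and 1 for the result backend. As a small illustration using only the standard library (the describe_redis_url helper is hypothetical, not an engine API), the parts of such a URL can be pulled apart like this:

```python
from urllib.parse import urlsplit


def describe_redis_url(url):
    """Split a redis:// URL into host, port, and database index."""
    parts = urlsplit(url)
    if parts.scheme != "redis":
        raise ValueError(f"expected a redis:// URL, got {url!r}")
    db = parts.path.lstrip("/")
    return {
        "host": parts.hostname,
        "port": parts.port or 6379,  # 6379 is the default Redis port
        "db": int(db) if db else 0,  # a missing path means database 0
    }


broker = describe_redis_url("redis://localhost:6379/0")
backend = describe_redis_url("redis://localhost:6379/1")
print(broker)   # {'host': 'localhost', 'port': 6379, 'db': 0}
print(backend)  # {'host': 'localhost', 'port': 6379, 'db': 1}
```

Keeping the broker and the result backend in separate databases makes it easier to inspect or flush one without touching the other.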
Note
When using RabbitMQ as the broker, a typical URL looks like this:
broker_url="amqp://guest:guest@127.0.0.1:5672"
When the broker is anything other than memory:// the engine leaves eager mode
disabled so tasks are published to the external queue. You can still override the
behaviour explicitly through the always_eager keyword argument if you want to force a
particular execution model.
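The rule described above can be summarized in a few lines. This is only a sketch of the documented behaviour, not the engine's actual implementation: the function name resolve_always_eager is made up for illustration, and treating None as "not specified" is an assumption.

```python
def resolve_always_eager(broker_url="memory://", always_eager=None):
    """Mirror the documented rule: eager only for the in-memory broker,
    unless the caller passes an explicit always_eager value."""
    if always_eager is not None:
        return always_eager  # an explicit override always wins
    return broker_url.startswith("memory://")


print(resolve_always_eager())                                    # True
print(resolve_always_eager("redis://localhost:6379/0"))          # False
print(resolve_always_eager("redis://localhost:6379/0", True))    # True
```

Forcing eager mode against a real broker URL can be handy when debugging a single graph locally without starting a worker.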
Running workers
Parallel execution requires at least one Celery worker that subscribes to the same broker
URL as the engine. The snippet below creates a small celery_worker.py module that
instantiates the engine, exposes the underlying application, and can be launched with the
standard celery CLI:
# celery_worker.py
from node_graph_engine.engines.celery import CeleryEngine

engine = CeleryEngine(
    "celery-flow",
    broker_url="redis://localhost:6379/0",
    backend_url="redis://localhost:6379/1",
)

# Expose the Celery application so the celery CLI can find it.
celery_app = engine.celery_app
Start a worker from a shell in the same environment:
celery -A celery_worker.celery_app worker --loglevel=INFO
With the worker running, launch your workflow from another terminal. Each task is dispatched to the external queue and processed by the worker, so tasks that do not depend on each other can execute concurrently.
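Which tasks can overlap is determined by the data dependencies in the graph: a task becomes eligible once all of its inputs are available. The sketch below illustrates that idea with the standard library's graphlib module (Python 3.9 or newer). It is not the engine's scheduler, and the task names are hypothetical.

```python
from graphlib import TopologicalSorter


def concurrent_batches(dependencies):
    """Group tasks into batches whose members do not depend on each other.

    `dependencies` maps each task name to the set of tasks it waits for.
    """
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # tasks whose inputs are all done
        batches.append(ready)
        sorter.done(*ready)  # mark the batch finished to unlock successors
    return batches


# Two independent additions feed a single multiplication.
deps = {"add_a": set(), "add_b": set(), "multiply": {"add_a", "add_b"}}
batches = concurrent_batches(deps)
print(batches)  # [['add_a', 'add_b'], ['multiply']]
```

In this picture, add_a and add_b could be picked up by workers at the same time, while multiply has to wait for both results.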
Example
The example below mirrors the quick-start workflow. Leave always_eager at its default
value so work is routed to the broker configured above:
from aiida import load_profile
from node_graph import task
from node_graph_engine.engines.celery import CeleryEngine

load_profile()


@task()
def add(x, y):
    return x + y


@task()
def multiply(x, y):
    return x * y


@task.graph()
def add_then_multiply(x, y, z):
    the_sum = add(x=x, y=y).result
    return multiply(x=the_sum, y=z).result


graph = add_then_multiply.build(x=1, y=2, z=3)

engine = CeleryEngine(
    "celery-flow",
    broker_url="redis://localhost:6379/0",
    backend_url="redis://localhost:6379/1",
)
outputs = engine.run(graph)
print(outputs)
The engine records all task executions in AiiDA and reuses any Celery workers that are already subscribed to the configured broker. Use Celery’s standard monitoring tools (such as celery -A celery_worker.celery_app inspect active, or Flower) to observe queued and running tasks.