Prefect engine
The Prefect adaptor converts graphs into Prefect 3 flows so that you can execute them with Prefect’s task runner infrastructure while streaming provenance back to AiiDA.
Installation
Install the Prefect extra for Graph Engine. This pulls in prefect and the adaptor dependencies:

pip install node_graph_engine[prefect]

Start a local Prefect 3 server:

prefect server start

Leave this running while you create deployments or serve flows. The Prefect UI will be available at the printed URL (default http://127.0.0.1:4200). Alternatively, you can use Prefect Cloud instead of a local server.
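Before running flows, it can help to confirm that the server is reachable. The sketch below uses only the Python standard library and assumes the default address mentioned above together with the Prefect server's `/api/health` endpoint. The helper names `health_url` and `server_is_up` are illustrative and are not part of Prefect or Graph Engine.

```python
import urllib.error
import urllib.request


def health_url(base_url: str = "http://127.0.0.1:4200") -> str:
    """Build the health-check URL (assumed endpoint: /api/health)."""
    return base_url.rstrip("/") + "/api/health"


def server_is_up(base_url: str = "http://127.0.0.1:4200", timeout: float = 2.0) -> bool:
    """Return True if the server answers the health check with HTTP 200."""
    try:
        with urllib.request.urlopen(health_url(base_url), timeout=timeout) as response:
            return response.status == 200
    except (urllib.error.URLError, OSError):
        # Connection refused, timeout, DNS failure, and similar errors.
        return False
```

Calling `server_is_up()` returns `False` when nothing is listening on the default port, which is a quick hint that `prefect server start` is not running or that the server was started on a different address.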
Example
The example below mirrors the quick-start computation but runs it through Prefect. The engine returns resolved values; Prefect task futures are handled internally.
from aiida import load_profile
from node_graph import task
from node_graph_engine.engines.prefect import PrefectEngine
load_profile()
@task()
def add(x, y):
    return x + y

@task()
def multiply(x, y):
    return x * y

@task.graph()
def add_then_multiply(x, y, z):
    the_sum = add(x=x, y=y).result
    return multiply(x=the_sum, y=z).result
graph = add_then_multiply.build(x=1, y=2, z=3)
engine = PrefectEngine(flow_name="quick-start")
outputs = engine.run(graph)
print(outputs)
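As a sanity check, the same computation written in plain Python, without node_graph, Prefect, or AiiDA, gives the value that the graph's result should resolve to. The source does not show the exact structure of `outputs`, so this sketch only establishes the expected number. The `plain_` function names are illustrative.

```python
def plain_add(x, y):
    return x + y


def plain_multiply(x, y):
    return x * y


def plain_add_then_multiply(x, y, z):
    # Same wiring as the graph: the sum feeds the first argument of multiply.
    return plain_multiply(plain_add(x, y), z)


expected = plain_add_then_multiply(1, 2, 3)
print(expected)  # (1 + 2) * 3 = 9
```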
Inspect the Prefect UI for task-level details.
Use AiiDA commands to inspect the processes and their provenance:
verdi process list -a
This prints something like:
2199 3m ago NodeGraph<add_then_multiply> ⏹ Finished [0]
2200 3m ago add ⏹ Finished [0]
2202 3m ago multiply ⏹ Finished [0]
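If you want to check such a listing in a script, the rows can be parsed with a regular expression. This is a minimal sketch that assumes rows shaped exactly like the three lines above (PK, age, label, state symbol, state, exit code in brackets). Real `verdi process list` output also contains header and footer lines and may use other columns, so the pattern `ROW_PATTERN` and the helper names are illustrative only.

```python
import re

# One row of the listing above: PK, age, process label, state symbol, state, exit code.
ROW_PATTERN = re.compile(
    r"^\s*(?P<pk>\d+)\s+(?P<age>\S+\s+ago)\s+(?P<label>\S+)\s+\S+\s+"
    r"(?P<state>\w+)\s+\[(?P<exit_code>\d+)\]\s*$"
)


def parse_process_rows(text):
    """Return one dict per line that matches ROW_PATTERN; other lines are skipped."""
    rows = []
    for line in text.splitlines():
        match = ROW_PATTERN.match(line)
        if match:
            rows.append({
                "pk": int(match["pk"]),
                "label": match["label"],
                "state": match["state"],
                "exit_code": int(match["exit_code"]),
            })
    return rows


def all_finished_ok(rows):
    """True if there is at least one row and every row is Finished with exit code 0."""
    return bool(rows) and all(
        row["state"] == "Finished" and row["exit_code"] == 0 for row in rows
    )


sample = """\
2199 3m ago NodeGraph<add_then_multiply> ⏹ Finished [0]
2200 3m ago add ⏹ Finished [0]
2202 3m ago multiply ⏹ Finished [0]
"""
rows = parse_process_rows(sample)
print([row["pk"] for row in rows], all_finished_ok(rows))
```

The graph process and each of its tasks appear as separate rows, so a check like `all_finished_ok` covers the workflow and its steps together.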
Then generate a provenance graph for a workflow:
verdi node graph generate 2199
Here is the resulting graph:
Serving to a work pool (remote workers)
PrefectEngine can also be used inside a servable Prefect 3 flow that you deploy to a work pool, so that remote workers execute your graph while AiiDA provenance is captured in the same way as for a local run. Create a work pool to which workers can connect:
prefect work-pool create my-process-pool --type process
Start a worker on the remote machine (with the same Python environment and AiiDA profile available) to pick up submitted runs:
prefect worker start --pool my-process-pool
Each scheduled run reconstructs the graph on the worker, executes the tasks through Prefect, and records AiiDA provenance in the same way as a local run, regardless of where the worker runs.
Save the following as, e.g., flow.py in a Git repository:

from aiida import load_profile
from prefect import flow
from node_graph_engine.engines.prefect import PrefectEngine

load_profile()

# add_then_multiply is the graph task from the example above; define it in
# this file or import it from a module available in the repository.
graph = add_then_multiply.build(x=1, y=2, z=3)
engine = PrefectEngine(flow_name="ng-eos")

@flow(name="add-multiply-deployment")
def run_ng():
    return engine.run(graph)
And then deploy it from your local machine:
from prefect import flow

if __name__ == "__main__":
    flow.from_source(
        source="https://github.com/superstar54/test-prefect.git",
        entrypoint="flow.py:run_ng",
    ).deploy(
        name="run-ng-from-thinkpad",
        work_pool_name="my-process-pool",
    )
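Once the deployment exists and a worker is polling the pool, a run can be triggered by referring to the deployment as `<flow name>/<deployment name>`. The sketch below builds that identifier from the names used above and passes it to Prefect's `run_deployment`. The helper names `deployment_identifier` and `trigger_run` are illustrative, and Prefect is imported lazily inside `trigger_run` so that the rest of the snippet loads without it.

```python
def deployment_identifier(flow_name: str, deployment_name: str) -> str:
    """Prefect addresses a deployment as '<flow name>/<deployment name>'."""
    return f"{flow_name}/{deployment_name}"


# Names taken from the @flow decorator and the deploy() call above.
IDENTIFIER = deployment_identifier("add-multiply-deployment", "run-ng-from-thinkpad")


def trigger_run(identifier: str = IDENTIFIER):
    """Submit a run of the deployment; needs a reachable Prefect API and a running worker."""
    from prefect.deployments import run_deployment  # lazy import, see lead-in

    # By default run_deployment waits until the flow run reaches a final state.
    return run_deployment(name=identifier)


print(IDENTIFIER)  # add-multiply-deployment/run-ng-from-thinkpad
```

The same identifier works on the command line with `prefect deployment run 'add-multiply-deployment/run-ng-from-thinkpad'`.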
Note
Ensure that the remote environment has access to the same AiiDA profile and database as the local machine to maintain consistent provenance tracking.