Dynamic Jobs: Iterative Workflows with spawn_jobs
This tutorial teaches you how to build a torc workflow whose iteration count is decided at
runtime — for example a numerical refinement that keeps going until it converges. The shape can't
be expressed as a static DAG, so torc gives you one primitive to extend the workflow as it runs:
spawn_jobs.
Learning Objectives
By the end of this tutorial, you will:
- Recognize when a static DAG can't express your workflow
- Write a lightweight orchestrator job that adds the next iteration at runtime
- Build a converging loop and watch it stop on its own
- Run multiple independent iterations concurrently in one workflow
- Know how to cap a runaway loop
Prerequisites
- Completed Tutorial 2: Diamond Workflow
- Torc server running (see Quick Start (Local))
- The Python client installed:
  pip install torc-client
- jq and awk available on PATH (used by the demo worker)
- Roughly 15 minutes
The Problem a Static DAG Can't Express
Most torc workflows are a fixed DAG declared up front: "run these N jobs in this order." That works when the full set of jobs is known before the workflow starts.
Suppose you have a simulation that:
- Refines its estimate each iteration
- Converges when the change between iterations drops below a tolerance
- Might take 3 iterations or 30 — it depends on the data
Three iterations of (worker → check → worker) you could write by hand. Thirty you can't, because you don't know it's thirty until you've already started.
A Naive Workaround That Doesn't Work
Pre-allocate the maximum and cancel the rest. Declare 30 worker jobs in a chain and have the converging iteration cancel the remaining 27.
This is wasteful (the unused jobs still occupy slots in the dependency graph), fragile (cancellation has to live in your scripts), and pushes discovery into user code. For 10 concurrent runs of this pattern you'd be staring at 300 jobs that mostly won't run, and a debugging story that gets worse with every retry.
There's a better primitive.
The Spawn Pattern
A lightweight orchestrator job does three things:
- Inspects the previous iteration's output (cheap: it just reads a file or a small user_data record)
- Calls spawn_jobs to add the next worker plus a continuation of itself, both blocked on this orchestrator
- Exits 0
The torc runner completes the orchestrator on exit (the normal path). The spawned jobs were inserted
blocked with an implicit dependency edge on the calling orchestrator, so the unblock cascade
promotes them as soon as the orchestrator becomes terminal. The next orchestrator generation then
does the same thing. Convergence is "spawn nothing." When the orchestrator decides it's done, it
calls spawn_jobs with jobs=[] (optionally with a final state payload) and exits. With no further
jobs in the queue, the workflow finishes naturally.
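The control flow above can be imitated with a toy in-memory model. This is only a sketch: ToyJob, ToyWorkflow, and make_loop are hypothetical names invented for illustration, not part of torc, and the scheduler is a simple loop rather than torc's runner. It shows the implicit edge to the caller, the unblock cascade, and "spawn nothing" as the stopping condition.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Set


@dataclass
class ToyJob:
    name: str
    action: Callable[["ToyWorkflow", "ToyJob"], None]
    depends_on: Set[str] = field(default_factory=set)
    done: bool = False


class ToyWorkflow:
    """In-memory stand-in for a workflow; NOT torc's implementation."""

    def __init__(self) -> None:
        self.jobs: Dict[str, ToyJob] = {}
        self.history: List[str] = []

    def add(self, job: ToyJob) -> None:
        self.jobs[job.name] = job

    def spawn(self, caller: ToyJob, jobs: List[ToyJob]) -> None:
        # Mirror the implicit edge: every spawned job waits on the caller.
        for job in jobs:
            job.depends_on.add(caller.name)
            self.add(job)

    def run(self) -> None:
        # Keep running any job whose dependencies are all done.
        while True:
            ready = [j for j in self.jobs.values()
                     if not j.done and all(self.jobs[d].done for d in j.depends_on)]
            if not ready:
                return  # nothing pending: the workflow finishes naturally
            job = ready[0]
            job.action(self, job)
            job.done = True  # the job is completed when its action returns
            self.history.append(job.name)


def make_loop(tolerance: float = 0.01):
    metrics = {0: 1.0}  # generation -> metric; generation 0 is the starting value

    def worker(gen: int):
        def action(wf: ToyWorkflow, job: ToyJob) -> None:
            metrics[gen] = metrics[gen - 1] / 2
        return action

    def orchestrator(gen: int):
        def action(wf: ToyWorkflow, job: ToyJob) -> None:
            if gen > 0 and metrics[gen] < tolerance:
                return  # convergence: spawn nothing
            nxt = gen + 1
            w = ToyJob(f"worker_i{nxt:02d}", worker(nxt))
            o = ToyJob(f"orch_g{nxt:02d}", orchestrator(nxt), depends_on={w.name})
            wf.spawn(job, [w, o])
        return action

    return orchestrator, metrics


if __name__ == "__main__":
    orchestrator, metrics = make_loop()
    wf = ToyWorkflow()
    wf.add(ToyJob("orch_g00", orchestrator(0)))  # the only job declared up front
    wf.run()
    print(" -> ".join(wf.history))
```

Only the seed is declared before run() is called; every other job enters the graph through spawn(), which is the property the real primitive provides.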
What spawn_jobs Does in One Call
In one database transaction, the server:
- Inserts each requested job as Blocked
- Adds an implicit job_depends_on edge to the calling orchestrator (you don't list it yourself)
- Resolves explicit depends_on against existing jobs and against siblings in the same batch
- Persists an opaque JSON state payload as an immutable per-generation record
If anything in the batch is invalid — unknown resource_requirements, a dependency cycle, the cap
exceeded — the whole transaction aborts and nothing is persisted.
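The all-or-nothing behavior can be sketched with a staged copy of the dependency graph that is only returned when every check passes. This is an illustrative model, not the server's code: apply_spawn and SpawnRejected are hypothetical names, the graph is a plain dict, and the sketch leaves out resource_requirements lookup, the iteration cap, and the state payload.

```python
from typing import Dict, List, Set


class SpawnRejected(Exception):
    """Raised when a batch is invalid; the caller's graph is left untouched."""


def apply_spawn(graph: Dict[str, Set[str]], caller: str,
                batch: Dict[str, List[str]]) -> Dict[str, Set[str]]:
    """Return a new graph with `batch` added, or raise without changing `graph`.

    graph maps job name -> set of names it depends on.
    batch maps new job name -> explicit depends_on list.
    """
    staged = {name: set(deps) for name, deps in graph.items()}  # work on a copy
    for name, deps in batch.items():
        for dep in deps:
            # Explicit deps may name existing jobs or siblings in this batch.
            if dep not in graph and dep not in batch:
                raise SpawnRejected(f"unknown dependency {dep!r} for {name!r}")
        staged[name] = set(deps) | {caller}  # implicit edge to the caller

    # Reject cycles reachable from the new jobs with a depth-first search.
    visiting: Set[str] = set()
    finished: Set[str] = set()

    def visit(node: str) -> None:
        if node in finished:
            return
        if node in visiting:
            raise SpawnRejected(f"dependency cycle through {node!r}")
        visiting.add(node)
        for dep in staged[node]:
            visit(dep)
        visiting.discard(node)
        finished.add(node)

    for name in batch:
        visit(name)
    return staged  # "commit": only reached when every check passed


if __name__ == "__main__":
    graph = {"orch_caseA_g00": set()}
    graph = apply_spawn(graph, "orch_caseA_g00", {
        "worker_caseA_i01": [],
        "orch_caseA_g01": ["worker_caseA_i01"],
    })
    print(sorted(graph["orch_caseA_g01"]))
```

Because validation runs against the copy, a rejected batch never leaves a half-inserted job behind, which is the same guarantee the single database transaction gives.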
Step 1: Write the Worker
The "real work" of each iteration. For this tutorial we'll simulate convergence: each iteration's metric is half the previous one, so it crosses our tolerance after 7 generations.
Save as worker.sh:
#!/usr/bin/env bash
set -euo pipefail
lineage="$1"
gen="$2"
work_dir="./work/$lineage"
mkdir -p "$work_dir"
prev_file="$work_dir/worker_i$(printf '%02d' $((10#$gen - 1))).json"
prior_metric=$(jq -r '.metric' "$prev_file" 2>/dev/null || echo "1.0")
new_metric=$(awk "BEGIN { print $prior_metric / 2 }")
out_file="$work_dir/worker_i$(printf '%02d' "$gen").json"
printf '{"metric": %s}\n' "$new_metric" > "$out_file"
echo "iter $gen: $prior_metric -> $new_metric" >&2
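As a quick check of the "7 generations" figure, the halving rule can be replayed in a few lines of Python. This is a sketch that assumes the same starting value (1.0) and the tolerance used later in this tutorial (0.01); generations_to_converge is a hypothetical helper, not part of the demo scripts.

```python
def generations_to_converge(initial: float = 1.0, tolerance: float = 0.01) -> int:
    """Count how many halvings it takes for the metric to drop below tolerance."""
    metric = initial
    generations = 0
    while metric >= tolerance:
        metric /= 2  # same update rule as worker.sh
        generations += 1
    return generations


if __name__ == "__main__":
    # 1.0 / 2**6 = 0.015625 is still above 0.01; 1.0 / 2**7 = 0.0078125 is below it.
    print(generations_to_converge())
```

With the defaults the function returns 7, matching the iteration count the tutorial expects.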
Step 2: Write the Orchestrator
The orchestrator reads the prior worker's output, decides whether to continue, and either spawns the next iteration or stops.
Save as orchestrator.py:
#!/usr/bin/env python3
import json, os, sys
from pathlib import Path
from torc import Orchestrator, SpawnJobModel
CONVERGENCE_TOLERANCE = 0.01
# `Orchestrator.from_env()` reads TORC_API_URL / TORC_WORKFLOW_ID / TORC_JOB_ID
# from the env and resolves the lineage from TORC_ORCHESTRATOR_LINEAGE_ID
# (set by torc on every spawned continuation), falling back to `sys.argv[1]`
# on the seed invocation.
orch = Orchestrator.from_env(lineage_fallback=sys.argv[1] if len(sys.argv) > 1 else None)
current_gen = orch.generation # 0 on the seed, derived from torc's lineage records
next_gen = current_gen + 1
# Read prior worker's metric (None on the seed generation).
prior_file = Path(f"./work/{orch.lineage}/worker_i{current_gen:02d}.json")
prior_metric = json.loads(prior_file.read_text())["metric"] if prior_file.exists() else None
print(f"[orch {orch.lineage}] current_gen={current_gen} prior_metric={prior_metric}", file=sys.stderr)
# Convergence: spawn nothing, write a final state record, exit.
if prior_metric is not None and prior_metric < CONVERGENCE_TOLERANCE:
    orch.converge(state={
        "converged": True, "final_metric": prior_metric, "iterations": current_gen,
    })
    print(f"[orch {orch.lineage}] converged at gen={current_gen}", file=sys.stderr)
    sys.exit(0)
# Otherwise spawn the next worker + the next orchestrator continuation.
worker = f"worker_{orch.lineage}_i{next_gen:02d}"
cont = f"orch_{orch.lineage}_g{next_gen:02d}"
orch.spawn(
    jobs=[
        SpawnJobModel(
            name=worker,
            command=f"bash {os.path.abspath('worker.sh')} {orch.lineage} {next_gen}",
            resource_requirements="worker_rr",
        ),
        SpawnJobModel(
            name=cont,
            command=f"python3 {os.path.abspath('orchestrator.py')}",
            resource_requirements="orch_rr",
            depends_on=[worker],
            cancel_on_blocking_job_failure=False,
        ),
    ],
    state={"generation": next_gen, "prior_metric": prior_metric},
)
print(f"[orch {orch.lineage}] spawned gen={next_gen}: {worker} -> {cont}", file=sys.stderr)
Notice what the orchestrator does not do:
- It does not list itself in any spawned job's depends_on. The server adds that edge.
- It does not need to know how torc encodes per-lineage iteration state. Orchestrator.generation walks the __torc_lineage__<lineage>__g###### user_data records on its behalf.
Step 3: Write the Workflow Spec
Save as iterative.yaml:
name: iterative_refinement
description: Iterative loop that converges via spawn_jobs
dynamic_jobs:
  max_iterations: 20 # safety cap; 7 are expected
resource_requirements:
  - name: worker_rr
    num_cpus: 1
    memory: 256m
  - name: orch_rr
    num_cpus: 1
    memory: 128m
jobs:
  - name: orch_caseA_g00
    command: python3 orchestrator.py caseA
    resource_requirements: orch_rr
The workflow declares one job — the seed orchestrator. Every worker and every continuation is
added at runtime by spawn_jobs.
Step 4: Run It
torc run iterative.yaml
The seed orchestrator runs, sees no prior worker, and spawns worker_caseA_i01 plus
orch_caseA_g01. The runner completes the seed; the unblock cascade promotes the worker; the worker
writes its metric; the next orchestrator unblocks, reads the metric, and spawns the next pair. This
repeats until the metric drops below 0.01 (around generation 7).
Step 5: Watch It Happen
While the workflow runs, list jobs at any time to watch new ones appear. Initially the table shows
only the seed (orch_caseA_g00); after a few seconds worker_caseA_i01 and orch_caseA_g01 join
it; and so on until convergence at generation 7.
torc jobs list <workflow_id>
The default table prints ID | Name | Status | Priority | Command — useful for tracking progress
but not for telling spec-declared jobs apart from spawned ones. For that, dump JSON and inspect each
job's origin field ("spawn" for jobs added by spawn_jobs, absent for jobs declared in the
workflow spec):
torc -f json jobs list <workflow_id> \
| jq -r '.items[] | [.name, .status, (.origin // "(declared)")] | @tsv'
orch_caseA_g00 completed (declared)
worker_caseA_i01 completed spawn
orch_caseA_g01 completed spawn
worker_caseA_i02 completed spawn
orch_caseA_g02 completed spawn
...
worker_caseA_i07 completed spawn
orch_caseA_g07 completed spawn
Things to notice:
- The seed orchestrator has no origin set (declared at workflow creation, so the field is omitted from the JSON). Every other job has origin = "spawn".
- The orchestrators in the middle never produced output of their own; they just inspected the prior metric and re-spawned.
- The last orchestrator generation called spawn_jobs with jobs=[] and the workflow finished naturally with no leftover Ready or Blocked jobs.
To inspect the final state record:
torc user-data list <workflow_id> | grep __torc_lineage__caseA__final
It holds {"converged": true, "final_metric": ..., "iterations": 7} — the payload your orchestrator
sent on the converging call.
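If jq is not at hand, the same declared-versus-spawned summary can be produced in Python. This is a sketch that assumes only the JSON shape implied by the jq filter earlier in this step (a top-level items array whose entries carry name, status, and an optional origin); summarize_jobs is a hypothetical helper and the sample payload is made up for illustration.

```python
import json
from typing import List, Tuple


def summarize_jobs(payload: str) -> List[Tuple[str, str, str]]:
    """Return (name, status, origin) rows from a `jobs list` JSON document."""
    rows = []
    for item in json.loads(payload)["items"]:
        # `origin` is omitted for spec-declared jobs, so default it.
        origin = item.get("origin") or "(declared)"
        rows.append((item["name"], item["status"], origin))
    return rows


if __name__ == "__main__":
    # Illustrative sample only; real input would come from
    # `torc -f json jobs list <workflow_id>`.
    sample = json.dumps({"items": [
        {"name": "orch_caseA_g00", "status": "completed"},
        {"name": "worker_caseA_i01", "status": "completed", "origin": "spawn"},
    ]})
    for row in summarize_jobs(sample):
        print("\t".join(row))
```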
Step 6: Run Multiple Concurrent Lineages
What if you want 10 independent runs of this same loop? Add 10 seed orchestrators:
jobs:
  - name: orch_caseA_g00
    command: python3 orchestrator.py caseA
    resource_requirements: orch_rr
  - name: orch_caseB_g00
    command: python3 orchestrator.py caseB
    resource_requirements: orch_rr
  # ... add caseC ... caseJ ...
Each runs its own independent lineage. Their per-lineage state records (__torc_lineage__caseA__*,
__torc_lineage__caseB__*, …) and iteration counters do not interact. Torc's resource packer
interleaves all of their workers across compute nodes; the orchestrators are cheap and don't crowd
anything.
The max_iterations: 20 cap is per lineage, not per workflow. Case A iterating 20 times has no
effect on Case B's allowance.
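Writing ten near-identical seed entries by hand is error-prone, so one option is to generate the jobs section. The sketch below uses only the standard library and reuses the seed naming and command from this tutorial; seed_jobs_yaml is a hypothetical helper, and you would still paste or merge its output into your own spec.

```python
import string
from typing import List


def seed_jobs_yaml(cases: List[str]) -> str:
    """Render the `jobs:` section with one seed orchestrator per case."""
    lines = ["jobs:"]
    for case in cases:
        lines += [
            f"  - name: orch_{case}_g00",
            f"    command: python3 orchestrator.py {case}",
            "    resource_requirements: orch_rr",
        ]
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    # caseA .. caseJ, one independent lineage each
    cases = [f"case{letter}" for letter in string.ascii_uppercase[:10]]
    print(seed_jobs_yaml(cases), end="")
```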
Safety: the Iteration Cap
dynamic_jobs.max_iterations is a runaway guard. The first attempted iteration that would push a
lineage past the cap is rejected with HTTP 422:
Per-lineage iteration cap reached for lineage 'caseA':
attempted iteration 21 exceeds dynamic_jobs.max_iterations=20 (current spawn_count=20)
When the cap rejects a call, nothing is persisted, the orchestrator stays Running, and you can
investigate or raise the cap on the next run. Omit the field to use the server default (1000).
The server also validates the cap at workflow-creation time: max_iterations: 0 or negative values
are rejected up front with 422 — they would silently disable spawning otherwise.
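The per-lineage accounting can be pictured with a small counter model. This is a sketch, not the server's logic: IterationCap and CapExceeded are hypothetical names, the exception stands in for the HTTP 422 response, and the model simply counts every recorded spawn call per lineage without settling which calls the real server counts.

```python
from collections import defaultdict
from typing import DefaultDict


class CapExceeded(Exception):
    """Stands in for the HTTP 422 rejection described above."""


class IterationCap:
    def __init__(self, max_iterations: int = 1000) -> None:
        if max_iterations <= 0:
            # Mirrors the creation-time validation: zero or negative is rejected.
            raise ValueError("max_iterations must be a positive integer")
        self.max_iterations = max_iterations
        self.spawn_count: DefaultDict[str, int] = defaultdict(int)

    def record_spawn(self, lineage: str) -> int:
        """Count one iteration for `lineage`, or raise without changing state."""
        current = self.spawn_count[lineage]
        attempted = current + 1
        if attempted > self.max_iterations:
            raise CapExceeded(
                f"Per-lineage iteration cap reached for lineage '{lineage}': "
                f"attempted iteration {attempted} exceeds "
                f"dynamic_jobs.max_iterations={self.max_iterations} "
                f"(current spawn_count={current})"
            )
        self.spawn_count[lineage] = attempted
        return attempted


if __name__ == "__main__":
    cap = IterationCap(max_iterations=20)
    for _ in range(20):
        cap.record_spawn("caseA")
    print(cap.record_spawn("caseB"))  # caseB has its own allowance
    try:
        cap.record_spawn("caseA")
    except CapExceeded as err:
        print(err)
```

Keeping one counter per lineage key is what makes Case A's twentieth iteration irrelevant to Case B.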
What About Slurm?
A spawned job is a normal job from Slurm's perspective: it sits Ready until a compute node with
matching resources claims it. If the workflow was submitted with an allocation sized only for the
seed orchestrators, a spawned worker may have no fitting node.
The fix is the same as for failure-handler retries — run torc watch alongside the workflow with
--auto-schedule:
torc watch --auto-schedule <workflow_id>
The watch loop counts ready jobs with origin IS NOT NULL (spawned + retried) and asks
regenerate_and_submit to mint a Slurm allocation sized for the currently-pending resource shapes
when needed.
Reference
Request Body
{
  "lineage": "caseA",
  "jobs": [
    {
      "name": "worker_caseA_i01",
      "command": "bash worker.sh caseA 1",
      "resource_requirements": "worker_rr"
    },
    {
      "name": "orch_caseA_g01",
      "command": "python3 orchestrator.py",
      "resource_requirements": "orch_rr",
      "depends_on": ["worker_caseA_i01"],
      "cancel_on_blocking_job_failure": false
    }
  ],
  "state": { "generation": 1 }
}
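For callers that assemble this body themselves, the structure can be built as a plain dictionary and serialized with the standard json module. This is a sketch of the payload only: build_spawn_body and build_converge_body are hypothetical helpers, the field names are taken from the body shown here, and how the body is actually sent is left out because this tutorial does not specify the endpoint.

```python
import json
from typing import Any, Dict


def build_spawn_body(lineage: str, next_gen: int) -> Dict[str, Any]:
    """Build a spawn request for the next worker plus the next orchestrator."""
    worker = f"worker_{lineage}_i{next_gen:02d}"
    cont = f"orch_{lineage}_g{next_gen:02d}"
    return {
        "lineage": lineage,
        "jobs": [
            {
                "name": worker,
                "command": f"bash worker.sh {lineage} {next_gen}",
                "resource_requirements": "worker_rr",
            },
            {
                "name": cont,
                "command": "python3 orchestrator.py",
                "resource_requirements": "orch_rr",
                "depends_on": [worker],  # sibling in the same batch
                "cancel_on_blocking_job_failure": False,
            },
        ],
        "state": {"generation": next_gen},
    }


def build_converge_body(lineage: str, state: Dict[str, Any]) -> Dict[str, Any]:
    """Convergence is an empty jobs list, optionally with a final state."""
    return {"lineage": lineage, "jobs": [], "state": state}


if __name__ == "__main__":
    print(json.dumps(build_spawn_body("caseA", 1), indent=2))
```

Note that the calling orchestrator never appears in depends_on; per the rules in this reference, that edge is added on the server side.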
Rules
| Rule | Why |
|---|---|
| Every spawned job is auto-blocked on the calling orchestrator | Without this, children would race the runner's completion of the caller; you'd have to coordinate it yourself |
| depends_on may reference existing jobs or siblings in the same batch | Lets one spawn call describe a small DAG (worker → next-orchestrator) |
| The sibling subgraph must be acyclic | Rejected 422; the same dependency rule as static jobs |
| resource_requirements must name a record declared at workflow creation | Falls back to the workflow's default RR if omitted, same as create_job |
| Replays (same names already exist) are idempotent no-ops | Lets you safely re-run an orchestrator that crashed after committing its spawn: no duplicate jobs, no double-counted iteration |
| Convergence is jobs=[], optionally with a final state payload | No separate "stop" endpoint; the workflow finishes naturally once nothing else is pending |
| job.origin is 'spawn' for spawned jobs, 'retry' for retries, NULL for declared | Lets torc watch --auto-schedule recognize jobs that need an unplanned Slurm allocation; lets reports separate static plan from dynamic add |
What Stays the Same as Static Jobs
- Status lifecycle (Blocked → Ready → Running → Completed/Failed)
- Resource claiming and packing
- Restart and reset semantics (a row created by spawn_jobs resets the same way a declared job does)
- File and user_data dependencies
- Failure handlers and cancel_on_blocking_job_failure
Next Steps
- The energy-modeling example under examples/ is the same pattern applied to a ReEDS↔PRAS feedback loop. The orchestrator script is examples/scripts/dynamic_orchestrator.py; two yaml variants share it:
  - examples/yaml/dynamic_orchestrator_local.yaml: laptop-scale resources, runs under torc run.
  - examples/yaml/dynamic_orchestrator_slurm.yaml: realistic HPC resource shapes (ReEDS at 8 CPU / 10 GB, PRAS at 32 CPU / 120 GB), submitted with torc submit.