Dynamic workflows in code
Workflow as code
Conductor supports a code-first workflow approach — build workflows programmatically using the Python SDK instead of writing JSON by hand. This workflow as code pattern lets you chain tasks with the >> operator, add conditional logic, loops, and parallel branches — all in Python. Code-first workflows are ideal for dynamic workflows where the task graph is determined at runtime.
Simple sequential workflow
Chain tasks with the >> operator. Worker functions decorated with @worker_task become reusable task building blocks.
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.worker.worker_task import worker_task
@worker_task(task_definition_name='fetch_order')
def fetch_order(order_id: str) -> dict:
    """Look up an order by id. Returns canned demo data."""
    return dict(order_id=order_id, amount=99.99, item='Widget')
@worker_task(task_definition_name='process_payment')
def process_payment(order_id: str, amount: float) -> dict:
    """Charge `amount` for an order. Returns a canned demo transaction."""
    result = {'transaction_id': 'txn_abc123'}
    result['status'] = 'charged'
    return result
@worker_task(task_definition_name='ship_order')
def ship_order(order_id: str, transaction_id: str) -> dict:
    """Create a shipment for a charged order. Returns canned demo tracking info."""
    return dict(tracking='TRACK-456', carrier='FedEx')
# Assemble the order-fulfillment workflow: fetch -> pay -> ship.
workflow = ConductorWorkflow(name='order_fulfillment', version=1, executor=executor)

# Calling a decorated worker here builds a task reference, it does not run the
# worker; .output(...) wires one task's output into a downstream task's input.
fetch_step = fetch_order(task_ref_name='fetch', order_id=workflow.input('order_id'))
pay_step = process_payment(
    task_ref_name='pay',
    order_id=workflow.input('order_id'),
    amount=fetch_step.output('amount'),
)
ship_step = ship_order(
    task_ref_name='ship',
    order_id=workflow.input('order_id'),
    transaction_id=pay_step.output('transaction_id'),
)

# >> chains the tasks to run sequentially.
workflow >> fetch_step >> pay_step >> ship_step

# Surface selected task outputs as the workflow's own output.
workflow.output_parameters({
    'tracking': ship_step.output('tracking'),
    'transaction_id': pay_step.output('transaction_id'),
})
workflow.register(overwrite=True)
Conditional branching with Switch
Route execution based on task output or workflow input. Each case gets its own task chain.
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.task.switch_task import SwitchTask
# Route a ticket down a different task chain depending on classified priority.
workflow = ConductorWorkflow(name='route_by_priority', version=1, executor=executor)

classify_step = classify_ticket(
    task_ref_name='classify',
    description=workflow.input('description'),
)

# The switch evaluates the classifier's 'priority' output to pick a branch.
router = SwitchTask(task_ref_name='priority_router', case_expression=classify_step.output('priority'))

# Each branch is a list of tasks executed in order for that case value.
critical_chain = [
    page_oncall(task_ref_name='page', ticket_id=workflow.input('ticket_id')),
    escalate(task_ref_name='escalate', ticket_id=workflow.input('ticket_id')),
]
high_chain = [
    assign_senior(task_ref_name='assign', ticket_id=workflow.input('ticket_id')),
]
fallback_chain = [
    add_to_backlog(task_ref_name='backlog', ticket_id=workflow.input('ticket_id')),
]

router.switch_case('critical', critical_chain)
router.switch_case('high', high_chain)
router.default_case(fallback_chain)

workflow >> classify_step >> router
workflow.register(overwrite=True)
Parallel execution with Fork/Join
Run independent tasks in parallel and wait for all to complete.
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.task.fork_task import ForkTask
from conductor.client.workflow.task.join_task import JoinTask
# Run three independent customer checks in parallel, then merge the results.
workflow = ConductorWorkflow(name='parallel_enrichment', version=1, executor=executor)

# The three checks do not depend on each other, so they can fork.
credit_check = check_credit(task_ref_name='credit', customer_id=workflow.input('customer_id'))
fraud_check = check_fraud(task_ref_name='fraud', customer_id=workflow.input('customer_id'))
kyc_check = check_kyc(task_ref_name='kyc', customer_id=workflow.input('customer_id'))

# Each forked branch is itself a list of tasks (single-task branches here).
branches = [
    [credit_check],
    [fraud_check],
    [kyc_check],
]
fork_step = ForkTask(task_ref_name='parallel_checks', forked_tasks=branches)

# The join waits on the listed task reference names before continuing.
join_step = JoinTask(task_ref_name='wait_all', join_on=['credit', 'fraud', 'kyc'])

# Fan-in: consume one output from each parallel branch.
decision_step = make_decision(
    task_ref_name='decide',
    credit_score=credit_check.output('score'),
    fraud_risk=fraud_check.output('risk_level'),
    kyc_status=kyc_check.output('status'),
)

workflow >> fork_step >> join_step >> decision_step
workflow.output_parameters({'decision': decision_step.output('result')})
workflow.register(overwrite=True)
Loops with Do/While
Repeat a set of tasks until a condition is met — useful for polling, retries, or iterative AI agent loops.
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.task.do_while_task import DoWhileTask
# Agent loop: ask the LLM for the next action, execute it, repeat until done.
workflow = ConductorWorkflow(name='agent_loop', version=1, executor=executor)

# Tasks repeated on every iteration of the loop.
plan_step = call_llm(
    task_ref_name='think',
    prompt=workflow.input('goal'),
)
tool_step = execute_tool(
    task_ref_name='act',
    tool=plan_step.output('tool'),
    args=plan_step.output('args'),
)

# The condition script returns true to keep looping and false to stop —
# here the loop ends once the 'act' task reports done == true.
agent_cycle = DoWhileTask(
    task_ref_name='agent_loop',
    termination_condition='if ($.act["output"]["done"] == true) { false; } else { true; }',
    tasks=[plan_step, tool_step],
)
# Safety valve: cap the loop at 10 iterations regardless of the condition.
agent_cycle.input_parameters.update({'max_iterations': 10})

wrapup_step = summarize_results(task_ref_name='summarize', results=tool_step.output('results'))

workflow >> agent_cycle >> wrapup_step
workflow.register(overwrite=True)
HTTP + system tasks mixed with workers
Combine built-in system tasks (HTTP, Wait, JQ Transform) with custom workers — no extra deployment needed for system tasks.
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.task.http_task import HttpTask
from conductor.client.workflow.task.json_jq_task import JsonJQTask
from conductor.client.workflow.task.wait_task import WaitTask
# Pipeline mixing system tasks (HTTP, JQ, Wait) with custom workers.
workflow = ConductorWorkflow(name='data_pipeline', version=1, executor=executor)

# System HTTP task — Conductor issues the request itself, no worker involved.
fetch_step = HttpTask(task_ref_name='fetch_data', http_input={
    'uri': 'https://api.example.com/records',
    'method': 'GET',
    'headers': {'Authorization': ['Bearer ${workflow.input.api_key}']},
})

# System JQ task — reshape the HTTP response, again with no worker.
reshape_step = JsonJQTask(
    task_ref_name='transform',
    script='.body.records | map({id: .id, value: .metrics.total})',
)
reshape_step.input_parameters.update({
    'records': fetch_step.output('response.body'),
})

# Custom worker — business logic runs in your own worker process.
enrich_step = enrich_records(
    task_ref_name='enrich',
    records=reshape_step.output('result'),
)

# System Wait task — pause for 5 seconds before the final step.
pause_step = WaitTask(task_ref_name='cooldown', wait_for_seconds=5)

# Custom worker — persist the enriched records.
store_step = save_to_database(task_ref_name='store', records=enrich_step.output('enriched'))

workflow >> fetch_step >> reshape_step >> enrich_step >> pause_step >> store_step
workflow.output_parameters({'stored': store_step.output('count')})
workflow.register(overwrite=True)
Sub-workflows
Break large workflows into reusable pieces. A parent workflow invokes child workflows as tasks.
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.task.sub_workflow_task import SubWorkflowTask
# Child workflow — a reusable unit, registered on its own.
item_wf = ConductorWorkflow(name='process_single_item', version=1, executor=executor)
validate_step = validate_item(task_ref_name='validate', item=item_wf.input('item'))
transform_step = transform_item(task_ref_name='transform', item=validate_step.output('validated'))
item_wf >> validate_step >> transform_step
item_wf.output_parameters({'result': transform_step.output('transformed')})
item_wf.register(overwrite=True)

# Parent workflow — invokes the child by name/version as a single task.
batch_wf = ConductorWorkflow(name='batch_processor', version=1, executor=executor)
prepare_step = prepare_batch(task_ref_name='prepare', batch_id=batch_wf.input('batch_id'))
child_call = SubWorkflowTask(
    task_ref_name='process_item',
    workflow_name='process_single_item',
    version=1,
)
# Input parameters on the sub-workflow task become the child's workflow input.
child_call.input_parameters.update({'item': prepare_step.output('first_item')})
collect_step = aggregate_results(
    task_ref_name='aggregate',
    result=child_call.output('result'),
)
batch_wf >> prepare_step >> child_call >> collect_step
batch_wf.register(overwrite=True)
Runtime-generated dynamic workflow
Build a workflow definition at runtime and execute it without pre-registration. This runtime workflow pattern enables dynamic workflows where the task graph is generated on-the-fly — useful for AI agents, data pipelines, and any scenario where the steps are not known ahead of time.
from conductor.client.configuration.configuration import Configuration
from conductor.client.orkes_clients import OrkesClients
from conductor.client.http.models import StartWorkflowRequest
config = Configuration()
clients = OrkesClients(configuration=config)
executor = clients.get_workflow_executor()

# Build the task list at runtime. The first task reads the workflow input;
# every later task reads the previous task's `result` output through a
# ${<taskReferenceName>.output.result} expression.
steps = ['validate', 'enrich', 'store']  # determined at runtime
tasks = [
    {
        'name': step,
        'taskReferenceName': f'{step}_{i}',
        'type': 'SIMPLE',
        'inputParameters': {
            'data': '${workflow.input.data}' if i == 0 else f'${{{steps[i-1]}_{i-1}.output.result}}',
        },
    }
    for i, step in enumerate(steps)
]

# Start with an inline workflow_def — no pre-registration needed.
request = StartWorkflowRequest(
    name='dynamic_pipeline',
    workflow_def={
        'name': 'dynamic_pipeline',
        'version': 1,
        'tasks': tasks,
        # Surface the last task's result as the workflow output.
        'outputParameters': {
            'result': f'${{{steps[-1]}_{len(steps)-1}.output.result}}',
        },
    },
    input={'data': {'key': 'value'}},
)
workflow_id = executor.start_workflow(request)
print(f'Started dynamic workflow: {workflow_id}')
This pattern is powerful for AI agents that generate execution plans at runtime — the LLM produces the list of steps, your code builds the workflow definition, and Conductor executes it with full durability, retries, and observability.
Execute and wait for result
Run a workflow synchronously and get the result inline — useful for APIs and interactive applications.
from conductor.client.configuration.configuration import Configuration
from conductor.client.orkes_clients import OrkesClients
config = Configuration()
clients = OrkesClients(configuration=config)
executor = clients.get_workflow_executor()

# execute() blocks until the workflow completes and returns the run inline.
workflow_run = executor.execute(
    name='order_fulfillment',
    version=1,
    workflow_input={'order_id': 'ORD-789'},
)

print(f'Status: {workflow_run.status}')
print(f'Output: {workflow_run.output}')
print(f'View: {config.ui_host}/execution/{workflow_run.workflow_id}')
Setup
All examples above assume a WorkflowExecutor instance. Here is the standard setup:
from conductor.client.configuration.configuration import Configuration
from conductor.client.orkes_clients import OrkesClients
# Standard bootstrap shared by every example above: one executor instance
# built from environment-driven configuration.
config = Configuration() # reads CONDUCTOR_SERVER_URL from env
clients = OrkesClients(configuration=config)
executor = clients.get_workflow_executor()
For more Python SDK examples, see the Python SDK documentation and the examples on GitHub.