Event Bus Orchestration
Conductor integrates with external messaging systems to enable event-driven workflow orchestration. You can publish events from workflows and react to external events — starting workflows, completing tasks, or failing tasks based on incoming messages.
Supported event buses
| System | Sink prefix | Module | Use case |
|---|---|---|---|
| Kafka | kafka |
kafka |
High-throughput, durable event streaming |
| NATS | nats |
nats |
Lightweight, low-latency messaging |
| NATS Streaming | nats-stream |
nats-streaming |
Durable NATS with replay (legacy) |
| NATS JetStream | nats |
nats |
Modern durable NATS streaming |
| AMQP (RabbitMQ) | amqp, amqp_queue, amqp_exchange |
amqp |
Traditional message queuing with routing |
| SQS | sqs |
sqs |
AWS-native message queuing |
| Conductor | conductor |
built-in | Internal event routing between workflows |
How it works
Event bus orchestration has two sides:
- Publishing — Use the Event task or Kafka Publish task to send messages from a workflow.
- Consuming — Register event handlers that listen for messages and trigger actions.
┌──────────────┐ Event Task ┌──────────────┐ Event Handler ┌──────────────┐
│ Workflow A │ ──────────────────► │ Event Bus │ ──────────────────► │ Workflow B │
│ │ (publish) │ (Kafka/NATS/ │ (start_workflow) │ (triggered) │
│ │ │ AMQP/SQS) │ │ │
└──────────────┘ └──────────────┘ └──────────────┘
Publishing events
Event task
The Event task publishes a message to any supported event bus. The sink parameter determines the target:
{
"name": "notify_downstream",
"taskReferenceName": "notify_ref",
"type": "EVENT",
"sink": "kafka:order-events",
"inputParameters": {
"orderId": "${workflow.input.orderId}",
"status": "PROCESSED"
}
}
Kafka Publish task
For Kafka-specific features (custom headers, key, serializers), use the dedicated Kafka Publish task:
{
"name": "publish_to_kafka",
"taskReferenceName": "kafka_ref",
"type": "KAFKA_PUBLISH",
"inputParameters": {
"kafka_request": {
"topic": "order-events",
"value": "${workflow.input.orderData}",
"bootStrapServers": "kafka:9092",
"headers": {
"X-Correlation-Id": "${workflow.correlationId}"
}
}
}
}
Sink format
The sink parameter follows the format prefix:queue_name:
| Example | System |
|---|---|
kafka:order-events |
Kafka topic order-events |
nats:notifications |
NATS subject notifications |
amqp:task-queue |
AMQP queue task-queue |
amqp_exchange:events |
AMQP exchange events |
sqs:my-queue |
SQS queue my-queue |
conductor |
Conductor internal queue |
conductor:workflow_name:queue_name |
Conductor internal, specific queue |
Consuming events
Event handlers
Event handlers listen for messages on an event bus and execute actions when a matching event arrives. Register them via the /api/event API.
{
"name": "order_event_handler",
"event": "kafka:order-events",
"condition": "$.status == 'PROCESSED'",
"actions": [
{
"action": "start_workflow",
"start_workflow": {
"name": "fulfillment_workflow",
"input": {
"orderId": "${orderId}"
}
}
}
]
}
Supported actions
| Action | Description |
|---|---|
start_workflow |
Start a new workflow execution with the event payload as input. |
complete_task |
Complete a waiting task (e.g., a WAIT or HUMAN task) in a running workflow. |
fail_task |
Fail a task in a running workflow. |
Conditions
The condition field supports JavaScript-like expressions evaluated against the event payload:
| Expression | Result |
|---|---|
$.version > 1 |
true if version field > 1 |
$.metadata.codec == 'aac' |
true if nested field matches |
$.status == 'COMPLETED' |
true if status is COMPLETED |
Actions execute only when the condition evaluates to true. If no condition is specified, actions execute for every event.
Patterns
Event-driven workflow chaining
Decouple workflows using events instead of sub-workflows:
{
"name": "order_pipeline",
"tasks": [
{
"name": "process_order",
"taskReferenceName": "process_ref",
"type": "SIMPLE"
},
{
"name": "notify_fulfillment",
"taskReferenceName": "notify_ref",
"type": "EVENT",
"sink": "kafka:fulfillment-requests",
"inputParameters": {
"orderId": "${workflow.input.orderId}",
"items": "${process_ref.output.items}"
}
}
]
}
A separate event handler starts the fulfillment workflow when the event arrives.
Wait for external event
Combine a WAIT task with an event handler to pause a workflow until an external system signals completion:
Register an event handler that completes the task when an approval event arrives:
{
"name": "approval_handler",
"event": "kafka:approval-events",
"condition": "$.approved == true",
"actions": [
{
"action": "complete_task",
"complete_task": {
"workflowId": "${workflowId}",
"taskRefName": "approval_ref",
"output": {
"approvedBy": "${approvedBy}"
}
}
}
]
}
Configuration
Each event bus module requires its own configuration. Enable the modules you need in your Conductor server configuration:
Kafka
NATS
AMQP (RabbitMQ)
conductor.event-queues.amqp.enabled=true
conductor.event-queues.amqp.hosts=rabbitmq
conductor.event-queues.amqp.port=5672
conductor.event-queues.amqp.username=guest
conductor.event-queues.amqp.password=guest
Refer to the module source code for the full set of configuration properties.