Workflow Message Queue (WMQ)
tl;dr — every workflow now has a queue. You can use this queue to turn your workflow into an event loop: it sits idle, waiting for messages, processes each one, then goes back to waiting.
How it works
WMQ adds a persistent message queue to every running Conductor workflow. While the workflow is active you can push messages to it from anywhere — another service, a Kafka consumer, a webhook handler, a human — and the workflow will pick them up and act on them.
Two pieces make this work:
- POST /api/workflow/{workflowId}/messages: an HTTP endpoint exposed by Conductor that accepts a JSON payload and enqueues it on the workflow's queue.
- PULL_WORKFLOW_MESSAGES: a new Conductor system task that blocks until messages arrive, then completes with output.messages containing the batch.
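The interplay of the two pieces can be pictured with a toy model. The sketch below is not Conductor code: it uses Python's standard queue and threading modules as a stand-in for one workflow's queue, and the function names are hypothetical, chosen only to mirror the endpoint and the system task.

```python
import queue
import threading

# Toy stand-in for a single workflow's message queue (not Conductor code).
inbox = queue.Queue()
received = []

def pull_workflow_messages():
    """Mimics PULL_WORKFLOW_MESSAGES: block until a message arrives, then complete."""
    message = inbox.get()  # blocks while the queue is empty
    received.append(message)

def push_message(payload):
    """Mimics POST /api/workflow/{workflowId}/messages: enqueue a JSON-like payload."""
    inbox.put(payload)

worker = threading.Thread(target=pull_workflow_messages)
worker.start()                   # the "workflow" is now parked on the pull
push_message({"text": "hello"})  # any producer can push at any time
worker.join(timeout=5)           # the pull completes once the message lands
print(received)
```

The consumer does no work while it waits, which is the property that lets a workflow sit idle between messages.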
Prerequisites
WMQ requires changes that are currently in review:
| Component | PR |
|---|---|
| Conductor OSS | https://github.com/conductor-oss/conductor/pull/917 |
| Python SDK (conductor-python) | https://github.com/conductor-oss/python-sdk/pull/389 |
Using WMQ
Add a PULL_WORKFLOW_MESSAGES task to your workflow definition:
{
  "name": "wait_for_message",
  "taskReferenceName": "wait_for_message_ref",
  "type": "PULL_WORKFLOW_MESSAGES",
  "inputParameters": {
    "batchSize": 1
  }
}
Then push to it:
curl -X POST http://localhost:8080/api/workflow/{workflowId}/messages \
  -H "Content-Type: application/json" \
  -d '{"text": "hello"}'
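The same push can be issued from Python. The following is a minimal sketch using only the standard library; the helper name build_push_request and the workflow id are placeholders, and the base URL assumes the local server from the curl command.

```python
import json
import urllib.request

def build_push_request(base_url, workflow_id, payload):
    """Build the same POST the curl command sends, without sending it."""
    url = f"{base_url}/api/workflow/{workflow_id}/messages"
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

# "my-workflow-id" is a placeholder; use the id of a RUNNING workflow.
request = build_push_request("http://localhost:8080", "my-workflow-id", {"text": "hello"})

# To send it against a running server:
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```

Building the request separately from sending it keeps the URL and body easy to inspect or log before anything goes over the wire.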
The task completes with:
{
  "messages": [
    {
      "id": "3f2504e0-4f89-11d3-9a0c-0305e82c3301",
      "workflowId": "8e2c14e1-...",
      "payload": { "text": "hello" },
      "receivedAt": "2025-06-15T10:30:00Z"
    }
  ],
  "count": 1
}
Your workflow accesses the user data via output.messages[0].payload. The id and receivedAt fields are added by Conductor at ingestion time.
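For a worker that receives this output as a plain dictionary, unpacking the payloads might look like the sketch below. The function name extract_payloads is hypothetical; the sample data is the output shown above.

```python
def extract_payloads(task_output):
    """Return the user payloads from a PULL_WORKFLOW_MESSAGES output, in order."""
    return [message["payload"] for message in task_output.get("messages", [])]

# Sample output copied from the example above.
task_output = {
    "messages": [
        {
            "id": "3f2504e0-4f89-11d3-9a0c-0305e82c3301",
            "workflowId": "8e2c14e1-...",
            "payload": {"text": "hello"},
            "receivedAt": "2025-06-15T10:30:00Z",
        }
    ],
    "count": 1,
}

payloads = extract_payloads(task_output)
print(payloads)
```

Iterating over the whole messages list, rather than indexing element 0, keeps the code correct if batchSize is later raised above 1.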
Push errors:
- 409 Conflict — workflow is not in RUNNING state (completed, failed, terminated, etc.). The message is not stored.
- 500 — queue is full (maxQueueSize reached). Caller must back off and retry.
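A caller can fold both error cases into one helper. This is a sketch under stated assumptions: send is a hypothetical callable that performs the POST and returns the HTTP status code, any 2xx status is treated as success, and the retry count and delays are illustrative rather than recommended values.

```python
import time

def push_with_retry(send, payload, max_attempts=5, base_delay=0.5, sleep=time.sleep):
    """Call send(payload) -> HTTP status; retry on 500, give up at once on 409.

    `send` is a hypothetical caller-supplied function, for example a thin
    wrapper around an HTTP client that returns the response status code.
    """
    for attempt in range(max_attempts):
        status = send(payload)
        if 200 <= status < 300:
            return True   # message accepted (2xx assumed to mean success)
        if status == 409:
            return False  # workflow is not RUNNING; retrying cannot help
        if status == 500 and attempt < max_attempts - 1:
            # Queue full: back off exponentially (0.5 s, 1 s, 2 s, ...) and retry.
            sleep(base_delay * (2 ** attempt))
            continue
        break             # unexpected status, or attempts exhausted
    return False
```

The sleep parameter is injectable so the backoff schedule can be exercised in tests without real delays.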
Event loop pattern
For workflows that process an unbounded stream of messages, wrap the task in a DO_WHILE:
{
  "name": "message_loop",
  "taskReferenceName": "message_loop_ref",
  "type": "DO_WHILE",
  "loopCondition": "$.message_loop_ref['iteration'] < 100",
  "loopOver": [
    {
      "name": "pull_message",
      "taskReferenceName": "pull_message_ref",
      "type": "PULL_WORKFLOW_MESSAGES",
      "inputParameters": { "batchSize": 1 }
    },
    {
      "name": "process_message",
      "taskReferenceName": "process_message_ref",
      "type": "INLINE",
      "inputParameters": {
        "evaluatorType": "javascript",
        "expression": "function e() { return { payload: $.messages[0].payload }; } e();",
        "messages": "${pull_message_ref.output.messages}"
      }
    }
  ]
}
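The loop parks on PULL_WORKFLOW_MESSAGES until the next message arrives. In this example the loopCondition ends the loop after 100 iterations; change the condition to match how long your workflow should keep listening.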
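If you generate workflow definitions from code, the loop can be built with the iteration cap as a parameter. The helper below is a hypothetical sketch that reproduces the JSON above as a Python dictionary; build_message_loop is not part of any SDK.

```python
import json

def build_message_loop(max_iterations, batch_size=1):
    """Return the DO_WHILE task definition shown above as a dict."""
    loop_ref = "message_loop_ref"
    return {
        "name": "message_loop",
        "taskReferenceName": loop_ref,
        "type": "DO_WHILE",
        # The loop continues while this condition evaluates to true.
        "loopCondition": f"$.{loop_ref}['iteration'] < {max_iterations}",
        "loopOver": [
            {
                "name": "pull_message",
                "taskReferenceName": "pull_message_ref",
                "type": "PULL_WORKFLOW_MESSAGES",
                "inputParameters": {"batchSize": batch_size},
            },
            {
                "name": "process_message",
                "taskReferenceName": "process_message_ref",
                "type": "INLINE",
                "inputParameters": {
                    "evaluatorType": "javascript",
                    "expression": "function e() { return { payload: $.messages[0].payload }; } e();",
                    "messages": "${pull_message_ref.output.messages}",
                },
            },
        ],
    }

loop_task = build_message_loop(max_iterations=100)
print(json.dumps(loop_task, indent=2))
```

Deriving the loopCondition from the same reference name as taskReferenceName avoids the two drifting apart when the task is renamed.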
Using WMQ with Agentspan
Agentspan wraps WMQ behind wait_for_message_tool and runtime.send_message(). See https://github.com/agentspan/agentspan/pull/23.
Define a message-waiting tool
from agentspan.agents import Agent, wait_for_message_tool

inbox = wait_for_message_tool(
    name="wait_for_message",
    description="Wait for the next incoming message.",
)

agent = Agent(
    name="my-agent",
    model="openai/gpt-4o",
    tools=[inbox],
    system_prompt="You are a message processing agent. Wait for messages and process them one by one.",
)
When the agent calls this tool the runtime emits a WAITING event, the workflow parks on a PULL_WORKFLOW_MESSAGES task, and nothing runs until a message arrives.
Send a message to the running agent
from agentspan.agents import AgentRuntime  # assumption: exported alongside Agent

with AgentRuntime() as runtime:
    handle = runtime.start(agent, "Start processing messages.")

    # from anywhere, at any time:
    runtime.send_message(handle.workflow_id, {"text": "hello"})
send_message POSTs the payload to /api/workflow/{workflowId}/messages. The workflow unblocks, the agent sees the message as a tool result, and the loop continues.
Kafka bridge example
The pattern also works as a bridge from external event streams.
Run the agent (it runs as a workflow in Conductor), then send messages from a Kafka consumer:
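from confluent_kafka import Consumer  # assumed client; poll()/topic()/value() match its API

with AgentRuntime() as runtime:
    handle = runtime.start(agent, "Start consuming messages from Kafka.")

    consumer = Consumer({...})  # connection settings elided
    consumer.subscribe([KAFKA_TOPIC])
    while True:
        msg = consumer.poll(timeout=1.0)
        # poll() returns None on timeout; skip those and any error events
        if msg is None or msg.error():
            continue
        runtime.send_message(handle.workflow_id, {
            "topic": msg.topic(),
            "value": msg.value().decode("utf-8"),
        })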
Full examples: 72_wait_for_message.py, 73_wait_for_message_streaming.py, 74_kafka_consumer_agent.py.
Configuration
conductor.workflow-message-queue.enabled=true
conductor.workflow-message-queue.maxQueueSize=1000
conductor.workflow-message-queue.ttlSeconds=86400
conductor.workflow-message-queue.maxBatchSize=100
| Property | Default | Description |
|---|---|---|
| enabled | false | Enable the WMQ feature |
| maxQueueSize | 1000 | Max messages queued per workflow |
| ttlSeconds | 86400 | Message TTL (24 h) |
| maxBatchSize | 100 | Max messages returned per PULL_WORKFLOW_MESSAGES poll |