Workflow Message Queue (WMQ) — Architecture

Overview

The Workflow Message Queue (WMQ) is an opt-in Conductor feature that lets external systems push arbitrary JSON messages into a running workflow at any time. The workflow consumes those messages at defined checkpoints using a new system task type: PULL_WORKFLOW_MESSAGES.

WMQ introduces a per-workflow message buffer backed by Redis. External callers push messages to a REST endpoint. The workflow reads them via the task, which either completes immediately if messages are waiting or stays IN_PROGRESS until they arrive.

This is a distinct capability from existing Conductor mechanisms. See Why not existing mechanisms below.

Primary use cases

| Use case | Description |
|---|---|
| Agentic / agent loops | An AI agent workflow loops and waits for tool results or human confirmations from external callers. The caller pushes a message when the tool responds; the loop unblocks. |
| Webhook-driven workflows | An asynchronous HTTP callback needs to feed data into a paused workflow. The callback target is the WMQ push endpoint, not a polling mechanism. |
| Notification pipelines | A workflow loops, reads messages in configurable batches, and forks to fan out to multiple channels. |
| Human-in-the-loop | Structured human decisions or approval payloads are injected into a running workflow by an operator tool or UI. |

Why not existing mechanisms

| Mechanism | Why it does not satisfy WMQ requirements |
|---|---|
| Event handlers | Operate at the workflow-definition level, not per workflow instance. They cannot target a specific running execution by ID. |
| WAIT task | Has no structured message payload. Unblocking via the external completion API carries no data into the task output. |
| HTTP task | Requires the workflow to reach out to an endpoint. WMQ inverts that model: the workflow receives data pushed by an external party. |

Architecture components

1. Message queue storage

The storage layer is defined as an interface in core and implemented in redis-persistence.

Interface: com.netflix.conductor.dao.WorkflowMessageQueueDAO

Operations:

- push(workflowId, message) — append a message to the workflow's queue
- pop(workflowId, maxCount) — atomically dequeue up to maxCount messages from the head
- size(workflowId) — return the current queue depth
- delete(workflowId) — remove the entire queue key
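The four operations above can be illustrated with a minimal in-memory sketch. This is a hypothetical stand-in (class name and capacity handling are assumptions), not the real DAO, which is defined as an interface in core and implemented against Redis in redis-persistence:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for WorkflowMessageQueueDAO, shown only
// to illustrate the push/pop/size/delete contract described above.
class InMemoryWorkflowMessageQueue {
    private final Map<String, Deque<String>> queues = new ConcurrentHashMap<>();
    private final int maxQueueSize;

    InMemoryWorkflowMessageQueue(int maxQueueSize) {
        this.maxQueueSize = maxQueueSize;
    }

    // Append to the tail (mirrors RPUSH); false signals the capacity error.
    synchronized boolean push(String workflowId, String message) {
        Deque<String> q = queues.computeIfAbsent(workflowId, id -> new ArrayDeque<>());
        if (q.size() >= maxQueueSize) {
            return false;
        }
        q.addLast(message);
        return true;
    }

    // Dequeue up to maxCount messages from the head (mirrors LRANGE + LTRIM).
    synchronized List<String> pop(String workflowId, int maxCount) {
        List<String> out = new ArrayList<>();
        Deque<String> q = queues.get(workflowId);
        while (q != null && !q.isEmpty() && out.size() < maxCount) {
            out.add(q.pollFirst());
        }
        return out;
    }

    // Current queue depth for one workflow.
    synchronized int size(String workflowId) {
        Deque<String> q = queues.get(workflowId);
        return q == null ? 0 : q.size();
    }

    // Remove the entire queue for a workflow.
    synchronized void delete(String workflowId) {
        queues.remove(workflowId);
    }
}
```

The sketch keeps FIFO ordering (append at tail, consume from head), matching the Redis List semantics described in the next section.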

Redis implementation: com.netflix.conductor.redis.dao.RedisWorkflowMessageQueueDAO

Redis data structure: one List per workflow.

| Property | Detail |
|---|---|
| Key pattern | wmq:{workflowId} |
| Enqueue | RPUSH — appended to the tail for FIFO ordering |
| Dequeue | LRANGE to read + LTRIM to remove. Safe without an atomic Lua script because Conductor holds a per-workflow execution lock during the decide cycle. For Redis 6.2+, LPOP key count could simplify this. |
| TTL | Configurable; default 24 hours (86,400 seconds). Reset to the full TTL on every RPUSH. |
| Max size | Configurable cap; default 1,000 messages. push returns an error if the queue is at capacity. |

Each message stored in the Redis list is a JSON string conforming to the message schema described below.

2. Message schema

Every message is a JSON object with the following fields:

{
  "id": "3f2504e0-4f89-11d3-9a0c-0305e82c3301",
  "workflowId": "8e2c14e1-99ab-4c10-b4a8-a7b0d2f0e123",
  "payload": {
    "decision": "approved",
    "approvedBy": "user@example.com"
  },
  "receivedAt": "2025-06-15T10:30:00Z"
}

| Field | Type | Description |
|---|---|---|
| id | UUID v4 string | Generated by the push endpoint at ingestion time. Returned to the caller. |
| workflowId | string | The workflow instance that owns this message. Redundant with the queue key but included for downstream traceability. |
| payload | arbitrary JSON object | The data provided by the external caller. Conductor does not interpret or validate the structure. |
| receivedAt | ISO-8601 UTC timestamp | Recorded at ingestion time. |
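As a plain-Java sketch, the schema maps naturally onto a small POJO. The class name and constructor below are assumptions for illustration; the real class is WorkflowMessage in the common module:

```java
import java.time.Instant;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch of the message POJO. The payload is kept as an
// uninterpreted map, matching the rule that Conductor does not validate
// or interpret its structure.
class WorkflowMessageSketch {
    final String id;                   // UUID v4, generated at ingestion
    final String workflowId;           // owning workflow instance
    final Map<String, Object> payload; // arbitrary caller-supplied JSON
    final Instant receivedAt;          // ingestion timestamp (UTC)

    WorkflowMessageSketch(String workflowId, Map<String, Object> payload) {
        this.id = UUID.randomUUID().toString();
        this.workflowId = workflowId;
        this.payload = payload;
        this.receivedAt = Instant.now();
    }
}
```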

3. REST API

Endpoint: POST /api/workflow/{workflowId}/messages

Request body: arbitrary JSON object (the payload field of the message)

Response: the generated message id as a plain string

Validation:

- The target workflow must exist.
- The workflow must be in RUNNING status. Pushes to workflows in PAUSED, COMPLETED, FAILED, TIMED_OUT, or TERMINATED states are rejected with an appropriate HTTP error.

Feature flag guard: The REST controller bean is only registered when the WMQ feature is enabled (see Feature flag). When disabled, the endpoint does not exist at all — it is absent from Swagger UI and returns 404.

Side effect after push: After storing the message in Redis, the endpoint calls workflowExecutor.decide(workflowId). This triggers an immediate workflow evaluation cycle, allowing an in-progress PULL_WORKFLOW_MESSAGES task to be woken up without waiting for the next SystemTaskWorker poll interval. See Interaction with WorkflowSweeper.

4. PULL_WORKFLOW_MESSAGES system task

PULL_WORKFLOW_MESSAGES is an async system task (isAsync() = true). It integrates with the SystemTaskWorker polling loop.

Task type string: PULL_WORKFLOW_MESSAGES

Input parameters (from task definition inputParameters):

| Parameter | Type | Default | Description |
|---|---|---|---|
| batchSize | int | 1 | Maximum number of messages to dequeue in one invocation. Capped by the server-side maxBatchSize configuration. |

Lifecycle:

  1. start() — Called once when the task enters SCHEDULED state. Sets the task status to IN_PROGRESS.
  2. execute() — Called by SystemTaskWorker on each poll cycle. Checks the Redis queue:
     - If the queue is empty: returns false. The task stays IN_PROGRESS, and AsyncSystemTaskExecutor re-queues the task message with a short callbackAfterSeconds.
     - If the queue is non-empty: atomically pops up to batchSize messages, writes the output, sets the status to COMPLETED, and returns true.
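The two execute() outcomes can be modeled in isolation. The class below is a hypothetical sketch (the real task works against Conductor's TaskModel and the DAO, not a raw Deque), showing only the empty-vs-non-empty decision and the server-side batch cap:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;

// Hypothetical model of the PULL_WORKFLOW_MESSAGES execute() decision:
// empty queue -> stay IN_PROGRESS (return false); otherwise drain up to
// batchSize (capped by maxBatchSize), write the output, and complete.
class PullMessagesSketch {
    String status = "IN_PROGRESS";
    Map<String, Object> output;

    boolean execute(Deque<String> queue, int batchSize, int maxBatchSize) {
        int limit = Math.min(batchSize, maxBatchSize); // server-side cap
        List<String> drained = new ArrayList<>();
        while (!queue.isEmpty() && drained.size() < limit) {
            drained.add(queue.pollFirst());
        }
        if (drained.isEmpty()) {
            return false; // task stays IN_PROGRESS; executor re-queues it
        }
        output = Map.of("messages", drained, "count", drained.size());
        status = "COMPLETED";
        return true;
    }
}
```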

Output fields (on COMPLETED):

| Field | Type | Description |
|---|---|---|
| messages | array of message objects | The dequeued messages, each with id, workflowId, payload, and receivedAt. |
| count | int | Actual number of messages returned. Always <= batchSize. |

Timeout behavior: The task respects the timeoutSeconds field configured in the workflow task definition or task definition metadata. If the task times out waiting for messages, Conductor applies the standard timeout mechanism and transitions the task to TIMED_OUT. No special handling is required in PULL_WORKFLOW_MESSAGES itself.

5. Configuration properties

The feature is controlled by a @ConfigurationProperties class with prefix conductor.workflow-message-queue.

| Property | Type | Default | Description |
|---|---|---|---|
| conductor.workflow-message-queue.enabled | boolean | false | Master switch. All WMQ beans are gated on this being true. |
| conductor.workflow-message-queue.maxQueueSize | int | 1000 | Maximum number of messages allowed in a single workflow's queue at one time. |
| conductor.workflow-message-queue.ttlSeconds | long | 86400 | TTL (seconds) applied to the Redis key. Reset on every push. |
| conductor.workflow-message-queue.maxBatchSize | int | 100 | Server-side upper bound on batchSize for any single PULL_WORKFLOW_MESSAGES execution. |
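For example, enabling the feature with tighter limits might look like this in application.properties (values here are illustrative, not recommendations):

```properties
conductor.workflow-message-queue.enabled=true
conductor.workflow-message-queue.maxQueueSize=500
conductor.workflow-message-queue.ttlSeconds=43200
conductor.workflow-message-queue.maxBatchSize=50
```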

6. Feature flag

conductor.workflow-message-queue.enabled=false

When false (the default):

- The REST controller bean is not created.
- The PULL_WORKFLOW_MESSAGES system task bean is not created. The task type is unknown to SystemTaskRegistry, so any workflow definition referencing it will fail validation.
- The RedisWorkflowMessageQueueDAO bean is not created.

The feature has zero runtime footprint when disabled. Existing deployments are unaffected.

7. Lifecycle cleanup

Queue cleanup relies on the Redis TTL configured via conductor.workflow-message-queue.ttlSeconds (default 24 hours). The TTL is reset on every push, so active queues are never prematurely expired.

There is no explicit WorkflowStatusListener implementation for cleanup. WorkflowStatusListener is a single-bean interface; adding a WMQ implementation would conflict with other listener implementations (e.g., the archiving listener). The Redis TTL is sufficient: any orphaned queue (e.g., after a server crash before a workflow completes) will expire automatically.

Data flows

Push flow

External caller
  └─ POST /api/workflow/{wfId}/messages  (JSON payload)
       └─ WorkflowMessageQueueResource
            ├─ Validate workflow exists and is RUNNING
            ├─ Generate message ID (UUID v4)
            ├─ Serialize message to JSON
            ├─ dao.push(workflowId, message)
            │    └─ RPUSH wmq:{workflowId} <json>
            │    └─ EXPIRE wmq:{workflowId} <ttlSeconds>
            ├─ workflowExecutor.decide(workflowId)   ← triggers immediate re-evaluation
            └─ Return message ID to caller
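From the caller's side, the push step above is a single HTTP POST. A sketch using java.net.http follows; the base URL and workflow ID are placeholder assumptions, and only the request is constructed here, not sent:

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical caller-side construction of the WMQ push request.
// The request body is the raw payload JSON; the response body would be
// the generated message ID as a plain string.
class PushRequestSketch {
    static HttpRequest build(String baseUrl, String workflowId, String payloadJson) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/api/workflow/" + workflowId + "/messages"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(payloadJson))
                .build();
    }
}
```

Sending it with java.net.http.HttpClient would return the message ID, which the caller can log for traceability.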

Pull flow — message already waiting (happy path)

WorkflowExecutor schedules PULL_WORKFLOW_MESSAGES task
  └─ task status: SCHEDULED
       └─ SystemTaskWorker.execute()
            └─ PullWorkflowMessages.start()
                 └─ task status: IN_PROGRESS
                      └─ AsyncSystemTaskExecutor calls PullWorkflowMessages.execute()
                           └─ dao.pop(workflowId, batchSize) → [message1, ...]
                                └─ task output: { messages: [...], count: N }
                                └─ task status: COMPLETED
                                     └─ WorkflowExecutor.decide() advances workflow

Pull flow — waiting for messages (no messages yet)

PullWorkflowMessages.execute()
  └─ dao.pop(workflowId, batchSize) → []   (queue empty)
       └─ return false
            └─ AsyncSystemTaskExecutor re-queues task with callbackAfterSeconds
                 └─ SystemTaskWorker polls again at next interval
                      └─ [repeat until message arrives]

[Meanwhile, external caller pushes a message via POST /api/workflow/{wfId}/messages]
  └─ workflowExecutor.decide(workflowId) called after push
       └─ AsyncSystemTaskExecutor.execute() triggered
            └─ PullWorkflowMessages.execute()
                 └─ dao.pop() → [message]
                      └─ task COMPLETED, workflow advances

Cleanup

Queue keys expire automatically via Redis TTL (default 24 hours, reset on every push). No explicit cleanup hook is triggered on workflow completion.

Interaction with WorkflowSweeper

PULL_WORKFLOW_MESSAGES is an async system task (isAsync() = true). This means:

  1. When the workflow engine schedules the task, it is placed in the system task queue (a QueueDAO-backed queue keyed on the task type).
  2. SystemTaskWorkerCoordinator registers the task with SystemTaskWorker at startup, which begins polling its queue.
  3. AsyncSystemTaskExecutor.execute() is called for each poll. It calls start() on first execution and execute() on subsequent calls.
  4. When execute() returns false (queue empty), AsyncSystemTaskExecutor re-pushes the task ID into the system task queue with a delay of systemTaskCallbackTime seconds (configured via conductor.app.systemTaskWorkerCallbackDuration).
  5. When a message is pushed via the REST API, workflowExecutor.decide(workflowId) is called immediately. The sweeper re-evaluates the workflow and triggers AsyncSystemTaskExecutor for any IN_PROGRESS async tasks. This reduces wake-up latency to near-real-time rather than waiting for the full poll interval.

PULL_WORKFLOW_MESSAGES overrides getEvaluationOffset() to return Optional.of(1L), so the task is re-evaluated every 1 second while waiting for messages instead of the default systemTaskWorkerCallbackDuration (30 seconds).
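Modeled in isolation, the override amounts to a small decision. The real method receives Conductor's TaskModel and the default offset; this stand-alone sketch (names and signature are assumptions) captures only the logic:

```java
import java.util.Optional;

// Hypothetical stand-alone model of the getEvaluationOffset() override:
// while the task is still waiting (IN_PROGRESS), ask to be re-evaluated
// after 1 second instead of the default callback duration.
class EvaluationOffsetSketch {
    static Optional<Long> evaluationOffset(String taskStatus, long defaultOffsetSeconds) {
        if ("IN_PROGRESS".equals(taskStatus)) {
            return Optional.of(1L); // re-check the message queue every second
        }
        return Optional.of(defaultOffsetSeconds);
    }
}
```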

Failure modes and resilience

| Scenario | Behavior |
|---|---|
| Redis is unavailable during push | dao.push() throws an exception. The REST endpoint returns HTTP 500. No message is stored. The caller must retry. There is no message loss because nothing was persisted. |
| Redis is unavailable during pull | dao.pop() throws an exception. PullWorkflowMessages.execute() propagates the error. AsyncSystemTaskExecutor handles task-level failures per the standard retry/timeout configuration. The task may be retried or timed out. |
| Workflow terminates while PULL_WORKFLOW_MESSAGES is IN_PROGRESS | The WorkflowSweeper detects the terminal state and cancels pending tasks. Any messages still in the queue expire via the Redis TTL (see Lifecycle cleanup). |
| Workflow is paused while PULL_WORKFLOW_MESSAGES is IN_PROGRESS | The task stays IN_PROGRESS. New pushes are rejected while the workflow is PAUSED (see Validation), but messages already in the queue are retained in Redis. When the workflow is resumed, workflowExecutor.decide() is called, the sweeper re-evaluates, and the task picks up the queued messages on the next poll. |
| Queue size exceeded | dao.push() returns an error when the queue has reached maxQueueSize. The REST endpoint returns HTTP 429 (Too Many Requests). The caller must handle backpressure. |
| Very large message payloads | Payloads are stored inline in the Redis list entry. For very large data, use an external storage reference pattern: store the data in an object store (S3, GCS, etc.) and put only the reference URL or key in the WMQ payload. This is the same pattern used for Conductor's external payload storage. |
| Duplicate delivery | The dequeue (LRANGE + LTRIM, performed while the per-workflow execution lock is held) is effectively atomic, so the same message is not delivered twice within one execute() call. At-least-once semantics still apply at the system level, so callers and workflow designers should treat consumed messages as idempotent where possible. |
| Network partition between Conductor nodes | If multiple Conductor instances are running, the Redis List is shared. Because the dequeue runs only while the per-workflow execution lock (ExecutionLockService) is held on the decide path, two concurrent execute() calls for the same workflow cannot return overlapping messages. |
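Given the at-least-once semantics noted above, downstream consumers can deduplicate on the message id field. A minimal consumer-side sketch (class and method names are hypothetical; a production version would persist the seen-ID set rather than hold it in memory):

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical consumer-side deduplication: because WMQ delivery is
// at-least-once end to end, track seen message IDs and apply each
// message's effect exactly once.
class IdempotentConsumerSketch {
    private final Set<String> seen = new HashSet<>();
    int applied = 0; // count of effects actually applied

    // Returns true if the message was applied, false if it was a duplicate.
    boolean consume(String messageId) {
        if (!seen.add(messageId)) {
            return false; // already processed; skip the side effect
        }
        applied++; // apply the effect once
        return true;
    }
}
```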

Security and access control considerations

  • The push endpoint receives arbitrary JSON. Conductor does not validate payload structure. Implementors should add authentication/authorization at the API gateway layer.
  • The workflowId parameter in the push URL is sufficient to target any running workflow. Callers must be trusted or the endpoint must be protected.
  • Payload data is stored in Redis with the configured TTL. Sensitive data in payloads is subject to Redis access controls. Consider encrypting sensitive fields at the application level if required.

Summary of components

| Component | Location | Purpose |
|---|---|---|
| WorkflowMessageQueueDAO | core | Interface defining the storage contract |
| WorkflowMessage | common | POJO representing a single message |
| WorkflowMessageQueueProperties | core | @ConfigurationProperties for all WMQ settings |
| PullWorkflowMessages | core (system tasks) | System task that dequeues messages into workflow output |
| RedisWorkflowMessageQueueDAO | redis-persistence | Redis List implementation of the DAO |
| WorkflowMessageQueueConfiguration | core | Spring @Configuration that wires up the in-memory DAO (default fallback) |
| RedisWorkflowMessageQueueConfiguration | redis-persistence | Spring @Configuration that wires up the Redis DAO when Redis is active |
| WorkflowMessageQueueResource | rest | REST controller for the push endpoint |