Ruby SDK
Source
GitHub: conductor-oss/ruby-sdk | Report issues and contribute on GitHub.
Features
- Full Feature Parity with Python SDK
- Ruby-Idiomatic Workflow DSL - Clean block-based syntax with 25+ task types
- Worker Framework - Multi-threaded task execution with class-based and block-based workers
- LLM/AI Tasks - Chat completion, embeddings, RAG, image/audio generation
- Orkes Cloud Support - Authentication, secrets, integrations, prompts
- Comprehensive Testing - 400+ unit tests, 110 integration tests
Installation
Add to your Gemfile:
Or install directly:
Quick Start
Hello World
require 'conductor'
# Configuration (reads CONDUCTOR_SERVER_URL from environment)
config = Conductor::Configuration.new
# Create clients
clients = Conductor::Orkes::OrkesClients.new(config)
executor = clients.get_workflow_executor
# Define a worker
class GreetWorker
  include Conductor::Worker::WorkerModule
  worker_task 'greet'
  def execute(task)
    name = get_input(task, 'name', 'World')
    { 'result' => "Hello, #{name}!" }
  end
end
# Build workflow using new DSL
workflow = Conductor.workflow :greetings, version: 1, executor: executor do
  greet = simple :greet, name: wf[:name]
  output result: greet[:result]
end
# Register the workflow definition
workflow.register(overwrite: true)
# Start workers
runner = Conductor::Worker::TaskRunner.new(config)
runner.register_worker(GreetWorker.new)
runner.start
# Execute workflow
result = workflow.execute(input: { 'name' => 'Ruby' }, wait_for_seconds: 30)
puts "Result: #{result.output['result']}" # => "Hello, Ruby!"
runner.stop
Workflow DSL
The SDK provides a clean, Ruby-idiomatic DSL for building workflows:
workflow = Conductor.workflow :order_processing, version: 1, executor: executor do
  # Access workflow inputs with wf[:param]
  user = simple :get_user, user_id: wf[:user_id]
  # Reference task outputs with task[:field]
  order = simple :validate_order, email: user[:email]
  # HTTP calls
  http :call_api, url: 'https://api.example.com', method: :post, body: { id: order[:id] }
  # Parallel execution
  parallel do
    simple :ship_order, order_id: order[:id]
    simple :send_confirmation, email: user[:email]
  end
  # Conditional branching
  decide order[:region] do
    on 'US' do
      simple :us_shipping
    end
    on 'EU' do
      simple :eu_shipping
    end
    otherwise do
      terminate :failed, 'Unsupported region'
    end
  end
  # Set workflow output
  output tracking: order[:tracking_number], status: 'completed'
end
# Register and execute
workflow.register(overwrite: true)
result = workflow.execute(input: { user_id: 123 }, wait_for_seconds: 60)
Task Methods Reference
Basic Tasks
# Simple task (worker execution)
result = simple :task_name, input1: 'value', input2: wf[:param]
# Inline code execution
jq :transform, query: '.items | map(.name)', input: previous[:data]
javascript :compute, script: 'return inputs.a + inputs.b', a: 1, b: 2
# Set workflow variables
set_variable :save_state, user_id: user[:id], status: 'active'
# Human/manual task
human :approval, display_name: 'Manager Approval', form_template: 'approval_form'
HTTP Tasks
# HTTP request
http :call_api,
  url: 'https://api.example.com/users',
  method: :post,
  headers: { 'Authorization' => 'Bearer ${workflow.secrets.api_token}' },
  body: { name: wf[:name], email: wf[:email] }
# HTTP polling (wait for condition)
http_poll :wait_for_ready,
  url: 'https://api.example.com/status/${workflow.input.job_id}',
  method: :get,
  termination_condition: '$.status == "ready"',
  polling_interval: 5,
  polling_strategy: :fixed
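The `${...}` strings in the examples above are expressions that the Conductor server substitutes at runtime; the SDK only passes them through as text. To make the syntax concrete, here is a minimal plain-Ruby sketch of that kind of substitution. The `resolve_expressions` helper and the `context` hash are hypothetical names used for illustration only, and leaving unknown expressions untouched is an assumption, not documented server behavior.

```ruby
# Hypothetical resolver for illustration; the Conductor server performs the
# real substitution when the workflow runs.
def resolve_expressions(template, context)
  template.gsub(/\$\{([^}]+)\}/) do |whole|
    # Split 'workflow.input.job_id' into keys and walk the nested hash.
    path = Regexp.last_match(1).split('.')
    value = context.dig(*path)
    # Assumption: leave unresolved expressions as they were written.
    value.nil? ? whole : value.to_s
  end
end

context = { 'workflow' => { 'input' => { 'job_id' => 'job-42' } } }
puts resolve_expressions('https://api.example.com/status/${workflow.input.job_id}', context)
# => https://api.example.com/status/job-42
```

Because the expression is plain text until the server evaluates it, keep it in single quotes (or avoid `#{}`) so Ruby does not try to interpolate it.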
Control Flow
# Parallel execution (fork/join)
parallel do
  simple :branch_a
  simple :branch_b
  simple :branch_c
end
# Conditional branching
decide order[:status] do
  on 'pending' do
    simple :process_pending
  end
  on 'approved' do
    simple :process_approved
  end
  otherwise do
    simple :handle_unknown
  end
end
# Conditional shortcuts
when_true user[:is_premium] do
  simple :apply_discount
end
when_false order[:validated] do
  terminate :failed, 'Order validation failed'
end
# Loop over items
loop_over users[:list], as: :user do
  simple :process_user, user_id: iteration[:user][:id]
end
# Do-while loop
do_while :retry_loop, condition: '${retry_ref.output.success} == false' do
  simple :retry_operation
end
Sub-workflows
# Call another workflow
sub_workflow :process_order,
  workflow_name: 'order_processor',
  version: 2,
  input: { order_id: wf[:order_id] }
# Start workflow (fire-and-forget)
start_workflow :trigger_notification,
  workflow_name: 'send_notifications',
  input: { user_id: user[:id] }
# Inline sub-workflow definition
inline_workflow :nested_process do
  simple :step1
  simple :step2
end
Wait and Events
# Wait for duration
wait :pause, duration: '30s' # or '5m', '1h', '2d'
# Wait until specific time
wait :scheduled, until: '2024-12-25T00:00:00Z'
# Wait for external webhook
wait_for_webhook :external_callback,
  matches: { 'type' => 'payment', 'order_id' => '${workflow.input.order_id}' }
# Publish event
event :notify, sink: 'conductor:workflow_events', payload: { status: 'completed' }
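The duration shorthand used by `wait` ('30s', '5m', '1h', '2d') is interpreted by the server. As a rough guide to what each unit means, the following plain-Ruby sketch converts such strings to seconds. The `duration_to_seconds` helper is hypothetical and not part of the SDK, and it assumes only the single-unit forms shown above.

```ruby
# Hypothetical helper, not part of the SDK: converts strings such as
# '30s', '5m', '1h', '2d' into a number of seconds.
UNIT_SECONDS = { 's' => 1, 'm' => 60, 'h' => 3600, 'd' => 86_400 }.freeze

def duration_to_seconds(text)
  match = /\A(\d+)([smhd])\z/.match(text)
  raise ArgumentError, "unsupported duration: #{text.inspect}" unless match

  # Multiply the numeric part by the size of its unit.
  match[1].to_i * UNIT_SECONDS.fetch(match[2])
end

puts duration_to_seconds('30s') # => 30
puts duration_to_seconds('2d')  # => 172800
```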
Termination
# Complete workflow
terminate :success, 'Processing completed successfully'
# Fail workflow
terminate :failed, 'Validation error: missing required field'
Dynamic Tasks
# Dynamic task name (resolved at runtime)
dynamic :run_handler, task_to_execute: wf[:handler_name]
# Dynamic fork (parallel tasks determined at runtime)
dynamic_fork :process_all,
  tasks_input: wf[:items],
  task_name: 'process_item'
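A dynamic fork fans out over data that is only known at runtime. Conductor's dynamic fork ultimately needs a list of task descriptors plus a map from each task reference name to its input. The plain-Ruby sketch below shows that shape for a list of items. The `build_dynamic_fork` helper, the `dynamicTasks`/`dynamicTasksInput` keys, and the `item` input key are assumptions for illustration; how the SDK maps `tasks_input` and `task_name` onto them is not shown in this document.

```ruby
# Hypothetical helper: builds one SIMPLE task per item, plus the per-task
# input map keyed by task reference name.
def build_dynamic_fork(items, task_name)
  tasks = []
  inputs = {}
  items.each_with_index do |item, index|
    ref = "#{task_name}_#{index}" # unique reference name per branch
    tasks << { 'name' => task_name, 'taskReferenceName' => ref, 'type' => 'SIMPLE' }
    inputs[ref] = { 'item' => item }
  end
  { 'dynamicTasks' => tasks, 'dynamicTasksInput' => inputs }
end

plan = build_dynamic_fork(%w[a b], 'process_item')
puts plan['dynamicTasks'].map { |t| t['taskReferenceName'] }.join(', ')
# => process_item_0, process_item_1
```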
LLM/AI Tasks
workflow = Conductor.workflow :ai_assistant, executor: executor do
  # Chat completion (messages auto-converted from simple format)
  response = llm_chat :chat,
    provider: 'openai',
    model: 'gpt-4',
    messages: [
      { role: :system, message: 'You are a helpful assistant.' },
      { role: :user, message: wf[:question] }
    ],
    temperature: 0.7
  # Text completion
  llm_text :complete,
    provider: 'anthropic',
    model: 'claude-3-sonnet',
    prompt: 'Summarize: ${workflow.input.text}'
  # Generate embeddings
  embeddings = llm_embeddings :embed,
    provider: 'openai',
    model: 'text-embedding-3-small',
    text: wf[:document]
  # Store embeddings in vector DB
  llm_store_embeddings :store,
    provider: 'pinecone',
    index: 'documents',
    embeddings: embeddings[:embeddings],
    metadata: { doc_id: wf[:doc_id] }
  # Search embeddings
  llm_search_embeddings :search,
    provider: 'pinecone',
    index: 'documents',
    query: wf[:search_query],
    max_results: 10
  # Generate image
  generate_image :create_image,
    provider: 'openai',
    model: 'dall-e-3',
    prompt: 'A sunset over mountains',
    size: '1024x1024'
  # Generate audio (text-to-speech)
  generate_audio :speak,
    provider: 'openai',
    model: 'tts-1',
    text: response[:content],
    voice: 'nova'
  # MCP (Model Context Protocol) integration
  tools = list_mcp_tools :get_tools, server_name: 'my_mcp_server'
  call_mcp_tool :use_tool,
    server_name: 'my_mcp_server',
    tool_name: 'search_documents',
    arguments: { query: wf[:query] }
  output answer: response[:content]
end
Output References
The DSL uses a clean syntax for referencing outputs:
# Workflow input reference
wf[:user_id] # => '${workflow.input.user_id}'
# Task output reference
task[:field] # => '${task_ref.output.field}'
task[:nested][:path] # => '${task_ref.output.nested.path}'
# Loop iteration references (inside loop_over)
iteration[:current_item] # Current item being processed
iteration[:index] # Current index (0-based)
iteration[:user][:name] # If `as: :user` specified
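To illustrate how chained `[]` calls can turn into an expression string, here is a minimal plain-Ruby sketch. `OutputRef` is a hypothetical class written for this example; it is not the SDK's implementation, only one way the mapping shown above could be produced.

```ruby
# Hypothetical illustration only; not the SDK's actual class.
class OutputRef
  def initialize(ref_name, path = [])
    @ref_name = ref_name
    @path = path
  end

  # Each [] call returns a new reference with one more path segment.
  def [](key)
    OutputRef.new(@ref_name, @path + [key.to_s])
  end

  # Render the expression string the server resolves at runtime.
  def to_s
    "${#{([@ref_name, 'output'] + @path).join('.')}}"
  end
end

user = OutputRef.new('get_user_ref')
puts user[:email]           # => ${get_user_ref.output.email}
puts user[:address][:city]  # => ${get_user_ref.output.address.city}
```

Returning a new object from `[]` rather than mutating the receiver keeps `user[:email]` and `user[:address][:city]` independent, which matters when one task's output is referenced several times in a workflow.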
Examples
The examples/ directory contains comprehensive examples:
| Example | Description |
|---|---|
| helloworld/ | Simplest complete example - worker + workflow + execution |
| workflow_dsl.rb | Comprehensive new DSL showcase |
| simple_worker.rb | Worker patterns: class-based, block-based, error handling |
| kitchensink.rb | All major task types using new DSL |
| dynamic_workflow.rb | Create and execute workflows at runtime |
| workflow_ops.rb | Lifecycle operations: pause, resume, restart, retry |
| agentic_workflows/ | LLM chat and AI workflow examples |
Run examples:
# Set environment variables
export CONDUCTOR_SERVER_URL=http://localhost:8080/api
# For Orkes Cloud:
# export CONDUCTOR_AUTH_KEY=your_key
# export CONDUCTOR_AUTH_SECRET=your_secret
# Run hello world
cd examples/helloworld && bundle exec ruby helloworld.rb
# Run DSL showcase
bundle exec ruby examples/workflow_dsl.rb
# Run kitchen sink
bundle exec ruby examples/kitchensink.rb
Worker Framework
Class-Based Workers
class ImageProcessor
  include Conductor::Worker::WorkerModule
  worker_task 'process_image', poll_interval: 1, thread_count: 4
  def execute(task)
    url = get_input(task, 'image_url')
    # Process image... (placeholder: replace with real processing that yields the output URL)
    processed_url = url
    result = Conductor::Http::Models::TaskResult.complete
    result.add_output_data('processed_url', processed_url)
    result.log('Image processed successfully')
    result
  end
end
Block-Based Workers
worker = Conductor::Worker.define('simple_task') do |task|
  input = task.input_data['value']
  { result: input * 2 } # Return hash for automatic TaskResult
end
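A block-based worker can return a plain hash, which is then wrapped in a completed task result. The plain-Ruby sketch below shows one way such wrapping could work. `SketchResult` and `normalize_worker_return` are hypothetical names, and the string-key conversion and the error on other return types are assumptions rather than documented SDK behavior.

```ruby
# Hypothetical stand-in for a task result; not the SDK's TaskResult class.
SketchResult = Struct.new(:status, :output_data)

def normalize_worker_return(value)
  case value
  when SketchResult
    value # already a result object, pass it through
  when Hash
    # Assumption: a returned hash becomes the output of a completed task.
    SketchResult.new('COMPLETED', value.transform_keys(&:to_s))
  else
    raise TypeError, "unsupported worker return: #{value.class}"
  end
end

result = normalize_worker_return({ result: 21 * 2 })
puts result.status                 # => COMPLETED
puts result.output_data['result']  # => 42
```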
Running Workers
runner = Conductor::Worker::TaskRunner.new(config)
runner.register_worker(ImageProcessor.new)
runner.register_worker(worker)
runner.start(threads: 4)
# Graceful shutdown
trap('INT') { runner.stop }
sleep while runner.running?
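The start, stop, and `running?` calls above follow a common polling-loop pattern: worker threads loop while a flag is set, and stopping clears the flag and waits for the threads to finish. The sketch below shows that pattern in plain Ruby. `MiniRunner` is a hypothetical miniature written for this example and says nothing about how the SDK's `TaskRunner` is implemented.

```ruby
# Hypothetical miniature runner, not the SDK's TaskRunner.
class MiniRunner
  def initialize(poll_interval: 0.01, &work)
    @poll_interval = poll_interval
    @work = work
    @running = false
    @threads = []
  end

  # Spawn the requested number of polling threads.
  def start(threads: 1)
    @running = true
    @threads = Array.new(threads) do
      Thread.new do
        while @running
          @work.call
          sleep @poll_interval
        end
      end
    end
  end

  # Clear the flag, then wait for in-flight iterations to finish.
  def stop
    @running = false
    @threads.each(&:join)
  end

  def running?
    @running
  end
end

polls = Queue.new
runner = MiniRunner.new { polls << :polled }
runner.start(threads: 2)
polls.pop # block until at least one poll has happened
runner.stop
puts "running: #{runner.running?}" # => running: false
```

Joining the threads in `stop` is what makes the shutdown graceful: an iteration that has already started is allowed to complete before the process moves on.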
Configuration
Environment Variables
export CONDUCTOR_SERVER_URL=http://localhost:8080/api
export CONDUCTOR_AUTH_KEY=your_key # For Orkes Cloud
export CONDUCTOR_AUTH_SECRET=your_secret # For Orkes Cloud
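As a rough picture of what environment-driven configuration involves, the following plain-Ruby sketch reads the three variables above into a settings hash. The `settings_from_env` helper, the localhost default, and the `auth_enabled` flag are assumptions for illustration; the SDK's `Conductor::Configuration` handles this for you and may behave differently.

```ruby
# Hypothetical sketch of environment-driven settings; not the SDK's code.
def settings_from_env(env = ENV)
  key = env['CONDUCTOR_AUTH_KEY']
  secret = env['CONDUCTOR_AUTH_SECRET']
  {
    # Assumption: fall back to a local server when no URL is set.
    server_api_url: env.fetch('CONDUCTOR_SERVER_URL', 'http://localhost:8080/api'),
    auth_key: key,
    auth_secret: secret,
    # Authentication only makes sense when both halves are present.
    auth_enabled: !key.nil? && !secret.nil?
  }
end

settings = settings_from_env({ 'CONDUCTOR_SERVER_URL' => 'https://play.orkes.io/api' })
puts settings[:server_api_url] # => https://play.orkes.io/api
puts settings[:auth_enabled]   # => false
```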
Programmatic
config = Conductor::Configuration.new(
  server_api_url: 'https://play.orkes.io/api',
  auth_key: 'your_key',
  auth_secret: 'your_secret',
  auth_token_ttl_min: 45,
  verify_ssl: true
)
API Coverage
Resource APIs (17 classes)
| API | Description |
|---|---|
| WorkflowResourceApi | Workflow execution and management |
| TaskResourceApi | Task polling and updates |
| MetadataResourceApi | Workflow/task definitions |
| SchedulerResourceApi | Scheduled workflows |
| EventResourceApi | Event handlers |
| WorkflowBulkResourceApi | Bulk operations |
| PromptResourceApi | AI prompt templates |
| SecretResourceApi | Secret management |
| IntegrationResourceApi | External integrations |
| + 8 more | Authorization, Users, Groups, Roles, etc. |
High-Level Clients (9 classes)
clients = Conductor::Orkes::OrkesClients.new(config)
workflow_client = clients.get_workflow_client
task_client = clients.get_task_client
metadata_client = clients.get_metadata_client
scheduler_client = clients.get_scheduler_client
prompt_client = clients.get_prompt_client
secret_client = clients.get_secret_client
authorization_client = clients.get_authorization_client
workflow_executor = clients.get_workflow_executor
Testing
# Unit tests
bundle exec rspec spec/conductor/
# Integration tests (requires Conductor server)
CONDUCTOR_SERVER_URL=http://localhost:8080/api bundle exec rspec spec/integration/
Requirements
- Ruby 2.6+ (Ruby 3+ recommended)
- Conductor OSS 3.x or Orkes Cloud
Dependencies
- faraday ~> 2.0 - HTTP client
- faraday-net_http_persistent ~> 2.0 - Connection pooling
- faraday-retry ~> 2.0 - Automatic retries
- concurrent-ruby ~> 1.2 - Thread-safe concurrency
Contributing
- Fork the repository
- Create your feature branch (git checkout -b feature/amazing-feature)
- Run tests (bundle exec rspec)
- Commit your changes (git commit -m 'Add amazing feature')
- Push to the branch (git push origin feature/amazing-feature)
- Open a Pull Request
License
Apache 2.0 - see LICENSE for details.
Examples
Browse all examples on GitHub: conductor-oss/ruby-sdk/examples
| Example | Type |
|---|---|
| Agentic Workflows | directory |
| Dynamic Workflow | file |
| Event Handler | file |
| Event Listener Examples | file |
| Helloworld | directory |
| Kitchensink | file |
| Metadata Journey | file |
| Metrics Example | file |
| New Dsl Demo | file |
| Orkes | directory |
| Prompt Journey | file |
| Rag Workflow | file |
| Schedule Journey | file |
| Simple Worker | file |
| Simple Workflow | file |
| Task Context Example | file |
| Task Listener Example | file |
| Worker Configuration Example | file |
| Workflow Dsl | file |
| Workflow Ops | file |