JDBC Task
The JDBC task (JDBC) executes SQL statements against relational databases. It supports SELECT queries, UPDATE/INSERT/DELETE statements, parameterized queries, and transaction management with automatic rollback on failure.
Multiple named database connections can be configured, allowing workflows to interact with different databases (MySQL, PostgreSQL, Oracle, etc.) within the same workflow.
Task parameters
| Parameter | Type | Description | Required / Optional |
|---|---|---|---|
| connectionId | String | The name of the configured JDBC instance to use. Must match a name from conductor.jdbc.instances configuration. |
Required (unless integrationName is used). |
| integrationName | String | The name of a managed integration (multi-tenant). Used instead of connectionId for platform-managed connections. |
Optional. |
| type | String | The SQL operation type. Supported: SELECT, UPDATE. |
Required. |
| statement | String | The SQL statement to execute. Use ? for parameterized queries. |
Required. |
| parameters | List[String] | Ordered list of parameter values for ? placeholders in the statement. |
Optional. |
| expectedUpdateCount | Integer | For UPDATE type only. If specified, the transaction is rolled back when the actual update count doesn't match. |
Optional. |
| schemaName | String | Database schema name (reserved for future use). | Optional. |
Configuration JSON
SELECT query
{
"name": "query_users",
"taskReferenceName": "query_users_ref",
"type": "JDBC",
"inputParameters": {
"connectionId": "mysql-prod",
"type": "SELECT",
"statement": "SELECT id, name, email FROM users WHERE status = ?",
"parameters": ["active"]
}
}
UPDATE with expected count
{
"name": "update_order_status",
"taskReferenceName": "update_order_ref",
"type": "JDBC",
"inputParameters": {
"connectionId": "mysql-prod",
"type": "UPDATE",
"statement": "UPDATE orders SET status = ? WHERE order_id = ?",
"parameters": [
"shipped",
"${workflow.input.orderId}"
],
"expectedUpdateCount": 1
}
}
Output
SELECT output
| Name | Type | Description |
|---|---|---|
| result | List[Map[String, Any]] | List of rows, where each row is a map of column names to values. |
Example output:
{
"result": [
{"id": 1, "name": "Alice", "email": "alice@example.com"},
{"id": 2, "name": "Bob", "email": "bob@example.com"}
]
}
UPDATE output
| Name | Type | Description |
|---|---|---|
| update_count | Integer | The number of rows affected by the statement. |
Example output:
Transaction behavior
- SELECT statements run with auto-commit enabled (default JDBC behavior).
- UPDATE statements run with auto-commit disabled. The transaction is committed on success.
- If
expectedUpdateCountis set and the actual count doesn't match, the transaction is automatically rolled back and the task fails. - If a SQL exception occurs during an UPDATE, the transaction is automatically rolled back.
Connection configuration
JDBC connections are configured using named instances under conductor.jdbc.instances.
Quick setup
conductor:
jdbc:
instances:
- name: "mysql-prod"
connection:
datasourceURL: "jdbc:mysql://prod-db:3306/myapp"
jdbcDriver: "com.mysql.cj.jdbc.Driver"
user: "conductor"
password: "secret"
maximumPoolSize: 20
- name: "postgres-analytics"
connection:
datasourceURL: "jdbc:postgresql://analytics-db:5432/warehouse"
user: "analyst"
password: "secret"
Connection pool options
| Property | Type | Default | Description |
|---|---|---|---|
datasourceURL |
String | Required | JDBC connection URL |
jdbcDriver |
String | Auto-detected | JDBC driver class name |
user |
String | Optional | Database username |
password |
String | Optional | Database password |
maximumPoolSize |
Integer | 32 | Maximum connections in the pool |
minimumIdle |
Integer | 2 | Minimum idle connections |
idleTimeoutMs |
Long | 30000 | Idle connection timeout (ms) |
connectionTimeout |
Long | 30000 | Connection acquisition timeout (ms) |
leakDetectionThreshold |
Long | 60000 | Leak detection threshold (ms) |
maxLifetime |
Long | 1800000 | Maximum connection lifetime (ms) |
Execution
The JDBC task completes as follows:
- COMPLETED: The SQL statement executed successfully. For SELECT, results are in
output.result. For UPDATE, the count is inoutput.update_count. - FAILED: The task fails if:
- The
connectionIddoesn't match any configured instance. - A SQL exception occurs (syntax error, constraint violation, connection timeout).
- The
expectedUpdateCountdoesn't match the actual update count (UPDATE only, triggers rollback).
- The
Examples
Parameterized SELECT
{
"name": "find_active_orders",
"taskReferenceName": "find_orders_ref",
"type": "JDBC",
"inputParameters": {
"connectionId": "postgres-analytics",
"type": "SELECT",
"statement": "SELECT order_id, total, created_at FROM orders WHERE customer_id = ? AND status = ? ORDER BY created_at DESC",
"parameters": [
"${workflow.input.customerId}",
"active"
]
}
}
INSERT with expected count
{
"name": "create_audit_record",
"taskReferenceName": "audit_ref",
"type": "JDBC",
"inputParameters": {
"connectionId": "mysql-prod",
"type": "UPDATE",
"statement": "INSERT INTO audit_log (action, user_id, details, created_at) VALUES (?, ?, ?, NOW())",
"parameters": [
"${workflow.input.action}",
"${workflow.input.userId}",
"${workflow.input.details}"
],
"expectedUpdateCount": 1
}
}
Chaining SELECT and UPDATE
Use the output of a SELECT task as input to an UPDATE task:
[
{
"name": "get_order",
"taskReferenceName": "get_order_ref",
"type": "JDBC",
"inputParameters": {
"connectionId": "mysql-prod",
"type": "SELECT",
"statement": "SELECT id, total FROM orders WHERE order_id = ?",
"parameters": ["${workflow.input.orderId}"]
}
},
{
"name": "apply_discount",
"taskReferenceName": "apply_discount_ref",
"type": "JDBC",
"inputParameters": {
"connectionId": "mysql-prod",
"type": "UPDATE",
"statement": "UPDATE orders SET total = total * 0.9 WHERE order_id = ? AND total > 0",
"parameters": ["${workflow.input.orderId}"],
"expectedUpdateCount": 1
}
}
]
Using with different databases in the same workflow
[
{
"name": "read_from_mysql",
"taskReferenceName": "mysql_read_ref",
"type": "JDBC",
"inputParameters": {
"connectionId": "mysql-prod",
"type": "SELECT",
"statement": "SELECT user_id, email FROM users WHERE user_id = ?",
"parameters": ["${workflow.input.userId}"]
}
},
{
"name": "write_to_postgres",
"taskReferenceName": "pg_write_ref",
"type": "JDBC",
"inputParameters": {
"connectionId": "postgres-analytics",
"type": "UPDATE",
"statement": "INSERT INTO user_activity (user_id, email, event_type, event_time) VALUES (?, ?, ?, NOW())",
"parameters": [
"${workflow.input.userId}",
"${mysql_read_ref.output.result[0].email}",
"workflow_triggered"
],
"expectedUpdateCount": 1
}
}
]
SQL injection
Always use parameterized queries (? placeholders with the parameters list). Never concatenate user input directly into SQL statements.