Design Doc: File Storage for Conductor OSS
Server + Java SDK design in one document. Based on spec.md.
Table of Contents
- API Contract & Data Model
- REST API Endpoints
- POST /api/files — Create File
- GET /api/files/{fileId}/upload-url — Get Upload URL
- POST /api/files/{fileId}/upload-complete — Confirm Upload
- GET /api/files/{workflowId}/{fileId}/download-url — Get Download URL
- GET /api/files/{fileId} — Get File Metadata
- POST /api/files/{fileId}/multipart — Initiate Multipart Upload
- GET .../part/{partNumber} — Get Part Upload URL (S3 only)
- POST .../multipart/{uploadId}/complete — Complete Multipart Upload
- DTOs
- Enums
- Server Interfaces
- FileModel
- Database Schema
- Configuration Properties
- SDK Interfaces
- SDK Internal Classes
- Design Diagrams
- What-If Scenario Answers
- 1. Crash during file upload
- 2. Temp file missing from disk
- 3. File never fully uploaded
- 4. File handler reuse on same worker
- 5. File access across workflow boundaries
- 6. Presigned URL expires during multipart upload
- 7. Storage backend mismatch
- 8. Concurrent uploads to same file ID
- 9. Worker restart mid-download
- 10. File size exceeds limit after partial upload
- Server Implementation
- Java SDK Implementation
- Testing & Verification
- Developer Experience
API Contract & Data Model
REST API Endpoints
Base path: /api/files
ID nomenclature: JSON payloads use fileHandleId — the full reference string conductor://file/<fileId>. URL path parameters use {fileId} — the bare identifier (no prefix). Conversion between the two is handled by FileIdToFileHandleIdConverter.
POST /api/files — Create File
Creates file metadata record and returns presigned upload URL.
Request:
POST /api/files
Content-Type: application/json
{
"fileName": "report.pdf",
"contentType": "application/pdf",
"fileSize": 1048576,
"workflowId": "wf-uuid",
"taskId": "task-uuid"
}
| Field | Type | Required | Description |
|---|---|---|---|
| fileName | String | Yes | Original file name |
| contentType | String | Yes | MIME type |
| fileSize | long | Yes | File size in bytes |
| workflowId | String | Yes | Workflow ID — required, must not be blank |
| taskId | String | No | Task ID (when file is task output) |
Response: 201 Created
{
"fileHandleId": "conductor://file/d6a4e5f7-8b9c-4a1d-b2e3-f4a5b6c7d8e9",
"fileName": "report.pdf",
"contentType": "application/pdf",
"fileSize": 1048576,
"storageType": "S3",
"uploadStatus": "UPLOADING",
"uploadUrl": "https://s3.amazonaws.com/bucket/files/d6a4e5f7-.../report.pdf?X-Amz-...",
"uploadUrlExpiresAt": 1760522460000,
"createdAt": 1760522400000
}
Errors:
| Status | Code | Condition |
|---|---|---|
| 400 | INVALID_REQUEST | Missing required fields, invalid content type |
| 413 | FILE_TOO_LARGE | fileSize exceeds conductor.file-storage.max-file-size |
Server logic:
1. Validate request — file size against configured max
2. Generate fileId (random identifier) → conductor://file/<fileId>
3. Generate storage path: files/<fileId>/<fileName>
4. Persist FileModel with status UPLOADING via FileMetadataDAO
5. Generate presigned upload URL via FileStorage.generateUploadUrl()
6. Return FileUploadResponse
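A compact sketch of this flow, assuming a hypothetical service implementation wired with the FileStorage, FileMetadataDAO, FileModelConverter, and FileStorageProperties types defined later in this document (class name and exception choices are illustrative, not prescriptive):
package org.conductoross.conductor.core.storage;
import java.time.Duration;
import java.time.Instant;
import java.util.UUID;
import org.conductoross.conductor.core.storage.converter.FileModelConverter;
import org.conductoross.conductor.dao.FileMetadataDAO;
import org.conductoross.conductor.model.FileModel;
import org.conductoross.conductor.model.file.*;
// Sketch only — not the full FileStorageService implementation.
public class FileStorageServiceSketch {
    private final FileMetadataDAO fileMetadataDAO;
    private final FileStorage fileStorage;
    private final FileStorageProperties properties;
    private final WorkflowFamilyResolver workflowFamilyResolver; // used by getDownloadUrl below
    public FileStorageServiceSketch(FileMetadataDAO fileMetadataDAO, FileStorage fileStorage,
            FileStorageProperties properties, WorkflowFamilyResolver workflowFamilyResolver) {
        this.fileMetadataDAO = fileMetadataDAO;
        this.fileStorage = fileStorage;
        this.properties = properties;
        this.workflowFamilyResolver = workflowFamilyResolver;
    }
    public FileUploadResponse createFile(FileUploadRequest request) {
        // 1. Validate declared size; maps to 413 FILE_TOO_LARGE
        if (request.getFileSize() > properties.getMaxFileSize().toBytes()) {
            throw new IllegalArgumentException("fileSize exceeds max-file-size"); // stand-in exception
        }
        // 2. Bare fileId; the response DTO carries the prefixed handle
        String fileId = UUID.randomUUID().toString();
        // 3. Server-internal storage path: files/<fileId>/<fileName>
        String storagePath = "files/" + fileId + "/" + request.getFileName();
        // 4. Persist with status UPLOADING
        FileModel model = FileModelConverter.toFileModel(
                request, fileId, storagePath, fileStorage.getStorageType());
        fileMetadataDAO.createFileMetadata(model);
        // 5. Presigned upload URL with the configured TTL
        Duration ttl = properties.getSignedUrlExpiration();
        String uploadUrl = fileStorage.generateUploadUrl(storagePath, ttl);
        // 6. Build the response via the converter
        return FileModelConverter.toFileUploadResponse(model, uploadUrl, Instant.now().plus(ttl));
    }
}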
GET /api/files/{fileId}/upload-url — Get Upload URL
Returns a presigned upload URL. Used on retry when original URL from create has expired.
Response: 200 OK
{
"fileHandleId": "conductor://file/d6a4e5f7-...",
"uploadUrl": "https://s3.../presigned-put-url",
"expiresAt": 1760522460000
}
Errors:
| Status | Code | Condition |
|---|---|---|
| 404 | FILE_NOT_FOUND | File ID does not exist |
Server logic:
1. Look up FileModel by fileId
2. Generate fresh URL via FileStorage.generateUploadUrl(), return
POST /api/files/{fileId}/upload-complete — Confirm Upload
Confirms that the file has been uploaded to the storage backend. Server verifies the file exists on storage, reads content hash and actual size from storage provider, persists both.
Response: 200 OK
{
"fileHandleId": "conductor://file/d6a4e5f7-...",
"uploadStatus": "UPLOADED",
"contentHash": "d41d8cd98f00b204e9800998ecf8427e"
}
Errors:
| Status | Code | Condition |
|---|---|---|
| 404 | FILE_NOT_FOUND | File ID does not exist |
| 409 | ALREADY_UPLOADED | File already in UPLOADED status |
| 500 | VERIFICATION_FAILED | File not found on storage backend |
Server logic:
1. Look up FileModel by fileId
2. Single call: FileStorage.getStorageFileInfo(storagePath) — verifies file exists, reads hash + size
3. If not exists → 500 VERIFICATION_FAILED
4. Persist via FileMetadataDAO.updateUploadComplete(fileId, UPLOADED, contentHash, contentSize)
5. Return FileUploadCompleteResponse (includes contentHash)
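In the same sketch style, the verification step; the thrown exception types are stand-ins for whatever the controller layer maps to the HTTP codes above:
// Sketch of confirmUpload in the hypothetical service class above.
public FileUploadCompleteResponse confirmUpload(String fileId) {
    FileModel model = fileMetadataDAO.getFileMetadata(fileId);
    if (model == null) {
        throw new java.util.NoSuchElementException("FILE_NOT_FOUND");      // → 404
    }
    if (model.getUploadStatus() == FileUploadStatus.UPLOADED) {
        throw new IllegalStateException("ALREADY_UPLOADED");               // → 409
    }
    // Single storage call: existence + hash + actual size (null when the file is absent)
    StorageFileInfo info = fileStorage.getStorageFileInfo(model.getStoragePath());
    if (info == null) {
        throw new IllegalStateException("VERIFICATION_FAILED");            // → 500
    }
    fileMetadataDAO.updateUploadComplete(fileId, FileUploadStatus.UPLOADED,
            info.getContentHash(), info.getContentSize());
    FileUploadCompleteResponse response = new FileUploadCompleteResponse();
    response.setFileHandleId(FileIdToFileHandleIdConverter.toFileHandleId(fileId));
    response.setUploadStatus(FileUploadStatus.UPLOADED);
    response.setContentHash(info.getContentHash());
    return response;
}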
GET /api/files/{workflowId}/{fileId}/download-url — Get Download URL
Returns a presigned download URL.
Response: 200 OK
{
"fileHandleId": "conductor://file/d6a4e5f7-...",
"downloadUrl": "https://s3.../presigned-get-url",
"expiresAt": 1760522460000
}
Errors:
| Status | Code | Condition |
|---|---|---|
| 404 | FILE_NOT_FOUND | File ID does not exist |
| 400 | UPLOAD_NOT_COMPLETE | File status is not UPLOADED |
| 403 | ACCESS_FORBIDDEN | Caller's workflowId is not in the file's workflow family |
Server logic:
1. Look up FileModel — verify status is UPLOADED
2. If file has no workflowId → 403 ACCESS_FORBIDDEN
3. If caller's workflowId equals conductor.file-storage.default-workflow-id → skip family check
4. Otherwise: resolve workflow family via WorkflowFamilyResolver.getFamily(callerWorkflowId); if file's workflowId not in family → 403 ACCESS_FORBIDDEN
5. Generate fresh URL via FileStorage.generateDownloadUrl(), return
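And a sketch of the access check, continuing the hypothetical service class above (which holds a WorkflowFamilyResolver); exception types again stand in for the HTTP mappings:
// Sketch of getDownloadUrl in the hypothetical service class above.
public FileDownloadUrlResponse getDownloadUrl(String fileId, String callerWorkflowId) {
    FileModel model = fileMetadataDAO.getFileMetadata(fileId);
    if (model == null) {
        throw new java.util.NoSuchElementException("FILE_NOT_FOUND");      // → 404
    }
    if (model.getUploadStatus() != FileUploadStatus.UPLOADED) {
        throw new IllegalStateException("UPLOAD_NOT_COMPLETE");            // → 400
    }
    if (model.getWorkflowId() == null) {
        throw new SecurityException("ACCESS_FORBIDDEN");                   // → 403
    }
    // default-workflow-id bypasses family scoping (null-safe: equals on the caller's ID)
    if (!callerWorkflowId.equals(properties.getDefaultWorkflowId())
            && !workflowFamilyResolver.getFamily(callerWorkflowId).contains(model.getWorkflowId())) {
        throw new SecurityException("ACCESS_FORBIDDEN");                   // → 403
    }
    Duration ttl = properties.getSignedUrlExpiration();
    FileDownloadUrlResponse response = new FileDownloadUrlResponse();
    response.setFileHandleId(FileIdToFileHandleIdConverter.toFileHandleId(fileId));
    response.setDownloadUrl(fileStorage.generateDownloadUrl(model.getStoragePath(), ttl));
    response.setExpiresAt(Instant.now().plus(ttl).toEpochMilli());
    return response;
}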
GET /api/files/{fileId} — Get File Metadata
Returns full file metadata.
Response: 200 OK
{
"fileHandleId": "conductor://file/d6a4e5f7-...",
"fileName": "report.pdf",
"contentType": "application/pdf",
"fileSize": 1048576,
"contentHash": "d41d8cd98f00b204e9800998ecf8427e",
"storageType": "S3",
"uploadStatus": "UPLOADED",
"workflowId": "wf-uuid",
"taskId": "task-uuid",
"createdAt": 1760522400000,
"updatedAt": 1760522430000
}
storagePath is server-internal — never exposed.
Errors:
| Status | Code | Condition |
|---|---|---|
| 404 | FILE_NOT_FOUND | File ID does not exist |
POST /api/files/{fileId}/multipart — Initiate Multipart Upload
Initiates a multipart upload on the storage backend.
Response: 200 OK
{
"fileHandleId": "conductor://file/d6a4e5f7-...",
"uploadId": "abc123-multipart-upload-id",
"uploadUrl": null,
"partSize": 5242880
}
| Field | Description |
|---|---|
| uploadId | Backend-specific multipart upload ID |
| uploadUrl | Resumable URL for GCS/Azure. Null for S3 (uses per-part URLs). |
| partSize | Recommended part size in bytes |
Server logic:
1. Look up FileModel — verify status is UPLOADING
2. Call FileStorage.initiateMultipartUpload() → returns upload ID
3. For GCS/Azure: generate resumable URL
4. Calculate part size from file size and backend constraints
5. Return MultipartInitResponse
GET /api/files/{fileId}/multipart/{uploadId}/part/{partNumber} — Get Part Upload URL (S3 only)
Returns a presigned URL for uploading a single part. Only used by S3 backend — GCS/Azure reuse the resumable URL from initiate.
Response: 200 OK
{
"fileHandleId": "conductor://file/d6a4e5f7-...",
"uploadUrl": "https://s3.../presigned-part-url",
"expiresAt": 1760522460000
}
Server logic:
1. Call FileStorage.generatePartUploadUrl(storagePath, uploadId, partNumber, expiration)
2. Return FileUploadUrlResponse
POST /api/files/{fileId}/multipart/{uploadId}/complete — Complete Multipart Upload
Finalizes a multipart upload after all parts have been uploaded.
Request (body shape matches the MultipartCompleteRequest DTO below; ETag values illustrative):
POST /api/files/{fileId}/multipart/{uploadId}/complete
Content-Type: application/json
{
"partETags": ["etag-part-1", "etag-part-2"]
}
Response: 200 OK
{
"fileHandleId": "conductor://file/d6a4e5f7-...",
"uploadStatus": "UPLOADED",
"contentHash": "d41d8cd98f00b204e9800998ecf8427e"
}
Server logic:
1. Call FileStorage.completeMultipartUpload(storagePath, uploadId, partETags)
2. Call FileStorage.getStorageFileInfo(storagePath) — verify file exists, read content hash + actual size
3. If not exists → 500 VERIFICATION_FAILED
4. Persist storageContentHash + storageContentSize via FileMetadataDAO.updateUploadComplete()
5. Return FileUploadCompleteResponse (includes contentHash)
DTOs
Server side: org.conductoross.conductor.model.file (in file-storage-conductor/common).
SDK side (mirrored): org.conductoross.conductor.client.model.file (in file-storage-java-sdk/conductor-client).
All DTOs are plain Java classes with explicit getters/setters (no Lombok) and explicit toString(), equals(), and hashCode() via Objects.hash()/Objects.equals(). No proto annotations. Each side defines its DTOs independently — there is no shared common module.
FileUploadRequest
package org.conductoross.conductor.model.file;
import java.util.Objects;
public class FileUploadRequest {
private String fileName;
private String contentType;
private long fileSize;
private String workflowId;
private String taskId;
public String getFileName() { return fileName; }
public void setFileName(String fileName) { this.fileName = fileName; }
public String getContentType() { return contentType; }
public void setContentType(String contentType) { this.contentType = contentType; }
public long getFileSize() { return fileSize; }
public void setFileSize(long fileSize) { this.fileSize = fileSize; }
public String getWorkflowId() { return workflowId; }
public void setWorkflowId(String workflowId) { this.workflowId = workflowId; }
public String getTaskId() { return taskId; }
public void setTaskId(String taskId) { this.taskId = taskId; }
@Override
public String toString() {
return "FileUploadRequest{fileName='" + fileName + "', contentType='" + contentType
+ "', fileSize=" + fileSize + "}";
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof FileUploadRequest that)) return false;
return fileSize == that.fileSize
&& Objects.equals(fileName, that.fileName)
&& Objects.equals(contentType, that.contentType);
}
@Override
public int hashCode() {
return Objects.hash(fileName, contentType, fileSize);
}
}
FileUploadResponse
package org.conductoross.conductor.model.file;
public class FileUploadResponse {
private String fileHandleId;
private String fileName;
private String contentType;
private long fileSize;
private StorageType storageType;
private FileUploadStatus uploadStatus;
private String uploadUrl;
private long uploadUrlExpiresAt;
private long createdAt;
// getters and setters for all fields
@Override
public String toString() {
return "FileUploadResponse{fileHandleId='" + fileHandleId + "', uploadStatus=" + uploadStatus + "}";
}
}
FileHandle
Server-to-client DTO for file metadata. Does not expose storagePath.
package org.conductoross.conductor.model.file;
public class FileHandle {
private String fileHandleId;
private String fileName;
private String contentType;
private long fileSize;
private String contentHash; // from storage provider — null until upload confirmed
private StorageType storageType;
private FileUploadStatus uploadStatus;
private String workflowId;
private String taskId;
private long createdAt;
private long updatedAt;
// getters and setters for all fields
@Override
public String toString() {
return "FileHandle{fileHandleId='" + fileHandleId + "', fileName='" + fileName
+ "', uploadStatus=" + uploadStatus + "}";
}
}
FileUploadUrlResponse
package org.conductoross.conductor.model.file;
public class FileUploadUrlResponse {
private String fileHandleId;
private String uploadUrl;
private long expiresAt;
// getters and setters
@Override
public String toString() {
return "FileUploadUrlResponse{fileHandleId='" + fileHandleId + "', expiresAt=" + expiresAt + "}";
}
}
FileDownloadUrlResponse
package org.conductoross.conductor.model.file;
public class FileDownloadUrlResponse {
private String fileHandleId;
private String downloadUrl;
private long expiresAt;
// getters and setters
}
FileUploadCompleteResponse
package org.conductoross.conductor.model.file;
public class FileUploadCompleteResponse {
private String fileHandleId;
private FileUploadStatus uploadStatus;
private String contentHash; // from storage provider — null for local backend
// getters and setters
}
MultipartInitResponse
package org.conductoross.conductor.model.file;
public class MultipartInitResponse {
private String fileHandleId;
private String uploadId;
private String uploadUrl; // resumable URL for GCS/Azure; null for S3
private long partSize;
// getters and setters
}
MultipartCompleteRequest
package org.conductoross.conductor.model.file;
import java.util.List;
public class MultipartCompleteRequest {
private List<String> partETags;
// getter and setter
}
FileIdToFileHandleIdConverter
Bidirectional helper between the bare identifier (fileId) used in URL path params and the prefixed handle (fileHandleId) used in JSON payloads.
package org.conductoross.conductor.model.file;
public final class FileIdToFileHandleIdConverter {
public static final String PREFIX = "conductor://file/";
private FileIdToFileHandleIdConverter() {}
public static String toFileHandleId(String fileId) {
return fileId.startsWith(PREFIX) ? fileId : PREFIX + fileId;
}
public static String toFileId(String value) {
return value.startsWith(PREFIX) ? value.substring(PREFIX.length()) : value;
}
public static boolean isFileHandleId(Object value) {
return value instanceof String s && s.startsWith(PREFIX);
}
}
Mirrored in the SDK at org.conductoross.conductor.client.model.file.FileIdToFileHandleIdConverter.
Enums
StorageType
Defined on both sides independently (no shared common module). Server: org.conductoross.conductor.model.file. SDK: org.conductoross.conductor.client.model.file.
package org.conductoross.conductor.model.file;
public enum StorageType {
S3,
AZURE_BLOB,
GCS,
LOCAL
}
FileUploadStatus
Defined on both sides independently. Server: org.conductoross.conductor.model.file. SDK: org.conductoross.conductor.client.model.file.
package org.conductoross.conductor.model.file;
public enum FileUploadStatus {
PENDING,
UPLOADING,
UPLOADED,
FAILED
}
PENDING is defined for future use (e.g. pre-initialized records); the runtime flow enters UPLOADING on file creation.
FileDownloadStatus
SDK-side only. In conductor-client module.
package org.conductoross.conductor.sdk.file;
public enum FileDownloadStatus {
NOT_STARTED,
DOWNLOADING,
DOWNLOADED,
FAILED
}
Server Interfaces
FileStorage
core module — org.conductoross.conductor.core.storage
package org.conductoross.conductor.core.storage;
import java.time.Duration;
import java.util.List;
import org.conductoross.conductor.model.file.StorageType;
/**
* Abstracts storage backend operations for file storage.
* One implementation per backend (S3, Azure Blob, GCS, Local).
*/
public interface FileStorage {
/** Returns the storage type this implementation handles. */
StorageType getStorageType();
/**
* Generate a presigned upload URL (or equivalent) for the given storage path.
*
* @param storagePath internal storage key
* @param expiration URL TTL
* @return presigned URL string, or direct API path for local backend
*/
String generateUploadUrl(String storagePath, Duration expiration);
/**
* Generate a presigned download URL (or equivalent) for the given storage path.
*
* @param storagePath internal storage key
* @param expiration URL TTL
* @return presigned URL string, or direct API path for local backend
*/
String generateDownloadUrl(String storagePath, Duration expiration);
/**
* Verify the upload and return storage metadata in a single call to the storage backend.
*
* @param storagePath internal storage key
* @return storage metadata, or {@code null} if no file exists at the given path
*/
StorageFileInfo getStorageFileInfo(String storagePath);
/**
* Initiate a multipart upload. Returns a backend-specific upload ID.
*
* @param storagePath internal storage key
* @return upload ID
*/
String initiateMultipartUpload(String storagePath);
/**
* Generate a presigned URL for uploading a single part (S3).
* GCS/Azure use resumable URLs and do not call this method.
*
* @param storagePath internal storage key
* @param uploadId multipart upload ID from {@link #initiateMultipartUpload}
* @param partNumber 1-based part number
* @param expiration URL TTL
* @return presigned URL for this part
*/
String generatePartUploadUrl(String storagePath, String uploadId, int partNumber,
Duration expiration);
/**
* Complete a multipart upload after all parts have been uploaded.
*
* @param storagePath internal storage key
* @param uploadId multipart upload ID
* @param partETags ordered list of ETags returned by each part upload
*/
void completeMultipartUpload(String storagePath, String uploadId, List<String> partETags);
}
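For concreteness, a sketch of how an S3-backed implementation might produce the presigned PUT URL with AWS SDK v2 (class name hypothetical; the bucket would come from S3FileStorageProperties, defined under Configuration Properties):
package org.conductoross.conductor.core.storage;
import java.time.Duration;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.presigner.S3Presigner;
import software.amazon.awssdk.services.s3.presigner.model.PutObjectPresignRequest;
// Illustrative sketch of one FileStorage method; not the full S3 implementation.
public class S3PresignSketch {
    private final S3Presigner presigner = S3Presigner.create();
    private final String bucketName; // would come from S3FileStorageProperties
    public S3PresignSketch(String bucketName) {
        this.bucketName = bucketName;
    }
    public String generateUploadUrl(String storagePath, Duration expiration) {
        PutObjectRequest putObjectRequest = PutObjectRequest.builder()
                .bucket(bucketName)
                .key(storagePath) // files/<fileId>/<fileName>
                .build();
        PutObjectPresignRequest presignRequest = PutObjectPresignRequest.builder()
                .signatureDuration(expiration) // conductor.file-storage.signed-url-expiration
                .putObjectRequest(putObjectRequest)
                .build();
        return presigner.presignPutObject(presignRequest).url().toString();
    }
}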
StorageFileInfo
Server-internal value object. core module — org.conductoross.conductor.core.storage
package org.conductoross.conductor.core.storage;
public class StorageFileInfo {
private boolean exists;
private String contentHash; // raw from storage provider — null for local backend
private long contentSize; // actual bytes on storage
// getters and setters
}
FileStorageService
core module — org.conductoross.conductor.core.storage
package org.conductoross.conductor.core.storage;
import java.util.List;
import org.springframework.validation.annotation.Validated;
import org.conductoross.conductor.model.file.*;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
@Validated
public interface FileStorageService {
FileUploadResponse createFile(
@NotNull(message = "FileUploadRequest cannot be null") @Valid
FileUploadRequest request);
FileUploadUrlResponse getUploadUrl(
@NotEmpty(message = "fileId cannot be empty") String fileId);
FileUploadCompleteResponse confirmUpload(
@NotEmpty(message = "fileId cannot be empty") String fileId);
FileDownloadUrlResponse getDownloadUrl(
@NotEmpty(message = "fileId cannot be empty") String fileId,
@NotNull String workflowId);
FileHandle getFileMetadata(
@NotEmpty(message = "fileId cannot be empty") String fileId);
MultipartInitResponse initiateMultipartUpload(
@NotEmpty(message = "fileId cannot be empty") String fileId);
FileUploadUrlResponse getPartUploadUrl(
@NotEmpty(message = "fileId cannot be empty") String fileId,
@NotEmpty(message = "uploadId cannot be empty") String uploadId,
int partNumber);
FileUploadCompleteResponse completeMultipartUpload(
@NotEmpty(message = "fileId cannot be empty") String fileId,
@NotEmpty(message = "uploadId cannot be empty") String uploadId,
@NotNull List<String> partETags);
}
FileMetadataDAO
core module — org.conductoross.conductor.dao
package org.conductoross.conductor.dao;
import java.util.List;
import org.conductoross.conductor.model.file.FileUploadStatus;
import org.conductoross.conductor.model.FileModel;
/** Data access layer for file metadata persistence. */
public interface FileMetadataDAO {
/**
* Create a new file metadata record.
*
* @param fileModel file metadata to persist
*/
void createFileMetadata(FileModel fileModel);
/**
* Get file metadata by file ID.
*
* @param fileId bare file identifier (no prefix)
* @return file metadata, or null if not found
*/
FileModel getFileMetadata(String fileId);
/**
* Update the upload status of a file.
*
* @param fileId file ID
* @param status new upload status
*/
void updateUploadStatus(String fileId, FileUploadStatus status);
/**
* Update file metadata on upload completion — sets status, content hash, and actual size
* from the storage provider in a single DAO call.
*
* @param fileId file ID
* @param status new upload status (UPLOADED)
* @param contentHash hash from storage provider (null for local backend)
* @param contentSize actual bytes on storage
*/
void updateUploadComplete(String fileId, FileUploadStatus status, String contentHash,
long contentSize);
/**
* Get all files associated with a workflow.
*
* @param workflowId workflow instance ID
* @return list of file metadata
*/
List<FileModel> getFilesByWorkflowId(String workflowId);
/**
* Get all files associated with a task.
*
* @param taskId task instance ID
* @return list of file metadata
*/
List<FileModel> getFilesByTaskId(String taskId);
}
WorkflowFamilyResolver
core module — org.conductoross.conductor.core.storage
package org.conductoross.conductor.core.storage;
import java.util.Set;
/**
* Resolves the full family tree of a workflow: self, all ancestors (parentWorkflowId chain), and
* all descendants (recursive children), with no depth limit.
*/
public interface WorkflowFamilyResolver {
/**
* Returns all workflow IDs in the family of the given workflowId. Returns an empty set if the
* workflowId is null or not found.
*/
Set<String> getFamily(String workflowId);
}
WorkflowFamilyResolverImpl
core module — org.conductoross.conductor.core.storage
package org.conductoross.conductor.core.storage;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import com.netflix.conductor.dao.ExecutionDAO;
import com.netflix.conductor.model.WorkflowModel;
@Component
@ConditionalOnProperty(name = "conductor.file-storage.enabled", havingValue = "true")
public class WorkflowFamilyResolverImpl implements WorkflowFamilyResolver {
private final ExecutionDAO executionDAO;
public WorkflowFamilyResolverImpl(ExecutionDAO executionDAO) {
this.executionDAO = executionDAO;
}
@Override
public Set<String> getFamily(String workflowId) {
Set<String> family = new HashSet<>();
collectAncestors(workflowId, family);
if (executionDAO.canSearchAcrossWorkflows()) {
collectDescendants(workflowId, family);
}
return family;
}
private void collectAncestors(String workflowId, Set<String> visited) {
if (workflowId == null || visited.contains(workflowId)) return;
WorkflowModel workflow = executionDAO.getWorkflow(workflowId, false);
if (workflow == null) return;
visited.add(workflowId);
if (workflow.getParentWorkflowId() != null) {
collectAncestors(workflow.getParentWorkflowId(), visited);
}
}
private void collectDescendants(String workflowId, Set<String> visited) {
if (workflowId == null) return;
List<WorkflowModel> children = executionDAO.getWorkflowsByParentId(workflowId);
for (WorkflowModel child : children) {
if (!visited.contains(child.getWorkflowId())) {
visited.add(child.getWorkflowId());
collectDescendants(child.getWorkflowId(), visited);
}
}
}
}
collectDescendants is only called when executionDAO.canSearchAcrossWorkflows() returns true (Redis/SQL backends). Cassandra does not support this query; on that backend only ancestors are resolved.
FileModel
core module — org.conductoross.conductor.model
package org.conductoross.conductor.model;
import java.time.Instant;
import org.conductoross.conductor.model.file.FileUploadStatus;
import org.conductoross.conductor.model.file.StorageType;
public class FileModel {
private String fileId;
private String fileName;
private String contentType;
private long fileSize; // declared size from FileUploadRequest
private String storageContentHash; // hash from storage provider — set on upload-complete, null for local
private long storageContentSize; // actual size from storage provider — set on upload-complete
private StorageType storageType;
private String storagePath; // server-internal — never exposed in API
private FileUploadStatus uploadStatus;
private String workflowId;
private String taskId;
private Instant createdAt;
private Instant updatedAt;
// getters and setters for all fields
@Override
public String toString() {
return "FileModel{fileId='" + fileId + "', fileName='" + fileName
+ "', uploadStatus=" + uploadStatus + "}";
}
}
Database Schema
Postgres — V15__file_metadata.sql
CREATE TABLE file_metadata (
file_id VARCHAR(255) NOT NULL PRIMARY KEY,
file_name VARCHAR(1024) NOT NULL,
content_type VARCHAR(255) NOT NULL,
file_size BIGINT NOT NULL,
storage_content_hash VARCHAR(255),
storage_content_size BIGINT,
storage_type VARCHAR(50) NOT NULL,
storage_path VARCHAR(2048) NOT NULL,
upload_status VARCHAR(50) NOT NULL DEFAULT 'UPLOADING',
workflow_id VARCHAR(255) NOT NULL,
task_id VARCHAR(255),
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_file_metadata_workflow_id ON file_metadata (workflow_id);
CREATE INDEX IF NOT EXISTS idx_file_metadata_task_id ON file_metadata (task_id);
CREATE INDEX IF NOT EXISTS idx_file_metadata_upload_status ON file_metadata (upload_status);
Location: postgres-persistence/src/main/resources/db/migration_postgres/V15__file_metadata.sql
MySQL — V9__file_metadata.sql
CREATE TABLE file_metadata (
file_id VARCHAR(255) NOT NULL,
file_name VARCHAR(1024) NOT NULL,
content_type VARCHAR(255) NOT NULL,
file_size BIGINT NOT NULL,
storage_content_hash VARCHAR(255),
storage_content_size BIGINT,
storage_type VARCHAR(50) NOT NULL,
storage_path VARCHAR(2048) NOT NULL,
upload_status VARCHAR(50) NOT NULL DEFAULT 'UPLOADING',
workflow_id VARCHAR(255) NOT NULL,
task_id VARCHAR(255),
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (file_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE INDEX idx_file_metadata_workflow_id ON file_metadata (workflow_id);
CREATE INDEX idx_file_metadata_task_id ON file_metadata (task_id);
CREATE INDEX idx_file_metadata_upload_status ON file_metadata (upload_status);
Location: mysql-persistence/src/main/resources/db/migration/V9__file_metadata.sql
MySQL difference: ON UPDATE CURRENT_TIMESTAMP for updated_at, ENGINE=InnoDB, explicit PRIMARY KEY clause.
SQLite — V3__file_metadata.sql
CREATE TABLE file_metadata (
file_id VARCHAR(255) NOT NULL PRIMARY KEY,
file_name VARCHAR(1024) NOT NULL,
content_type VARCHAR(255) NOT NULL,
file_size BIGINT NOT NULL,
storage_content_hash VARCHAR(255),
storage_content_size BIGINT,
storage_type VARCHAR(50) NOT NULL,
storage_path VARCHAR(2048) NOT NULL,
upload_status VARCHAR(50) NOT NULL DEFAULT 'UPLOADING',
workflow_id VARCHAR(255) NOT NULL,
task_id VARCHAR(255),
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_file_metadata_workflow_id ON file_metadata (workflow_id);
CREATE INDEX IF NOT EXISTS idx_file_metadata_task_id ON file_metadata (task_id);
CREATE INDEX IF NOT EXISTS idx_file_metadata_upload_status ON file_metadata (upload_status);
Location: sqlite-persistence/src/main/resources/db/migration_sqlite/V3__file_metadata.sql
SQLite difference: no ON UPDATE clause (application updates updated_at); no explicit charset.
Configuration Properties
FileStorageProperties
core module — org.conductoross.conductor.core.storage
package org.conductoross.conductor.core.storage;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.convert.DurationUnit;
import org.springframework.util.unit.DataSize;
@ConfigurationProperties("conductor.file-storage")
public class FileStorageProperties {
/** Feature flag — entire file storage feature gated on this. Disabled by default. */
private boolean enabled = false;
/** Storage backend type: local, s3, azure-blob, gcs */
private String type = "local";
/** Maximum file size allowed */
private DataSize maxFileSize = DataSize.ofGigabytes(5);
/** Presigned URL TTL */
@DurationUnit(ChronoUnit.SECONDS)
private Duration signedUrlExpiration = Duration.ofSeconds(60);
public boolean isEnabled() { return enabled; }
public void setEnabled(boolean enabled) { this.enabled = enabled; }
public String getType() { return type; }
public void setType(String type) { this.type = type; }
public DataSize getMaxFileSize() { return maxFileSize; }
public void setMaxFileSize(DataSize maxFileSize) { this.maxFileSize = maxFileSize; }
public Duration getSignedUrlExpiration() { return signedUrlExpiration; }
public void setSignedUrlExpiration(Duration signedUrlExpiration) {
this.signedUrlExpiration = signedUrlExpiration;
}
/**
* When set, any download request presenting this workflowId bypasses family scoping. Useful for
* shared reference files. Default: {@code null} (disabled).
*/
private String defaultWorkflowId = null;
public String getDefaultWorkflowId() { return defaultWorkflowId; }
public void setDefaultWorkflowId(String defaultWorkflowId) {
this.defaultWorkflowId = defaultWorkflowId;
}
}
S3FileStorageProperties
awss3-storage module — com.netflix.conductor.s3.config
@ConfigurationProperties("conductor.file-storage.s3")
public class S3FileStorageProperties {
private String bucketName;
private String region = "us-east-1";
// getters and setters
}
AzureBlobFileStorageProperties
azureblob-storage module
@ConfigurationProperties("conductor.file-storage.azure-blob")
public class AzureBlobFileStorageProperties {
private String containerName;
private String connectionString;
// getters and setters
}
GcsFileStorageProperties
gcs-storage module (new)
@ConfigurationProperties("conductor.file-storage.gcs")
public class GcsFileStorageProperties {
private String bucketName;
private String projectId;
// getters and setters
}
LocalFileStorageProperties
local-file-storage module (new)
@ConfigurationProperties("conductor.file-storage.local")
public class LocalFileStorageProperties {
private String directory = "./conductor-files";
// getter and setter
}
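Putting the server-side properties together, a sample application.properties for an S3 deployment (values illustrative; max-file-size uses Spring's DataSize binding, and signed-url-expiration is read in seconds per the @DurationUnit annotation):
conductor.file-storage.enabled=true
conductor.file-storage.type=s3
conductor.file-storage.max-file-size=2GB
conductor.file-storage.signed-url-expiration=120
conductor.file-storage.s3.bucket-name=my-conductor-files
conductor.file-storage.s3.region=us-west-2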
FileClientProperties
SDK-side config. conductor-client — org.conductoross.conductor.client.
package org.conductoross.conductor.client;
import java.nio.file.Path;
public class FileClientProperties {
/** Upload/download retry count */
private int retryCount = 3;
/** Local cache directory for downloaded files — defaults to java.io.tmpdir/conductor/files-cache */
private String localCacheDirectory =
Path.of(System.getProperty("java.io.tmpdir"), "conductor", "files-cache").toString();
/** File size threshold above which multipart upload is used (default: 100 MB) */
private long multipartThreshold = 100 * 1024 * 1024;
// getters and setters
}
Unlike the server-side FileStorageProperties, the SDK does not have a type field — storage type selection
happens automatically: FileClient looks up the registered backend by the StorageType returned in
FileUploadResponse.
SDK Interfaces
All in conductor-client module.
FileHandler
org.conductoross.conductor.sdk.file
Developer-facing — the only interface developers interact with for file references.
package org.conductoross.conductor.sdk.file;
import java.io.InputStream;
import java.nio.file.Path;
/**
* Developer-facing file reference. Abstracts storage backend completely.
*
* <p>Use {@link #fromLocalFile(Path)} to wrap a local file for upload.
* Use {@link #getInputStream()} to read file content (lazy download on first call).
*/
public interface FileHandler {
/**
* Returns the file handle in {@code conductor://file/<fileId>} format.
* Returns null for local files that have not been uploaded yet.
*/
String getFileHandleId();
/**
* Returns an InputStream to read the file content.
*
* <p>First call triggers download from storage backend (lazy).
* Subsequent calls return a new InputStream from the cached local file.
* Blocks if a download is already in progress. Applies retry on failure.
*
* @throws FileStorageException if download fails after retries
*/
InputStream getInputStream();
String getFileName();
String getContentType();
long getFileSize();
/**
* Wrap a local file for upload. No network call — no file ID assigned yet.
* Content type defaults to {@code application/octet-stream}.
*/
static FileHandler fromLocalFile(Path path) {
return new LocalFileHandler(path, "application/octet-stream");
}
/** Wrap a local file with explicit content type. */
static FileHandler fromLocalFile(Path path, String contentType) {
return new LocalFileHandler(path, contentType);
}
}
FileUploader
org.conductoross.conductor.sdk.file
Developer-facing upload API.
package org.conductoross.conductor.sdk.file;
import java.io.InputStream;
import java.nio.file.Path;
/**
* Developer-facing upload API. Implemented by {@link WorkflowFileClient}, which binds
* {@code workflowId} at construction — callers do not pass it per-call.
* Obtain via {@code WorkflowFileClient} or Spring injection.
*/
public interface FileUploader {
FileHandler upload(Path localFile);
FileHandler upload(InputStream inputStream);
/** Upload from a local file with the given options. */
FileHandler upload(Path localFile, FileUploadOptions options);
/** Upload from an {@link InputStream} with the given options. */
FileHandler upload(InputStream inputStream, FileUploadOptions options);
}
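A sketch of typical worker-side usage of these two interfaces; how the FileUploader reaches the worker is covered later under FileClient and WorkflowFileClient:
package org.conductoross.conductor.sdk.file;
import java.io.InputStream;
import java.nio.file.Path;
// Illustrative usage only; the surrounding worker wiring is assumed here.
public class FileUsageSketch {
    // Producer side: wrap a local file and upload it; the returned handler carries the handle ID.
    public String produceReport(FileUploader fileUploader) {
        FileHandler uploaded = fileUploader.upload(
                Path.of("/tmp/report.pdf"),
                new FileUploadOptions().setContentType("application/pdf"));
        return uploaded.getFileHandleId(); // conductor://file/<fileId>
    }
    // Consumer side: the first getInputStream() lazily downloads and caches the file.
    public long consumeReport(FileHandler handler) {
        try (InputStream in = handler.getInputStream()) {
            return in.readAllBytes().length;
        } catch (java.io.IOException e) {
            throw new FileStorageException("Failed to read file content", e);
        }
    }
}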
FileStorageBackend
org.conductoross.conductor.sdk.file
Actual file transfer to/from storage. One implementation per backend.
package org.conductoross.conductor.sdk.file;
import java.io.InputStream;
import java.nio.file.Path;
import org.conductoross.conductor.client.model.file.StorageType;
/**
* Handles actual file transfer to/from storage backends.
* SDK-side counterpart to the server's {@code FileStorage} interface.
*/
public interface FileStorageBackend {
/** Returns the storage type this backend handles. */
StorageType getStorageType();
/** Upload entire file to the given presigned URL. */
void upload(String url, Path localFile);
/** Upload from InputStream to the given presigned URL. */
void upload(String url, InputStream inputStream, long contentLength);
/** Download file from the given presigned URL to local destination. */
void download(String url, Path destination);
/**
* Upload a single part for multipart upload. Returns the part's ETag.
*
* @param url presigned URL for this part (S3) or resumable URL (GCS/Azure)
* @param localFile source file
* @param offset byte offset into the source file
* @param length number of bytes to upload for this part
* @return ETag string for the uploaded part
*/
String uploadPart(String url, Path localFile, long offset, long length);
/** Whether this backend supports multipart upload. Defaults to true. */
default boolean hasMultipartSupport() { return true; }
}
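As an illustration, a minimal presigned-URL upload using java.net.http. This is a sketch of one upload overload, not the full backend contract:
package org.conductoross.conductor.sdk.file;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Path;
// Illustrative sketch of upload(String, Path) against a presigned PUT URL.
class PresignedUrlUploadSketch {
    private final HttpClient httpClient = HttpClient.newHttpClient();
    void upload(String url, Path localFile) {
        try {
            HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                    .PUT(HttpRequest.BodyPublishers.ofFile(localFile))
                    .build();
            HttpResponse<Void> response =
                    httpClient.send(request, HttpResponse.BodyHandlers.discarding());
            if (response.statusCode() / 100 != 2) {
                throw new FileStorageException("Upload failed: HTTP " + response.statusCode());
            }
        } catch (IOException e) {
            throw new FileStorageException("Upload failed for " + localFile, e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new FileStorageException("Upload interrupted for " + localFile, e);
        }
    }
}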
SDK Internal Classes
FileClient
conductor-client — org.conductoross.conductor.client
Does NOT implement FileUploader — callers should use WorkflowFileClient, which binds a workflowId and
delegates here. Composes ConductorClient + a Map<StorageType, FileStorageBackend> — supports multiple
backends registered via the builder, and validates the server-reported storage type against the registered keys.
Follows existing client pattern (TaskClient, WorkflowClient) — constructor takes ConductorClient, makes
REST calls via client.execute(ConductorClientRequest).
package org.conductoross.conductor.client;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import com.netflix.conductor.client.http.ConductorClient;
import org.conductoross.conductor.client.model.file.*;
import org.conductoross.conductor.sdk.file.*;
/**
* Client for the Conductor file-storage REST API.
*
* <p>Does NOT implement {@link FileUploader} — callers should use {@link WorkflowFileClient},
* which binds a {@code workflowId} and delegates here. Composes a {@code ConductorClient} for
* REST calls and a {@code Map<StorageType, FileStorageBackend>} for byte transfer.
*
* <p>REST paths use the bare {@code fileId} as path variables; JSON bodies carry the prefixed
* {@code fileHandleId}.
*/
public class FileClient {
private final ConductorClient client;
private final FileClientProperties properties;
private final Map<StorageType, FileStorageBackend> fileStorageBackendsByStorageType;
/** Defaults: standard FileClientProperties + all four built-in backends (LOCAL/S3/AZURE_BLOB/GCS). */
public FileClient(ConductorClient client) {
this(client, new FileClientProperties(), null);
}
/**
* @param properties {@code null} to use defaults
* @param fileStorageBackendsByStorageType {@code null} to use the built-in backends
*/
public FileClient(ConductorClient client, FileClientProperties properties,
Map<StorageType, FileStorageBackend> fileStorageBackendsByStorageType) {
this.client = client;
this.properties = properties == null ? new FileClientProperties() : properties;
this.fileStorageBackendsByStorageType = fileStorageBackendsByStorageType != null
? fileStorageBackendsByStorageType
: createDefaultFileStorageBackends();
}
// --- Developer-facing uploads (workflowId required) ---
/**
* Uploads a local file. {@code workflowId} is required.
*
* @throws FileStorageException if workflowId is null, backend is missing, or upload fails
*/
public FileHandler upload(String workflowId, Path localFile, FileUploadOptions options) { /* ... */ }
public FileHandler upload(String workflowId, InputStream inputStream, FileUploadOptions options) { /* ... */ }
public FileHandler upload(String workflowId, Path localFile) { /* ... */ }
public FileHandler upload(String workflowId, InputStream inputStream) { /* ... */ }
// --- SDK-internal (used by WorkflowFileClient / ManagedFileHandler) ---
/** Downloads to {@code destination}; fetches a fresh presigned URL on each call. */
public void download(String workflowId, String fileHandleId, StorageType storageType, Path destination) { /* ... */ }
/** Fetches the {@link FileHandle} metadata via {@code GET /api/files/{fileId}}. */
public FileHandle getMetadata(String fileHandleId) { /* ... */ }
public int getRetryCount() { return properties.getRetryCount(); }
public String getCacheDirectory() { return properties.getLocalCacheDirectory(); }
// --- Private helpers ---
private FileUploadResponse createFileOnServer(FileUploadRequest request) { /* ... */ }
/** {@code GET /api/files/{workflowId}/{fileId}/download-url} */
private FileDownloadUrlResponse getDownloadUrl(String workflowId, String fileHandleId) { /* ... */ }
private void confirmUpload(String fileHandleId) { /* ... */ }
private MultipartInitResponse initiateMultipartUpload(String fileHandleId) { /* ... */ }
private String getPartUploadUrl(String fileHandleId, String uploadId, int partNumber) { /* ... */ }
private void completeMultipartUpload(String fileHandleId, String uploadId, List<String> partETags) { /* ... */ }
private void uploadMultipart(String fileHandleId, StorageType storageType, Path localFile) { /* ... */ }
private static Map<StorageType, FileStorageBackend> createDefaultFileStorageBackends() { /* LOCAL, S3, AZURE_BLOB, GCS */ }
// --- Builder ---
public static Builder builder(ConductorClient client) { return new Builder(client); }
public static class Builder {
private final ConductorClient client;
private FileClientProperties properties;
private final Map<StorageType, FileStorageBackend> backends = new java.util.EnumMap<>(StorageType.class);
private Builder(ConductorClient client) { this.client = client; }
public Builder properties(FileClientProperties properties) {
this.properties = properties;
return this;
}
public Builder addStorageBackend(FileStorageBackend backend) {
this.backends.put(backend.getStorageType(), backend);
return this;
}
public FileClient build() {
Map<StorageType, FileStorageBackend> resolved = new java.util.EnumMap<>(createDefaultFileStorageBackends());
resolved.putAll(backends);
return new FileClient(client, properties, Map.copyOf(resolved));
}
}
}
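Example wiring (MyS3Backend is a hypothetical custom FileStorageBackend; the builder keeps the remaining built-in backends):
// Hypothetical wiring — override one backend, keep the built-in defaults for the rest.
ConductorClient conductorClient = new ConductorClient("http://localhost:8080/api");
FileClient fileClient = FileClient.builder(conductorClient)
        .properties(new FileClientProperties())
        .addStorageBackend(new MyS3Backend()) // replaces the built-in S3 backend
        .build();
// Bind a workflowId to obtain the developer-facing FileUploader
WorkflowFileClient files = new WorkflowFileClient(fileClient, "wf-uuid");
FileHandler handle = files.upload(java.nio.file.Path.of("/tmp/report.pdf"));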
WorkflowFileClient
conductor-client — org.conductoross.conductor.sdk.file
Decorator for FileClient that binds a workflowId at construction, exposing the
FileUploader interface without requiring callers to pass workflowId on each call.
Created by FileClient.upload() after a successful upload (so ManagedFileHandler can
download files scoped to the same workflow), and by AnnotatedWorker when injecting file
access into workers.
package org.conductoross.conductor.sdk.file;
import java.io.InputStream;
import java.nio.file.Path;
import org.conductoross.conductor.client.FileClient;
import org.conductoross.conductor.client.model.file.FileHandle;
import org.conductoross.conductor.client.model.file.StorageType;
public class WorkflowFileClient implements FileUploader {
private final FileClient delegate;
private final String workflowId;
public WorkflowFileClient(FileClient delegate, String workflowId) {
this.delegate = delegate;
this.workflowId = workflowId;
}
@Override public FileHandler upload(Path localFile, FileUploadOptions options) {
return delegate.upload(workflowId, localFile, options);
}
@Override public FileHandler upload(InputStream inputStream, FileUploadOptions options) {
return delegate.upload(workflowId, inputStream, options);
}
@Override public FileHandler upload(Path localFile) {
return delegate.upload(workflowId, localFile);
}
@Override public FileHandler upload(InputStream inputStream) {
return delegate.upload(workflowId, inputStream);
}
public void download(String fileHandleId, StorageType storageType, Path destination) {
delegate.download(workflowId, fileHandleId, storageType, destination);
}
public FileHandle getMetadata(String fileHandleId) {
return delegate.getMetadata(fileHandleId);
}
public int getRetryCount() { return delegate.getRetryCount(); }
public String getCacheDirectory() { return delegate.getCacheDirectory(); }
}
FileUploadOptions
conductor-client — org.conductoross.conductor.sdk.file
Optional metadata for a file upload. workflowId is passed as the explicit first argument
on FileClient.upload() — this type carries everything else. When uploading inside a worker,
taskId is auto-filled from the active TaskContext if not set here.
package org.conductoross.conductor.sdk.file;
public class FileUploadOptions {
private String taskId;
private String fileName;
private String contentType;
private long fileSize;
// fluent setters + getters
public FileUploadOptions setTaskId(String taskId) { this.taskId = taskId; return this; }
public FileUploadOptions setFileName(String fileName) { this.fileName = fileName; return this; }
public FileUploadOptions setContentType(String contentType) { this.contentType = contentType; return this; }
public FileUploadOptions setFileSize(long fileSize) { this.fileSize = fileSize; return this; }
// getters ...
}
All fields default to null/0 — FileClient.upload() fills in fileName from the path,
fileSize from Files.size(), and contentType from "application/octet-stream" when
the caller does not set them.
FileStorageException
conductor-client — org.conductoross.conductor.sdk.file
SDK-side unchecked exception for all file storage errors.
package org.conductoross.conductor.sdk.file;
public class FileStorageException extends RuntimeException {
public FileStorageException(String message) { super(message); }
public FileStorageException(String message, Throwable cause) { super(message, cause); }
}
LocalFileHandler
conductor-client — org.conductoross.conductor.sdk.file (package-private)
package org.conductoross.conductor.sdk.file;
import java.io.*;
import java.nio.file.*;
/**
* Wraps a local file before upload. No file handle ID, no network calls.
* Created by {@link FileHandler#fromLocalFile(Path)}.
*/
class LocalFileHandler implements FileHandler {
private final Path path;
private final String contentType;
LocalFileHandler(Path path, String contentType) {
this.path = path;
this.contentType = contentType;
}
@Override
public String getFileHandleId() { return null; }
@Override
public InputStream getInputStream() {
try {
return Files.newInputStream(path);
} catch (IOException e) {
throw new FileStorageException("Failed to open local file: " + path, e);
}
}
@Override
public String getFileName() { return path.getFileName().toString(); }
@Override
public String getContentType() { return contentType; }
@Override
public long getFileSize() {
try {
return Files.size(path);
} catch (IOException e) {
throw new FileStorageException("Failed to get file size: " + path, e);
}
}
Path getPath() { return path; }
}
ManagedFileHandler
conductor-client — org.conductoross.conductor.sdk.file (package-private)
package org.conductoross.conductor.sdk.file;
import java.io.*;
import java.nio.file.*;
import java.util.concurrent.locks.ReentrantLock;
import org.conductoross.conductor.client.model.file.*;
/**
* SDK-internal FileHandler for files with a server-assigned conductor://file/<fileId>.
* Handles lazy download, local file caching, and retry.
* Thread-safe via ReentrantLock.
*/
class ManagedFileHandler implements FileHandler {
private final String fileHandleId;
private String fileName;
private String contentType;
private long fileSize;
private StorageType storageType;
private Path localPath;
private FileDownloadStatus downloadStatus = FileDownloadStatus.NOT_STARTED;
private final WorkflowFileClient workflowFileClient;
private final ReentrantLock downloadLock = new ReentrantLock();
ManagedFileHandler(String fileHandleId, WorkflowFileClient workflowFileClient) {
this.fileHandleId = fileHandleId;
this.workflowFileClient = workflowFileClient;
}
@Override
public String getFileHandleId() { return fileHandleId; }
@Override
public InputStream getInputStream() {
ensureDownloaded();
try {
return Files.newInputStream(localPath);
} catch (IOException e) {
throw new FileStorageException("Failed to open cached file: " + localPath, e);
}
}
@Override
public String getFileName() { ensureMetadataLoaded(); return fileName; }
@Override
public String getContentType() { ensureMetadataLoaded(); return contentType; }
@Override
public long getFileSize() { ensureMetadataLoaded(); return fileSize; }
/** Lazy metadata fetch — only calls server on first access. */
private void ensureMetadataLoaded() {
if (fileName == null) {
FileHandle metadata = workflowFileClient.getMetadata(fileHandleId);
this.fileName = metadata.getFileName();
this.contentType = metadata.getContentType();
this.fileSize = metadata.getFileSize();
this.storageType = metadata.getStorageType();
}
}
/** Lazy download — first call downloads and caches; subsequent calls return cached file. */
private void ensureDownloaded() {
if (downloadStatus == FileDownloadStatus.DOWNLOADED && localPath != null
&& Files.exists(localPath)) {
return;
}
downloadLock.lock();
try {
if (downloadStatus == FileDownloadStatus.DOWNLOADED && localPath != null
&& Files.exists(localPath)) {
return;
}
downloadStatus = FileDownloadStatus.DOWNLOADING;
ensureMetadataLoaded();
Path destination = getCachePath();
if (Files.exists(destination)) {
this.localPath = destination;
this.downloadStatus = FileDownloadStatus.DOWNLOADED;
return;
}
int maxRetries = workflowFileClient.getRetryCount();
for (int attempt = 1; attempt <= maxRetries; attempt++) {
try {
workflowFileClient.download(fileHandleId, storageType, destination);
this.localPath = destination;
this.downloadStatus = FileDownloadStatus.DOWNLOADED;
return;
} catch (Exception e) {
if (attempt == maxRetries) throw e;
}
}
} catch (Exception e) {
this.downloadStatus = FileDownloadStatus.FAILED;
throw new FileStorageException("Download failed for " + fileHandleId, e);
} finally {
downloadLock.unlock();
}
}
private Path getCachePath() {
// Predictable path based on file ID — enables cross-task reuse on same worker node
// Format: <cacheDir>/<fileId>_<fileName>
// fileId extracted by stripping the conductor://file/ prefix
String fileId = FileIdToFileHandleIdConverter.toFileId(fileHandleId);
return Path.of(workflowFileClient.getCacheDirectory(), fileId + "_" + fileName);
}
}
Design Diagrams
Storage Handshaking — Server ↔ SDK ↔ Storage Backend
sequenceDiagram
participant Dev as Developer
participant SDK as SDK (FileClient)
participant Server as Conductor Server
participant Storage as Storage Backend
Dev->>SDK: fileUploader.upload(path)
SDK->>Server: POST /api/files (metadata)
Server->>Server: Generate conductor://file/<fileId>, storage path
Server->>Storage: Generate presigned upload URL
Storage-->>Server: Presigned URL
Server-->>SDK: FileUploadResponse (fileHandleId + uploadUrl)
SDK->>Storage: PUT file content (via presigned URL)
Storage-->>SDK: 200 OK
SDK->>Server: POST /api/files/{fileId}/upload-complete
Server->>Storage: Verify file exists
Storage-->>Server: Confirmed
Server-->>SDK: UPLOADED
SDK-->>Dev: FileHandler (conductor://file/<fileId>)
File Upload — Single Part
sequenceDiagram
participant W as Worker
participant FC as FileClient
participant S as Server
participant SB as FileStorageBackend
participant ST as Storage
W->>W: FileHandler.fromLocalFile(path)
Note over W: LocalFileHandler created
W->>W: Task completes (or explicit upload)
W->>FC: upload(localFile)
FC->>S: POST /api/files {fileName, fileSize, ...}
S-->>FC: {fileHandleId, uploadUrl, uploadUrlExpiresAt}
FC->>SB: upload(uploadUrl, localFile)
SB->>ST: HTTP PUT (file bytes)
ST-->>SB: 200 OK
FC->>S: POST /api/files/{fileId}/upload-complete
S->>ST: getStorageFileInfo(storagePath)
ST-->>S: StorageFileInfo (hash, size)
S-->>FC: {uploadStatus: UPLOADED}
FC-->>W: ManagedFileHandler (conductor://file/<fileId>)
File Upload — Multipart (S3 vs GCS/Azure)
sequenceDiagram
participant FC as FileClient
participant S as Server
participant SB as FileStorageBackend
participant ST as Storage
FC->>S: POST /api/files/{fileId}/multipart
S->>ST: initiateMultipartUpload
ST-->>S: uploadId
S-->>FC: {uploadId, uploadUrl, partSize}
alt S3 (uploadUrl is null)
loop Each part
FC->>S: GET .../part/{partNumber}
S->>ST: generatePartUploadUrl
ST-->>S: per-part presigned URL
S-->>FC: {uploadUrl, expiresAt}
FC->>SB: uploadPart(url, file, offset, length)
SB->>ST: HTTP PUT (part bytes)
ST-->>SB: ETag
end
else GCS / Azure (uploadUrl is resumable)
loop Each part
FC->>SB: uploadPart(resumableUrl, file, offset, length)
SB->>ST: PUT with byte-range / block ID
ST-->>SB: ETag
end
end
FC->>S: POST .../multipart/{uploadId}/complete {partETags}
S->>ST: completeMultipartUpload
S-->>FC: {uploadStatus: UPLOADED}
File Download
sequenceDiagram
participant W as Worker
participant MFH as ManagedFileHandler
participant FC as FileClient
participant S as Server
participant SB as FileStorageBackend
participant ST as Storage
W->>MFH: getInputStream()
MFH->>MFH: acquire downloadLock
MFH->>FC: getMetadata(fileId)
FC->>S: GET /api/files/{fileId}
S-->>FC: FileHandle {storageType, ...}
FC-->>MFH: metadata
MFH->>MFH: validate storageType match
MFH->>FC: download(workflowId, fileId, tempPath)
Note over FC: Internally fetches download URL
FC->>S: GET /api/files/{workflowId}/{fileId}/download-url
S-->>FC: {downloadUrl, expiresAt}
FC->>SB: download(downloadUrl, tempPath)
SB->>ST: HTTP GET
ST-->>SB: file bytes
SB-->>FC: saved to tempPath
FC-->>MFH: download complete
MFH->>MFH: localPath = tempPath, status = DOWNLOADED
MFH->>MFH: release downloadLock
MFH-->>W: InputStream (from cached file)
Note over W,MFH: Subsequent getInputStream() calls<br/>skip download, return new InputStream<br/>from cached localPath
Multipart download: No server-side multipart coordination for downloads. FileStorageBackend.download()
handles chunked/range-based downloading internally as an implementation detail (e.g. HTTP range requests for
large files). No separate download-part endpoint needed.
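A sketch of what that internal range-based download could look like, with the total size taken from the file metadata and an arbitrary chunk size:
package org.conductoross.conductor.sdk.file;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
// Illustrative sketch: chunked download via HTTP Range requests (presigned S3/GCS URLs honor Range).
class RangeDownloadSketch {
    private static final long CHUNK_SIZE = 8L * 1024 * 1024; // 8 MB per range request (arbitrary)
    private final HttpClient httpClient = HttpClient.newHttpClient();
    void download(String url, Path destination, long totalSize)
            throws IOException, InterruptedException {
        try (OutputStream out = Files.newOutputStream(destination,
                StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)) {
            for (long offset = 0; offset < totalSize; offset += CHUNK_SIZE) {
                long end = Math.min(offset + CHUNK_SIZE, totalSize) - 1;
                HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                        .header("Range", "bytes=" + offset + "-" + end)
                        .GET()
                        .build();
                HttpResponse<InputStream> response =
                        httpClient.send(request, HttpResponse.BodyHandlers.ofInputStream());
                if (response.statusCode() != 206) { // expect Partial Content per chunk
                    throw new IOException("Range request failed: HTTP " + response.statusCode());
                }
                try (InputStream in = response.body()) {
                    in.transferTo(out); // append this chunk to the destination file
                }
            }
        }
    }
}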
Upload Status State Machine
stateDiagram-v2
[*] --> UPLOADING : POST /api/files (URL generated)
UPLOADING --> UPLOADED : POST .../upload-complete (verified)
UPLOADING --> FAILED : Upload error / retry exhausted
FAILED --> UPLOADING : Retry (new presigned URL)
note right of UPLOADING : Persisted on server + worker
note right of UPLOADED : Immutable terminal state
note right of FAILED : SDK may retry → back to UPLOADING
PENDING is defined on the enum for future use (e.g. pre-initialized records); the runtime flow enters
UPLOADING on file creation via POST /api/files.
Download Status State Machine
stateDiagram-v2
[*] --> NOT_STARTED : ManagedFileHandler created
NOT_STARTED --> DOWNLOADING : getInputStream() called
DOWNLOADING --> DOWNLOADED : File cached locally
DOWNLOADING --> FAILED : Download error / retry exhausted
FAILED --> DOWNLOADING : Retry (new presigned URL)
note right of NOT_STARTED : Worker node only
note right of DOWNLOADED : Cached — no re-download
Retry on Upload Failure
flowchart TD
A[Upload attempt] --> B{Success?}
B -- Yes --> C[POST upload-complete]
B -- No --> D{Retries left?}
D -- No --> E[Status → FAILED<br/>Throw exception]
D -- Yes --> F{Presigned URL<br/>expired?}
F -- No --> A
F -- Yes --> G[GET /upload-url<br/>→ fresh URL]
G --> A
Automatic Upload — TaskRunner
sequenceDiagram
participant TR as TaskRunner
participant W as Worker
participant FC as FileClient
participant TC as TaskClient
TR->>W: worker.execute(task)
W-->>TR: TaskResult (outputData may contain FileHandler)
TR->>TR: Scan outputData entries
loop Each entry where value instanceof FileHandler with null fileHandleId
TR->>FC: upload(fileHandler)
FC-->>TR: ManagedFileHandler (conductor://file/<fileId>)
TR->>TR: entry.setValue(uploadedFileHandler)
end
TR->>TC: updateTask(taskResult)
Note over TR: outputData FileHandler entries are serialized via<br/>FileHandlerSerializer → {fileHandleId, contentType, fileName}
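A minimal Jackson sketch consistent with that serialized shape; the design names FileHandlerSerializer but does not pin down its implementation:
package org.conductoross.conductor.sdk.file;
import java.io.IOException;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
// Illustrative sketch; field set matches the note above: {fileHandleId, contentType, fileName}.
public class FileHandlerSerializer extends JsonSerializer<FileHandler> {
    @Override
    public void serialize(FileHandler value, JsonGenerator gen, SerializerProvider serializers)
            throws IOException {
        gen.writeStartObject();
        gen.writeStringField("fileHandleId", value.getFileHandleId());
        gen.writeStringField("contentType", value.getContentType());
        gen.writeStringField("fileName", value.getFileName());
        gen.writeEndObject();
    }
}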
What-If Scenario Answers
1. Crash during file upload
What happens: The conductor://file/<fileId> and FileModel already exist on the server (created by POST /api/files).
Status remains UPLOADING. The file may or may not exist on the storage backend (partial upload).
Detection: Server has a record with status UPLOADING indefinitely. Background audit detects stale UPLOADING records after a configurable threshold.
System behavior: On task retry, the SDK reuses the same conductor://file/<fileId>. It calls GET /upload-url to get a
fresh presigned URL and restarts the upload from the beginning. The old partial upload on storage (if any) is
orphaned — the new upload overwrites the same storage path.
Developer sees: Transparent retry. If all retries exhausted, task fails with FileStorageException.
2. Temp file missing from disk
What happens: The cached local file (after download) has been deleted by the developer, OS, or disk failure.
System behavior: On next getInputStream(), ManagedFileHandler attempts to open the cached file → fails.
Re-download is triggered: new presigned URL → download to new temp file → update localPath.
Developer sees: Slight delay on next getInputStream() call. Transparent to the developer — the file is
re-downloaded automatically.
3. File never fully uploaded
What happens: Server has UPLOADING status but upload never completed. No upload-complete was called.
Detection: Background audit workflow scans for UPLOADING records older than a configurable threshold and marks them FAILED.
Downstream task sees: When a downstream task calls getInputStream(), the SDK fetches metadata and sees status
is not UPLOADED → throws FileStorageException("File not yet uploaded: conductor://file/<fileId>, status=UPLOADING").
4. File handler reuse on same worker
What happens: Same conductor://file/<fileId> used by two tasks on the same worker node.
System behavior: Each task execution creates its own ManagedFileHandler instance. However, the local file
cache uses a predictable path based on the file ID (<cacheDir>/<fileId>_<fileName>). If the first task already
downloaded the file, the second task's ManagedFileHandler checks if the file exists at the expected cache path →
skips download.
Developer sees: Second task gets the file instantly — no network call.
5. File access across workflow boundaries
What happens: Workflow A uploads a file → conductor://file/d6a4e5f7. A worker in Workflow B tries to download it via GET /api/files/{workflowBId}/{fileId}/download-url.
System behavior: The server checks whether Workflow B is in Workflow A's family (self, ancestors, or descendants via WorkflowFamilyResolver). If not, AccessForbiddenException → 403. If B is a sub-workflow of A (or vice versa), access is granted.
Exception: If conductor.file-storage.default-workflow-id is configured and the caller presents that ID, the family check is skipped — useful for shared reference files.
Developer sees: 403 if the workflows are unrelated. Pass files between workflows only through parent/child relationships, or use defaultWorkflowId for truly shared files.
6. Presigned URL expires during multipart upload
What happens: Mid-upload, the presigned URL for a part expires (TTL exceeded).
System behavior per backend:
- S3: SDK detects 403/expired error. Requests new per-part presigned URL via
GET .../multipart/{uploadId}/part/{partNumber}. Retries the failed part.
- GCS/Azure: Resumable session URLs typically have longer TTL (hours). If expired, SDK calls
POST .../multipart to initiate a new multipart upload and restarts.
Developer sees: Transparent retry. Configurable retry count (default 3).
7. Storage backend mismatch
What happens: Server configured for S3, SDK configured for local (or vice versa).
Detection: On first download — ManagedFileHandler.ensureDownloaded() fetches metadata; FileClient.download()
looks up the backend in the SDK's registered storage backends map by the server-reported StorageType and
throws if no backend is registered for that type.
Developer sees: Immediate FileStorageException("Server uses S3 but SDK only supports: [LOCAL]"). Fail fast
with a clear error message — no silent data corruption.
8. Concurrent uploads to same file ID
What happens: Two workers try to upload to the same conductor://file/<fileId>. This should not happen in normal workflow
execution (each file ID is generated once).
System behavior: First upload completes → upload-complete → status UPLOADED. Second upload's
upload-complete call → 409 ALREADY_UPLOADED. The SDK receives the 409, recognizes the file is already uploaded,
and returns the existing ManagedFileHandler.
Developer sees: No error. Both workers end up with a valid FileHandler pointing to the same file.
9. Worker restart mid-download
What happens: Worker crashes during file download. The task is retried on restart.
System behavior: On restart, a new ManagedFileHandler is created for the same conductor://file/<fileId>. The partially
downloaded temp file is incomplete. The new handler creates a fresh temp file, downloads from scratch via a new
presigned URL. The old partial temp file is orphaned (never cleaned up per constraint).
Developer sees: Transparent — the retried task downloads the file normally.
10. File size exceeds limit after partial upload
What happens: File is larger than declared in FileUploadRequest.fileSize. The actual bytes uploaded exceed
the declared size.
System behavior: For cloud backends, the presigned URL may enforce content-length (S3 does). The upload
fails with a 400 from the storage backend. For local backend, the SDK writes directly to the local directory.
In both cases, upload-complete verification via getStorageFileInfo() catches size mismatches — server reads
actual size from storage and compares against declared fileSize.
Developer sees: FileStorageException("Upload failed: file size mismatch"). Status remains UPLOADING →
eventually marked FAILED by audit.
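A sketch of that server-side verification inside confirmUpload(), using the interfaces defined in this document; the StorageFileInfo accessor names are assumptions:
```java
// Sketch: compare the actual stored size against the declared size before marking UPLOADED
StorageFileInfo info = fileStorage.getStorageFileInfo(model.getStoragePath());
if (info == null || !info.isExists()) {
    throw new IllegalArgumentException("Upload not complete for file: " + fileId);
}
if (info.getContentLength() != model.getFileSize()) {
    throw new NonTransientException("Upload failed: file size mismatch");
}
fileMetadataDAO.updateUploadComplete(
        fileId, info.getEtag(), info.getContentLength(), FileUploadStatus.UPLOADED);
```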
Server Implementation
Converters (Server)
core module — org.conductoross.conductor.core.storage.converter
All object-to-object transformations go through converters — no inline field copying.
FileModelConverter
package org.conductoross.conductor.core.storage.converter;
import java.time.Instant;
import org.conductoross.conductor.model.file.*;
import org.conductoross.conductor.model.FileModel;
public class FileModelConverter {
/** FileUploadRequest → FileModel */
public static FileModel toFileModel(FileUploadRequest request, String fileId,
String storagePath, StorageType storageType) {
FileModel model = new FileModel();
model.setFileId(fileId);
model.setFileName(request.getFileName());
model.setContentType(request.getContentType());
model.setFileSize(request.getFileSize());
model.setStorageType(storageType);
model.setStoragePath(storagePath);
model.setUploadStatus(FileUploadStatus.UPLOADING);
model.setWorkflowId(request.getWorkflowId());
model.setTaskId(request.getTaskId());
model.setCreatedAt(Instant.now());
model.setUpdatedAt(Instant.now());
return model;
}
/** FileModel → FileHandle (API response DTO) */
public static FileHandle toFileHandle(FileModel model) {
FileHandle handle = new FileHandle();
handle.setFileHandleId(model.getFileId());
handle.setFileName(model.getFileName());
handle.setContentType(model.getContentType());
handle.setFileSize(model.getFileSize());
handle.setContentHash(model.getStorageContentHash());
handle.setStorageType(model.getStorageType());
handle.setUploadStatus(model.getUploadStatus());
handle.setWorkflowId(model.getWorkflowId());
handle.setTaskId(model.getTaskId());
handle.setCreatedAt(model.getCreatedAt().toEpochMilli());
handle.setUpdatedAt(model.getUpdatedAt().toEpochMilli());
return handle;
}
/** FileModel + upload URL → FileUploadResponse */
public static FileUploadResponse toFileUploadResponse(FileModel model, String uploadUrl,
Instant uploadUrlExpiresAt) {
FileUploadResponse response = new FileUploadResponse();
response.setFileHandleId(model.getFileId());
response.setFileName(model.getFileName());
response.setContentType(model.getContentType());
response.setFileSize(model.getFileSize());
response.setStorageType(model.getStorageType());
response.setUploadStatus(model.getUploadStatus());
response.setUploadUrl(uploadUrl);
response.setUploadUrlExpiresAt(uploadUrlExpiresAt.toEpochMilli());
response.setCreatedAt(model.getCreatedAt().toEpochMilli());
return response;
}
}
Note: FileModel retains Instant createdAt/updatedAt internally; the converter maps to epoch millis when
populating DTOs. DTO setters for fileHandleId write the prefixed handle string (the model still stores it
in fileId internally).
FileStorageServiceImpl
core module — org.conductoross.conductor.core.storage
package org.conductoross.conductor.core.storage;
import java.time.Instant;
import java.util.*;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import org.conductoross.conductor.model.file.*;
import org.conductoross.conductor.core.storage.converter.FileModelConverter;
import org.conductoross.conductor.dao.FileMetadataDAO;
import org.conductoross.conductor.model.FileModel;
@Service
@ConditionalOnProperty(name = "conductor.file-storage.enabled", havingValue = "true")
public class FileStorageServiceImpl implements FileStorageService {
private final FileStorage fileStorage;
private final FileMetadataDAO fileMetadataDAO;
private final FileStorageProperties properties;
private final WorkflowFamilyResolver workflowFamilyResolver;
public FileStorageServiceImpl(
FileStorage fileStorage,
FileMetadataDAO fileMetadataDAO,
FileStorageProperties properties,
WorkflowFamilyResolver workflowFamilyResolver) {
this.fileStorage = fileStorage;
this.fileMetadataDAO = fileMetadataDAO;
this.properties = properties;
this.workflowFamilyResolver = workflowFamilyResolver;
}
@Override
public FileUploadResponse createFile(FileUploadRequest request) {
validateFileSize(request.getFileSize());
String fileId = generateFileId();
String storagePath = "files/%s/%s".formatted(fileId, request.getFileName());
FileModel model = FileModelConverter.toFileModel(
request, fileId, storagePath, fileStorage.getStorageType());
fileMetadataDAO.createFileMetadata(model);
String uploadUrl = fileStorage.generateUploadUrl(
storagePath, properties.getSignedUrlExpiration());
Instant expiresAt = Instant.now().plus(properties.getSignedUrlExpiration());
// FileModelConverter wraps the bare fileId as conductor://file/<fileId>
// on the response via FileIdToFileHandleIdConverter.
return FileModelConverter.toFileUploadResponse(model, uploadUrl, expiresAt);
}
@Override
public FileHandle getFileMetadata(String fileId) {
FileModel model = fileMetadataDAO.getFileMetadata(fileId);
return FileModelConverter.toFileHandle(model);
}
// ... remaining methods follow same pattern
}
Key behaviors:
- createFile() — validates size, generates fileId, converts to model, persists, converts to response
- getUploadUrl() — generates fresh URL on each call
- confirmUpload() — single call to getStorageFileInfo(), persists hash + size + status via updateUploadComplete()
- getDownloadUrl() — verifies status is UPLOADED; checks the file has a workflowId; if the caller's workflowId equals defaultWorkflowId, skips the family check; otherwise resolves the family via WorkflowFamilyResolver.getFamily(callerWorkflowId) and throws AccessForbiddenException if the file's workflowId is not in the family (see the sketch after this list)
- getFileMetadata() — DAO lookup, converts via FileModelConverter.toFileHandle()
- initiateMultipartUpload() — delegates to FileStorage, returns upload ID + URL + part size
- getPartUploadUrl() — delegates to FileStorage.generatePartUploadUrl()
- completeMultipartUpload() — delegates to FileStorage, verifies via getStorageFileInfo(), persists hash + size + status via updateUploadComplete()
No presigned URL caching — server generates a fresh URL on every request. Keeps the server stateless and avoids cache invalidation complexity.
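A sketch of the getDownloadUrl() access check described above; WorkflowFamilyResolver.getFamily() is assumed to return the set of related workflow IDs:
```java
// Sketch: status check, then family-based access control
FileModel model = fileMetadataDAO.getFileMetadata(fileId);
if (model.getUploadStatus() != FileUploadStatus.UPLOADED) {
    throw new IllegalArgumentException("File not yet uploaded: " + fileId);
}
if (!callerWorkflowId.equals(properties.getDefaultWorkflowId())) {
    Set<String> family = workflowFamilyResolver.getFamily(callerWorkflowId);
    if (!family.contains(model.getWorkflowId())) {
        throw new AccessForbiddenException("Workflow " + callerWorkflowId
                + " cannot access files of workflow " + model.getWorkflowId());
    }
}
String url = fileStorage.generateDownloadUrl(
        model.getStoragePath(), properties.getSignedUrlExpiration());
```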
Exception Handling
Reuses existing Conductor exception hierarchy in com.netflix.conductor.core.exception — no new exception
classes. Exceptions are mapped to HTTP status codes via the existing ApplicationExceptionMapper
(@RestControllerAdvice).
| Exception | HTTP Status | Usage |
|---|---|---|
| `NotFoundException` | 404 | File ID not found |
| `ConflictException` | 409 | File already uploaded (duplicate upload-complete) |
| `IllegalArgumentException` | 400 | Invalid request, bad file ID format, upload not complete |
| `NonTransientException` | 500 | Storage backend error, verification failed |
| `AccessForbiddenException` | 403 | Caller workflow not in file's workflow family |
FileStorageServiceImpl throws these directly — they flow through ApplicationExceptionMapper to
ErrorResponse DTO (status, message, instance, retryable flag). No @ResponseStatus on controller
methods — the existing @RestControllerAdvice handles all error mapping.
For file size exceeded (413), add one entry to ApplicationExceptionMapper:
// In ApplicationExceptionMapper static block:
EXCEPTION_STATUS_MAP.put(FileTooLargeException.class, HttpStatus.PAYLOAD_TOO_LARGE);
FileTooLargeException extends NonTransientException — the only new exception class, needed because
no existing exception maps to 413.
package org.conductoross.conductor.core.exception;
import com.netflix.conductor.core.exception.NonTransientException;
public class FileTooLargeException extends NonTransientException {
public FileTooLargeException(String message) { super(message); }
}
FileResource Controller
rest module — org.conductoross.conductor.controllers
Follows existing controller pattern: constructor injection, @Operation annotations, delegates to service.
No @ResponseStatus — exceptions flow through ApplicationExceptionMapper.
Gated by @ConditionalOnProperty(name = "conductor.file-storage.enabled", havingValue = "true") — when
disabled, controller is not registered and all /api/files endpoints return 404.
Uses the FILES constant from com.netflix.conductor.rest.config.RequestMappingConstants (value /api/files)
to match the framework's request-mapping convention.
package org.conductoross.conductor.controllers;
import java.util.List;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;
import org.conductoross.conductor.model.file.*;
import org.conductoross.conductor.core.storage.FileStorageService;
import static com.netflix.conductor.rest.config.RequestMappingConstants.FILES;
import io.swagger.v3.oas.annotations.Operation;
@RestController
@RequestMapping(FILES)
@ConditionalOnProperty(name = "conductor.file-storage.enabled", havingValue = "true")
public class FileResource {
private final FileStorageService fileStorageService;
public FileResource(FileStorageService fileStorageService) {
this.fileStorageService = fileStorageService;
}
@PostMapping
@ResponseStatus(HttpStatus.CREATED)
@Operation(summary = "Create a file record and get upload URL")
public FileUploadResponse createFile(@RequestBody FileUploadRequest request) {
return fileStorageService.createFile(request);
}
@GetMapping("/{fileId}/upload-url")
@Operation(summary = "Get presigned upload URL (retry/renewal)")
public FileUploadUrlResponse getUploadUrl(@PathVariable String fileId) {
return fileStorageService.getUploadUrl(fileId);
}
@PostMapping("/{fileId}/upload-complete")
@Operation(summary = "Confirm file upload completion")
public FileUploadCompleteResponse confirmUpload(@PathVariable String fileId) {
return fileStorageService.confirmUpload(fileId);
}
@GetMapping("/{workflowId}/{fileId}/download-url")
@Operation(summary = "Get presigned download URL")
public FileDownloadUrlResponse getDownloadUrl(
@PathVariable("workflowId") String workflowId, @PathVariable("fileId") String fileId) {
return fileStorageService.getDownloadUrl(fileId, workflowId);
}
@GetMapping("/{fileId}")
@Operation(summary = "Get file metadata")
public FileHandle getFileMetadata(@PathVariable String fileId) {
return fileStorageService.getFileMetadata(fileId);
}
@PostMapping("/{fileId}/multipart")
@Operation(summary = "Initiate multipart upload")
public MultipartInitResponse initiateMultipartUpload(@PathVariable String fileId) {
return fileStorageService.initiateMultipartUpload(fileId);
}
@GetMapping("/{fileId}/multipart/{uploadId}/part/{partNumber}")
@Operation(summary = "Get presigned URL for a multipart upload part (S3 only)")
public FileUploadUrlResponse getPartUploadUrl(
@PathVariable String fileId,
@PathVariable String uploadId,
@PathVariable int partNumber) {
return fileStorageService.getPartUploadUrl(fileId, uploadId, partNumber);
}
@PostMapping("/{fileId}/multipart/{uploadId}/complete")
@Operation(summary = "Complete multipart upload")
public FileUploadCompleteResponse completeMultipartUpload(
@PathVariable String fileId,
@PathVariable String uploadId,
@RequestBody MultipartCompleteRequest request) {
return fileStorageService.completeMultipartUpload(
fileId, uploadId, request.getPartETags());
}
}
Backend Implementations
Build system is Gradle. New modules (gcs-storage, local-file-storage) need:
- build.gradle with dependency on conductor-core
- Entry in root settings.gradle
All backend beans require both conditions:
@ConditionalOnProperty(name = "conductor.file-storage.enabled", havingValue = "true") and
@ConditionalOnProperty(name = "conductor.file-storage.type", havingValue = "...") — no storage beans
created when the feature is disabled.
S3FileStorage
@ConditionalOnProperty(name = "conductor.file-storage.enabled", havingValue = "true")
@ConditionalOnProperty(name = "conductor.file-storage.type", havingValue = "s3")
- `generateUploadUrl()` — `S3Presigner.presignPutObject()` with bucket + key + expiration
- `generateDownloadUrl()` — `S3Presigner.presignGetObject()`
- `getStorageFileInfo()` — `S3Client.headObject()` → returns `StorageFileInfo(exists, ETag, contentLength)`
- `initiateMultipartUpload()` — `S3Client.createMultipartUpload()` → returns upload ID
- `generatePartUploadUrl()` — `S3Presigner.presignUploadPart()` with upload ID + part number
- `completeMultipartUpload()` — `S3Client.completeMultipartUpload()` with part ETags
- Reuses `S3Client` and `S3Presigner` beans (creates own if not present via `@ConditionalOnMissingBean`)
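A sketch of generateUploadUrl() with the AWS SDK v2 presigner; field names like bucketName and s3Presigner are illustrative:
```java
// Sketch: presigned PUT via S3Presigner (AWS SDK v2)
PutObjectRequest putRequest = PutObjectRequest.builder()
        .bucket(bucketName)
        .key(storagePath)
        .build();
PutObjectPresignRequest presignRequest = PutObjectPresignRequest.builder()
        .signatureDuration(expiration)
        .putObjectRequest(putRequest)
        .build();
return s3Presigner.presignPutObject(presignRequest).url().toString();
```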
AzureBlobFileStorage
@ConditionalOnProperty(name = "conductor.file-storage.enabled", havingValue = "true")
@ConditionalOnProperty(name = "conductor.file-storage.type", havingValue = "azure-blob")
- `generateUploadUrl()` / `generateDownloadUrl()` — SAS token generation via `BlobServiceClient`
- `initiateMultipartUpload()` — returns container URL with SAS token (resumable). Upload ID is a generated correlation key.
- `generatePartUploadUrl()` — returns same SAS URL (Azure uses block IDs, not per-part URLs)
- `completeMultipartUpload()` — `BlockBlobClient.commitBlockList()`
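A sketch of SAS-based URL generation with the Azure Blob SDK; containerName and the client fields are illustrative:
```java
// Sketch: write SAS on the target blob, appended to the blob URL
BlobClient blobClient = blobServiceClient
        .getBlobContainerClient(containerName)
        .getBlobClient(storagePath);
BlobSasPermission permission = new BlobSasPermission()
        .setCreatePermission(true)
        .setWritePermission(true);
BlobServiceSasSignatureValues sasValues = new BlobServiceSasSignatureValues(
        OffsetDateTime.now().plus(expiration), permission);
return blobClient.getBlobUrl() + "?" + blobClient.generateSas(sasValues);
```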
GcsFileStorage
@ConditionalOnProperty(name = "conductor.file-storage.enabled", havingValue = "true")
@ConditionalOnProperty(name = "conductor.file-storage.type", havingValue = "gcs")
New module: gcs-storage.
- `generateUploadUrl()` / `generateDownloadUrl()` — signed URLs via `Storage.signUrl()`
- `initiateMultipartUpload()` — initiates resumable upload via GCS JSON API → returns resumable session URI
- `generatePartUploadUrl()` — returns same resumable URI
- `completeMultipartUpload()` — finalizes resumable upload
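A sketch of signed-URL generation with the google-cloud-storage library; bucketName and storage are illustrative fields:
```java
// Sketch: V4 signed PUT URL via Storage.signUrl()
BlobInfo blobInfo = BlobInfo.newBuilder(BlobId.of(bucketName, storagePath)).build();
URL signedUrl = storage.signUrl(
        blobInfo,
        expiration.toMinutes(), TimeUnit.MINUTES,
        Storage.SignUrlOption.httpMethod(HttpMethod.PUT),
        Storage.SignUrlOption.withV4Signature());
return signedUrl.toString();
```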
LocalFileStorage
@ConditionalOnProperty(name = "conductor.file-storage.enabled", havingValue = "true")
@ConditionalOnProperty(name = "conductor.file-storage.type", havingValue = "local")
- `generateUploadUrl()` — returns the local storage path (e.g. `files/<fileId>/<fileName>`). SDK's `LocalFileStorageBackend` resolves this against the configured directory and writes directly — server never receives file content.
- `generateDownloadUrl()` — returns the local storage path. SDK reads directly from local disk.
- `getStorageFileInfo()` — `Files.exists(path)` → returns `StorageFileInfo(exists, null, fileSize)`
- Multipart methods — not applicable (local files written directly). `initiateMultipartUpload()` returns a no-op upload ID; `completeMultipartUpload()` is a no-op.
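A sketch of the local backend's path-based "URLs", assuming a configured rootDirectory; the exception choice is illustrative:
```java
// Sketch: no signing; the "URL" is just the storage path the SDK resolves locally
@Override
public String generateUploadUrl(String storagePath, Duration expiration) {
    return storagePath;
}

@Override
public StorageFileInfo getStorageFileInfo(String storagePath) {
    Path resolved = rootDirectory.resolve(storagePath);
    try {
        if (!Files.exists(resolved)) {
            return null;
        }
        return new StorageFileInfo(true, null, Files.size(resolved)); // no content hash for local files
    } catch (IOException e) {
        throw new NonTransientException("Cannot stat local file: " + resolved, e);
    }
}
```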
DAO Implementations
Follow existing Conductor DAO pattern: extend base DAO class, use raw SQL with queryWithTransaction(),
inject RetryTemplate for transient failures, use ObjectMapper for JSON serialization.
All DAO beans additionally require
@ConditionalOnProperty(name = "conductor.file-storage.enabled", havingValue = "true") — no DAO beans
created when the feature is disabled.
PostgresFileMetadataDAO
Extends PostgresBaseDAO. Uses queryWithTransaction() for all operations.
package org.conductoross.conductor.postgres.dao;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.conductoross.conductor.dao.FileMetadataDAO;
import org.conductoross.conductor.model.FileModel;
import com.netflix.conductor.postgres.dao.PostgresBaseDAO;
import org.springframework.retry.support.RetryTemplate;
import javax.sql.DataSource;
public class PostgresFileMetadataDAO extends PostgresBaseDAO implements FileMetadataDAO {
public PostgresFileMetadataDAO(
RetryTemplate retryTemplate, ObjectMapper objectMapper, DataSource dataSource) {
super(retryTemplate, objectMapper, dataSource);
}
private static final String INSERT_FILE =
"INSERT INTO file_metadata (file_id, file_name, content_type, file_size, "
+ "storage_type, storage_path, upload_status, workflow_id, task_id, "
+ "created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
private static final String SELECT_FILE_BY_ID =
"SELECT * FROM file_metadata WHERE file_id = ?";
private static final String UPDATE_STATUS =
"UPDATE file_metadata SET upload_status = ?, updated_at = CURRENT_TIMESTAMP "
+ "WHERE file_id = ?";
private static final String UPDATE_UPLOAD_COMPLETE =
"UPDATE file_metadata SET upload_status = ?, storage_content_hash = ?, "
+ "storage_content_size = ?, updated_at = CURRENT_TIMESTAMP WHERE file_id = ?";
@Override
public void createFileMetadata(FileModel fileModel) {
queryWithTransaction(INSERT_FILE, q -> q
.addParameter(fileModel.getFileId())
.addParameter(fileModel.getFileName())
// ... remaining parameters
.executeUpdate());
}
@Override
public FileModel getFileMetadata(String fileId) {
return queryWithTransaction(SELECT_FILE_BY_ID,
q -> q.addParameter(fileId)
.executeAndFetchFirst(this::toFileModel));
}
// ... remaining methods follow same pattern
}
MySQLFileMetadataDAO
Same pattern as Postgres. Extends MySQLBaseDAO, identical SQL/JSON patterns. Bean in MySQLConfiguration
with @DependsOn({"flyway", "flywayInitializer"}).
SqliteFileMetadataDAO
Follows SQLite's facade/delegate pattern (same as SqliteMetadataDAO).
- `SqliteFileMetadataDAO` — facade implementing `FileMetadataDAO`, delegates to sub-DAO
- `SqliteFileMetadataSubDAO` — extends `SqliteBaseDAO`, contains actual SQL logic
Bean in SqliteConfiguration with @DependsOn({"flywayForPrimaryDb"}).
Migration: sqlite-persistence/src/main/resources/db/migration_sqlite/V<next>__file_metadata.sql
RedisFileMetadataDAO
@Component + @Conditional(AnyRedisCondition.class) on DAO class directly (not in a Configuration class).
Extends BaseDynoDAO.
@Component
@Conditional(AnyRedisCondition.class)
public class RedisFileMetadataDAO extends BaseDynoDAO implements FileMetadataDAO {
// Hash key: FILE_METADATA — hset(nsKey(FILE_METADATA), fileId, jsonString)
// Retrieve: hget(nsKey(FILE_METADATA), fileId)
// By workflow: maintain secondary index set WORKFLOW_FILES:{workflowId}
// By task: maintain secondary index set TASK_FILES:{taskId}
}
No migration — hash keys created on first write. JSON serialization via toJson()/readValue() from
BaseDynoDAO.
CassandraFileMetadataDAO
Extends CassandraBaseDAO. Bean in CassandraConfiguration.
public class CassandraFileMetadataDAO extends CassandraBaseDAO implements FileMetadataDAO {
// Table: file_metadata (file_id TEXT PRIMARY KEY, json_data TEXT)
// Prepared statements compiled at construction
// Consistency levels from CassandraProperties
}
Table creation via CQL — follows existing Cassandra module pattern for schema setup.
Background Audit
Detects orphaned files — UPLOADING status never completed.
- Scheduled task or background workflow scans `file_metadata` for records with `upload_status = 'UPLOADING'` older than a configurable threshold (e.g. 24 hours)
- Marks stale records as FAILED via `FileMetadataDAO.updateUploadStatus()`
- Requires DAO query: `getStaleUploads(Instant olderThan)` — not in the initial `FileMetadataDAO` interface, added when audit is implemented
- Storage usage tracking: `SELECT SUM(file_size) FROM file_metadata WHERE workflow_id = ?` — query-based, no new interface method needed initially
Not part of initial implementation — captured here for completeness. Spec's Audit & Tracking section defines the requirement.
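A sketch of the audit job under those assumptions (Spring scheduling plus the future getStaleUploads() DAO method; the fixed delay and threshold are illustrative):
```java
// Sketch: periodically fail uploads stuck in UPLOADING past the threshold
@Scheduled(fixedDelay = 3_600_000) // run hourly
public void failStaleUploads() {
    Instant threshold = Instant.now().minus(Duration.ofHours(24)); // configurable in practice
    for (FileModel stale : fileMetadataDAO.getStaleUploads(threshold)) {
        fileMetadataDAO.updateUploadStatus(stale.getFileId(), FileUploadStatus.FAILED);
    }
}
```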
Java SDK Implementation
Converters (SDK)
conductor-client — org.conductoross.conductor.sdk.file (flat package, not .converter)
FileHandlerConverter
package org.conductoross.conductor.sdk.file;
import java.nio.file.Path;
import org.conductoross.conductor.client.model.file.*;
/** Conversion helpers between the file-storage DTOs and the internal {@link ManagedFileHandler}. */
public class FileHandlerConverter {
/**
* Builds a {@link ManagedFileHandler} from a completed upload response. The handler already
* has its metadata and {@code localPath} populated — no server round-trip needed on first access.
*/
public static ManagedFileHandler toManagedFileHandler(FileUploadResponse response,
WorkflowFileClient workflowFileClient, Path localPath) {
ManagedFileHandler handler = new ManagedFileHandler(response.getFileHandleId(), workflowFileClient);
handler.setFileName(response.getFileName());
handler.setContentType(response.getContentType());
handler.setFileSize(response.getFileSize());
handler.setStorageType(response.getStorageType());
handler.setLocalPath(localPath);
return handler;
}
/** Builds the {@link FileUploadRequest} payload for {@code POST /api/files}. */
public static FileUploadRequest toFileUploadRequest(String workflowId, FileUploadOptions options) {
FileUploadRequest request = new FileUploadRequest();
request.setFileName(options.getFileName());
request.setContentType(options.getContentType());
request.setFileSize(options.getFileSize());
request.setWorkflowId(workflowId);
request.setTaskId(options.getTaskId());
return request;
}
}
Upload Flow — FileClient.upload(String workflowId, Path, FileUploadOptions)
public FileHandler upload(String workflowId, Path localFile, FileUploadOptions options) {
if (workflowId == null) {
throw new FileStorageException("workflowId is required");
}
// 1. Fill in missing options from the local file and active TaskContext
fillDefaults(options, localFile);
// 2. Convert to FileUploadRequest (includes workflowId)
FileUploadRequest request = FileHandlerConverter.toFileUploadRequest(workflowId, options);
// 3. Create file on server
FileUploadResponse response = createFileOnServer(request);
// 4. Fail-fast: verify a backend is registered for the server's storage type
FileStorageBackend storageBackend = fileStorageBackendsByStorageType.get(response.getStorageType());
if (storageBackend == null) {
throw new FileStorageException("Server uses " + response.getStorageType()
+ " but SDK only supports: " + fileStorageBackendsByStorageType.keySet());
}
// 5. Decide single-part vs multipart
if (options.getFileSize() > properties.getMultipartThreshold() && storageBackend.hasMultipartSupport()) {
uploadMultipart(response.getFileHandleId(), localFile, storageBackend);
} else {
storageBackend.upload(response.getUploadUrl(), localFile);
confirmUpload(response.getFileHandleId());
}
// 6. Build ManagedFileHandler scoped to this workflowId for future downloads
return FileHandlerConverter.toManagedFileHandler(
response, new WorkflowFileClient(this, workflowId), localFile);
}
Multipart Upload — FileClient
private void uploadMultipart(String fileHandleId, Path localFile, FileStorageBackend backend) {
// 1. Initiate
MultipartInitResponse init = initiateMultipartUpload(fileHandleId);
String uploadId = init.getUploadId();
long partSize = init.getPartSize();
long fileSize;
try {
    fileSize = Files.size(localFile);
} catch (IOException e) {
    throw new FileStorageException("Cannot read file for multipart upload: " + localFile);
}
int totalParts = (int) Math.ceil((double) fileSize / partSize);
List<String> partETags = new ArrayList<>();
// 2. Upload parts
for (int part = 1; part <= totalParts; part++) {
long offset = (part - 1) * partSize;
long length = Math.min(partSize, fileSize - offset);
String url;
if (init.getUploadUrl() != null) {
// GCS/Azure — reuse resumable URL
url = init.getUploadUrl();
} else {
// S3 — per-part presigned URL
url = getPartUploadUrl(fileHandleId, uploadId, part);
}
String etag = backend.uploadPart(url, localFile, offset, length);
partETags.add(etag);
}
// 3. Complete
completeMultipartUpload(fileHandleId, uploadId, partETags);
}
Annotation Integration — AnnotatedWorker Changes
Input: getInputValue()
When parameter type is FileHandler and inputData value is a fileHandleId (starts with conductor://file/):
// In AnnotatedWorker.getInputValue() — add before ObjectMapper deserialization:
if (parameterType == FileHandler.class) {
Object value = task.getInputData().get(paramName);
if (FileIdToFileHandleIdConverter.isFileHandleId(value)) {
return new ManagedFileHandler((String) value, task.getFileClient());
}
throw new FileStorageException("Expected conductor://file/ reference for param '" + paramName
+ "', got: " + value);
}
Output: setValue()
When return value is FileHandler, the impl just stores the handler in output and lets TaskRunner
perform the upload-and-replace (see next section):
if (invocationResult instanceof FileHandler fh) {
OutputParam opAnn = workerMethod.getAnnotatedReturnType().getAnnotation(OutputParam.class);
String key = opAnn != null ? opAnn.value() : "result";
result.getOutputData().put(key, fh);
result.setStatus(TaskResult.Status.COMPLETED);
return result;
}
Automatic Upload — TaskRunner Changes
TaskRunner.uploadFilesToFileStorage() scans outputData after the worker returns; for any
LocalFileHandler (fileHandleId == null) it uploads via FileClient.upload(...) and replaces the
entry with the uploaded FileHandler. Already-uploaded handlers are left untouched. Both then
serialize to the {fileHandleId, contentType, fileName} JSON object via FileHandlerSerializer.
@VisibleForTesting
void uploadFilesToFileStorage(TaskResult result, String workflowId, String taskId) {
if (fileClient == null || result.getOutputData() == null) {
return;
}
for (var entry : result.getOutputData().entrySet()) {
if (!(entry.getValue() instanceof FileHandler fh)) {
continue;
}
if (fh.getFileHandleId() == null) {
Path path = ((LocalFileHandler) fh).getPath();
FileUploadOptions options = new FileUploadOptions()
.setContentType(fh.getContentType())
.setTaskId(taskId);
FileHandler uploaded = fileClient.upload(workflowId, path, options);
entry.setValue(uploaded);
}
// Already-uploaded handlers stay in place — Jackson + FileHandlerSerializer renders
// the {fileHandleId, contentType, fileName} object on the wire.
}
}
Changes across 4 files (~30 lines total):
- TaskRunner — scan loop + FileClient field (insert alongside existing external payload upload logic in updateTaskResult())
- TaskRunnerConfigurer — accept + pass FileClient
- TaskRunnerConfigurer.Builder — withFileClient(FileClient)
- Task — @JsonIgnore private transient FileClient fileClient; + getter/setter (Task uses Lombok @Data/@Builder — transient field excluded from builder/serialization)
OrkesClients Integration
Follows existing pattern — OrkesClients gets a factory method, OrkesFileClient extends FileClient. The
factory method does not require a FileStorageBackend parameter; FileClient registers default backends
internally.
// In OrkesClients — add factory method (follows getTaskClient(), getWorkflowClient() pattern):
public FileClient getFileClient() {
return new FileClient(client);
}
// OrkesFileClient — follows OrkesTaskClient extends TaskClient pattern:
package io.orkes.conductor.client.http;
public class OrkesFileClient extends FileClient {
public OrkesFileClient(ConductorClient client) {
super(client);
}
}
Spring Auto-Configuration
Wired in ConductorClientAutoConfiguration (in the conductor-client-spring module). The impl does not
register individual backend beans — FileClient constructs default backends (LOCAL/S3/AZURE_BLOB/GCS)
internally.
@Bean
@ConditionalOnMissingBean
public FileClientProperties fileClientProperties() {
return new FileClientProperties();
}
@Bean
@ConditionalOnBean(ConductorClient.class)
@ConditionalOnMissingBean
public FileClient fileClient(ConductorClient client, FileClientProperties properties) {
return new FileClient(client, properties, null); // null → default backends
}
Testing & Verification
Stubs
StubFileStorage
In-memory FileStorage for server unit tests.
public class StubFileStorage implements FileStorage {
private final Map<String, byte[]> files = new ConcurrentHashMap<>();
@Override public StorageType getStorageType() { return StorageType.LOCAL; }
@Override public String generateUploadUrl(String path, Duration exp) { return "stub://" + path; }
@Override public String generateDownloadUrl(String path, Duration exp) { return "stub://" + path; }
@Override public StorageFileInfo getStorageFileInfo(String path) {
if (!files.containsKey(path)) return null;
byte[] data = files.get(path);
return new StorageFileInfo(true, null, data.length);
}
// multipart methods — simple no-op implementations
}
StubFileMetadataDAO
In-memory DAO for server unit tests.
public class StubFileMetadataDAO implements FileMetadataDAO {
private final Map<String, FileModel> store = new ConcurrentHashMap<>();
@Override public void createFileMetadata(FileModel m) { store.put(m.getFileId(), m); }
@Override public FileModel getFileMetadata(String id) { return store.get(id); }
// ... remaining methods backed by store
}
StubFileStorageBackend
In-memory FileStorageBackend for SDK unit tests.
public class StubFileStorageBackend implements FileStorageBackend {
private final Map<String, byte[]> uploads = new ConcurrentHashMap<>();
@Override public StorageType getStorageType() { return StorageType.LOCAL; }
@Override public void upload(String url, Path file) throws IOException { uploads.put(url, Files.readAllBytes(file)); }
@Override public void download(String url, Path dest) throws IOException { Files.write(dest, uploads.get(url)); }
@Override public String uploadPart(String url, Path file, long offset, long length) {
    return "etag-" + offset; // a mock ETag is sufficient for unit tests
}
}
Test Placement
| Test Type | Server Location | SDK Location |
|---|---|---|
| Unit — service, DAO, backends | Per-module `src/test/` | `conductor-client/src/test/` |
| Integration — full flows | `test-harness/src/test/groovy/` | `tests/src/test/` |
| E2E — server + worker + MinIO | `e2e/src/test/java/` | — |
E2E Verification — Kubernetes + MinIO
Kubernetes is the primary E2E verification mechanism. Implementation is not complete until it passes the E2E tests. Full guide: e2e-guide.md.
Architecture
Kubernetes namespace: conductor-file-storage
┌──────────────────────────────────────────────────────────────────┐
│ │
│ ┌─────────────────┐ ┌──────────┐ ┌───────┐ │
│ │ Conductor Server │──▶│ Postgres │ │ MinIO │ │
│ │ (file-storage=s3)│ │ │ │ (S3) │ │
│ └────────┬────────┘ └──────────┘ └───┬───┘ │
│ │ presigned URLs │ │
│ │◀──────────────────────────────▶│ │
│ │ │
└───────────┼──────────────────────────────────────────────────────┘
│ port-forward :8000→8080
┌───────────┴────────┐
│ E2E Tests │
│ (Gradle/JUnit) │
└────────────────────┘
K8s Manifests
All in deploy/k8s/:
| File | What |
|---|---|
| `namespace.yaml` | `conductor-file-storage` namespace |
| `postgres.yaml` | Postgres 16 deployment + service |
| `minio.yaml` | MinIO deployment + service (ports 9000, 9001) |
| `minio-bucket-init.yaml` | Job: creates `conductor-files` bucket via `minio/mc` |
| `conductor-config.yaml` | ConfigMap with server properties |
| `conductor-server.yaml` | Server deployment + service, init containers wait for deps |
| `kustomization.yaml` | Kustomize entrypoint |
Server Configuration
ConfigMap: deploy/k8s/conductor-config.yaml
conductor.db.type=postgres
conductor.file-storage.enabled=true
conductor.file-storage.type=s3
conductor.file-storage.max-file-size=5GB
conductor.file-storage.signed-url-expiration=60s
conductor.file-storage.s3.bucket-name=conductor-files
conductor.file-storage.s3.region=us-east-1
aws.s3.endpoint=http://minio:9000
conductor.indexing.enabled=false
Ports
| Service | K8s Port | Port-Forward | Purpose |
|---|---|---|---|
| conductor-server | 8080 | 8000 | REST API |
| minio (S3) | 9000 | 9000 | S3-compatible endpoint |
| minio (console) | 9001 | 9001 | Web UI |
| postgres | 5432 | — | Database (cluster-internal) |
Running E2E Tests
One command: the scripted flow is covered in e2e-guide.md.
Manual:
# 1. Build server image
docker build -t conductor:server -f docker/server/Dockerfile .
# 2. Deploy to K8s
kubectl apply -k deploy/k8s/
# 3. Wait for readiness
kubectl -n conductor-file-storage wait --for=condition=ready pod -l app=conductor-server --timeout=120s
# 4. Port-forward
kubectl -n conductor-file-storage port-forward svc/conductor-server 8000:8080 &
# 5. Run tests
RUN_E2E=true ./gradlew :conductor-e2e:test \
--tests "io.conductor.e2e.filestorage.*" \
-DSERVER_ROOT_URI="http://localhost:8000" --no-daemon
# 6. Teardown
kubectl delete namespace conductor-file-storage
E2E Test Cases
e2e/src/test/java/io/conductor/e2e/filestorage/FileStorageE2ETest.java:
| Test | Validates |
|---|---|
| `testCreateFileReturnsFileId` | POST /api/files → 201, `conductor://file/` ID |
| `testCreateFileRequiresWorkflowId` | POST /api/files without workflowId → 400 |
| `testGetUploadUrlReturnsFreshUrl` | GET /upload-url → fresh presigned URL |
| `testGetFileMetadata` | GET /api/files/{id} → correct fields |
| `testFileNotFoundReturns404` | GET unknown file → 404 |
| `testFileTooLargeRejected` | POST with 6GB → 413 |
| `testInitiateMultipartUpload` | POST /multipart → uploadId + partSize |
| `testDownloadUrlRequiresUploadedStatus` | GET /{workflowId}/{fileId}/download-url before upload → 400 |
| `testDownloadUrlSameWorkflow` | Workflow owner can download its own file → 200 |
| `testDownloadUrlChildWorkflow` | Child workflow can download parent's file → 200 |
| `testDownloadUrlUnrelatedWorkflow` | Unrelated workflow → 403 |
| `testDownloadUrlDefaultWorkflowId` | defaultWorkflowId bypasses family check → 200 |
| `testStorageTypeInMetadata` | Metadata includes storageType field |
Uses Map<String, Object> responses for compatibility with published SDK jar (DTOs not yet
in the SDK dependency).
Verification Order
- Java SDK — verify and fix first
- Other SDKs one at a time after Java SDK is stable
Developer Experience
DX walkthrough based on the canonical Media Transcoder example. The full implementation lives there; this section explains the design choices.
Scenario: Media Transcode
Upload primary video → FORK into transcode (720p) + extract thumbnail → JOIN → create manifest.
The primary video upload happens inside the workflow as the first task — not in the driver before
startWorkflow. The driver only registers the workflow def and starts an empty execution. All four worker
tasks live on the same JVM; the example mixes the Worker interface and the @WorkerTask annotated bean
pattern to show both.
graph LR
Start["Start workflow<br/><i>(no input)</i>"] --> Upload["upload_primary_video<br/><small>Worker — explicit upload</small>"]
Upload --> Fork["FORK"]
Fork --> Transcode["transcode_video<br/><small>@WorkerTask — POJO input,<br/>auto-upload return</small>"]
Fork --> Thumbnail["extract_thumbnail<br/><small>Worker — explicit upload</small>"]
Transcode --> Join["JOIN"]
Thumbnail --> Join
Join --> Manifest["create_manifest<br/><small>Worker — multi-file in,<br/>explicit upload</small>"]
Manifest --> End["End"]
style Upload fill:#e1f5fe
style Transcode fill:#fff3e0
style Thumbnail fill:#fff3e0
style Manifest fill:#f3e5f5
sequenceDiagram
participant Dev as Driver
participant WF as Workflow
participant T0 as upload_primary_video
participant T1 as transcode_video
participant T2 as extract_thumbnail
participant T3 as create_manifest
Dev->>WF: startWorkflow("media_transcode", {})
WF->>T0: {}
T0->>T0: open classpath /data/primary_video.mov
T0->>WF: task.getFileUploader().upload(in, options)
T0-->>WF: outputData[primary_video] = FileHandler → serializes as {fileHandleId, contentType, fileName}
par FORK
WF->>T1: inputData[primary_video] = {fileHandleId, contentType, fileName}
Note over T1: SDK binds input to TranscodeInput POJO<br/>(FileHandler primary_video field)
T1->>T1: getInputStream() → write transcoded.mp4
T1-->>WF: return FileHandler.fromLocalFile(...)
Note over T1,WF: TaskRunner uploads, leaves the FileHandler in place — Jackson<br/>renders {fileHandleId, contentType, fileName} on the wire
and
WF->>T2: inputData[primary_video] = {fileHandleId, contentType, fileName}
T2->>T2: getInputStream() → write thumb.png
T2->>WF: task.getFileUploader().upload(...)
T2-->>WF: {output_file: "conductor://file/thumb"}
end
WF->>T3: {transcoded_video: "...", thumbnail: "..."}
T3->>T3: download both, describe(), build manifest.json
T3->>WF: task.getFileUploader().upload(...)
T3-->>WF: {output_file: "conductor://file/manifest"}
| Task | Pattern | Upload mode |
|---|---|---|
| `upload_primary_video` | `Worker` interface | Explicit `task.getFileUploader().upload(InputStream, options)` of a classpath resource |
| `transcode_video` | `@WorkerTask` annotated bean, POJO input | Returns `FileHandler.fromLocalFile(...)` — TaskRunner auto-uploads |
| `extract_thumbnail` | `Worker` interface | Explicit upload; output is the fileHandleId string |
| `create_manifest` | `Worker` interface | Reads both inputs, prints a `describe(...)` of each handle (incl. small content preview), explicit upload |
Exercises: workflow with no inputs, FileHandler as a POJO field input (annotated path), explicit and automatic upload in the same workflow, FORK/JOIN, multi-file consumption.
Workflow Definition
{
"name": "media_transcode",
"version": 1,
"tasks": [
{
"name": "upload_primary_video",
"taskReferenceName": "upload_primary_video_ref",
"type": "SIMPLE",
"inputParameters": {}
},
{
"name": "fork_processing",
"taskReferenceName": "fork_ref",
"type": "FORK_JOIN",
"forkTasks": [
[
{
"name": "transcode_video",
"taskReferenceName": "transcode_ref",
"type": "SIMPLE",
"inputParameters": {
"primary_video": "${upload_primary_video_ref.output.primary_video}",
"resolution": "720p"
}
}
],
[
{
"name": "extract_thumbnail",
"taskReferenceName": "thumbnail_ref",
"type": "SIMPLE",
"inputParameters": {
"primary_video": "${upload_primary_video_ref.output.primary_video}"
}
}
]
]
},
{
"name": "join_processing",
"taskReferenceName": "join_ref",
"type": "JOIN",
"joinOn": ["transcode_ref", "thumbnail_ref"]
},
{
"name": "create_manifest",
"taskReferenceName": "manifest_ref",
"type": "SIMPLE",
"inputParameters": {
"transcoded_video": "${transcode_ref.output.output_file}",
"thumbnail": "${thumbnail_ref.output.output_file}"
}
}
],
"inputParameters": [],
"outputParameters": {
"primary_video": "${upload_primary_video_ref.output.primary_video}",
"manifest": "${manifest_ref.output.output_file}",
"transcoded_video": "${transcode_ref.output.output_file}",
"thumbnail": "${thumbnail_ref.output.output_file}"
}
}
All cross-task keys are snake_case. The nested FileHandler JSON shape (fileHandleId, contentType,
fileName) is camelCase — that is a fixed SDK contract and not per-example configurable.
Execution Input
{} (empty): the workflow declares "inputParameters": [] and the driver starts it with Map.of(); the first task supplies the video itself.
Execution Output
FileHandler values in task outputData are serialized via FileHandlerSerializer to the fixed
three-field JSON object {fileHandleId, contentType, fileName}. Workers that put a raw fileHandleId
string in outputData directly (e.g., via task.getFileUploader().upload(...).getFileHandleId()) emit
the string form. Task#getInputFileHandler and @InputParam FileHandler accept either form.
In the canonical example, upload_primary_video and transcode_video emit the object form
(they put FileHandler instances in outputData / return one); extract_thumbnail and create_manifest
emit strings. The workflow output collects whatever shape the contributing task produced.
{
"primary_video": {
"fileHandleId": "conductor://file/a1b2c3d4-0000-0000-0000-primary00001",
"contentType": "video/quicktime",
"fileName": "primary_video.mov"
},
"transcoded_video": {
"fileHandleId": "conductor://file/b2c3d4e5-0000-0000-0000-720p00000001",
"contentType": "video/mp4",
"fileName": "transcoded-720p-xxx.mp4"
},
"thumbnail": "conductor://file/c3d4e5f6-0000-0000-0000-thumb0000001",
"manifest": "conductor://file/d4e5f6a7-0000-0000-0000-manifest0001"
}
Task-Level Input/Output
TaskRunner.uploadFilesToFileStorage only auto-uploads LocalFileHandler entries (those with a null
fileHandleId) and replaces them with the resulting FileHandler — it does not flatten to a string.
Already-uploaded handlers are left in place. Both then serialize via FileHandlerSerializer.
upload_primary_video_ref:
// inputData
{}
// outputData
{
"primary_video": {
"fileHandleId": "conductor://file/a1b2c3d4-0000-0000-0000-primary00001",
"contentType": "video/quicktime",
"fileName": "primary_video.mov"
}
}
transcode_ref:
// inputData
{
"primary_video": {
"fileHandleId": "conductor://file/a1b2c3d4-0000-0000-0000-primary00001",
"contentType": "video/quicktime",
"fileName": "primary_video.mov"
},
"resolution": "720p"
}
// outputData
{
"output_file": {
"fileHandleId": "conductor://file/b2c3d4e5-0000-0000-0000-720p00000001",
"contentType": "video/mp4",
"fileName": "transcoded-720p-xxx.mp4"
}
}
thumbnail_ref:
// inputData
{
"primary_video": {
"fileHandleId": "conductor://file/a1b2c3d4-0000-0000-0000-primary00001",
"contentType": "video/quicktime",
"fileName": "primary_video.mov"
}
}
// outputData
// String form — worker put `uploaded.getFileHandleId()` directly
{ "output_file": "conductor://file/c3d4e5f6-0000-0000-0000-thumb0000001" }
manifest_ref:
// inputData
// String form because thumbnail_ref / transcode_ref produced strings, OR
// object form if upstream produced FileHandler — `getInputFileHandler` handles both.
{
"transcoded_video": { "fileHandleId": "conductor://file/b2c3d4e5-0000-0000-0000-720p00000001", "contentType": "video/mp4", "fileName": "transcoded-720p-xxx.mp4" },
"thumbnail": "conductor://file/c3d4e5f6-0000-0000-0000-thumb0000001"
}
// outputData
{ "output_file": "conductor://file/d4e5f6a7-0000-0000-0000-manifest0001" }
Workers
upload_primary_video — Worker, explicit upload from classpath
public class UploadPrimaryVideoWorker implements Worker {
private static final String RESOURCE_PATH = "/data/primary_video.mov";
@Override public String getTaskDefName() { return "upload_primary_video"; }
@Override
public TaskResult execute(Task task) {
TaskResult result = new TaskResult(task);
try (InputStream in = UploadPrimaryVideoWorker.class.getResourceAsStream(RESOURCE_PATH)) {
FileUploadOptions options = new FileUploadOptions()
.setFileName("primary_video.mov")
.setContentType("video/quicktime");
FileHandler uploaded = task.getFileUploader().upload(in, options);
result.getOutputData().put("primary_video", uploaded);
result.setStatus(TaskResult.Status.COMPLETED);
} catch (Exception e) {
result.setStatus(TaskResult.Status.FAILED);
result.setReasonForIncompletion(e.getMessage());
}
return result;
}
}
The output value is a fully-uploaded FileHandler. TaskRunner.uploadFilesToFileStorage skips re-upload
(the handle id is already non-null) and leaves the handler in place; FileHandlerSerializer renders the
{fileHandleId, contentType, fileName} object on the wire.
transcode_video — Annotated bean, POJO input, auto-upload return
Demonstrates FileHandler as a POJO field — the SDK's FileHandlerDeserializer resolves the inbound
string (or JSON object) into a ManagedFileHandler automatically.
public class TranscodeWorker {
public static class TranscodeInput {
public FileHandler primary_video;
public String resolution;
}
@WorkerTask("transcode_video")
public @OutputParam("output_file") FileHandler transcode(TranscodeInput input) throws IOException {
Path transcoded = Files.createTempFile("transcoded-" + input.resolution + "-", ".mp4");
try (InputStream in = input.primary_video.getInputStream()) {
Files.write(transcoded, ("TRANSCODED:" + input.resolution + "\n").getBytes());
Files.write(transcoded, in.readAllBytes(), StandardOpenOption.APPEND);
}
return FileHandler.fromLocalFile(transcoded, "video/mp4");
}
}
POJO field names are snake_case to match workflow JSON keys 1:1 — no @JsonProperty needed. Returning a
LocalFileHandler triggers the TaskRunner auto-upload path: it uploads, then replaces the value under
output_file with the uploaded FileHandler, which serializes to the {fileHandleId, contentType, fileName} object.
extract_thumbnail — Worker, explicit upload, string output
public class ThumbnailWorker implements Worker {
@Override public String getTaskDefName() { return "extract_thumbnail"; }
@Override
public TaskResult execute(Task task) {
TaskResult result = new TaskResult(task);
FileHandler primary = task.getInputFileHandler("primary_video");
try (InputStream in = primary.getInputStream()) {
// a real worker would decode frames from the input stream; this demo writes placeholder bytes
Path thumbnail = Files.createTempFile("thumb-", ".png");
Files.write(thumbnail, "PNG_THUMBNAIL_DATA".getBytes());
FileUploadOptions options = new FileUploadOptions().setContentType("image/png").setTaskId(task.getTaskId());
FileHandler uploaded = task.getFileUploader().upload(thumbnail, options);
result.getOutputData().put("output_file", uploaded.getFileHandleId());
result.setStatus(TaskResult.Status.COMPLETED);
} catch (Exception e) {
result.setStatus(TaskResult.Status.FAILED);
result.setReasonForIncompletion(e.getMessage());
}
return result;
}
}
create_manifest — Worker, multi-file in, with describe() helper
public class ManifestWorker implements Worker {
@Override public String getTaskDefName() { return "create_manifest"; }
@Override
public TaskResult execute(Task task) {
TaskResult result = new TaskResult(task);
FileHandler video = task.getInputFileHandler("transcoded_video");
FileHandler thumb = task.getInputFileHandler("thumbnail");
try {
System.out.println("[create_manifest] Inputs:");
describe(video);
describe(thumb);
String manifest = buildManifest(
video.getFileHandleId(), video.getFileName(), video.getFileSize(),
thumb.getFileHandleId(), thumb.getFileName(), thumb.getFileSize());
Path manifestFile = Files.createTempFile("manifest-", ".json");
Files.writeString(manifestFile, manifest);
FileUploadOptions options = new FileUploadOptions().setContentType("application/json").setTaskId(task.getTaskId());
FileHandler uploaded = task.getFileUploader().upload(manifestFile, options);
result.getOutputData().put("output_file", uploaded.getFileHandleId());
result.setStatus(TaskResult.Status.COMPLETED);
} catch (Exception e) {
result.setStatus(TaskResult.Status.FAILED);
result.setReasonForIncompletion(e.getMessage());
}
return result;
}
private static void describe(FileHandler fh) throws IOException {
String content;
try (InputStream in = fh.getInputStream()) {
    content = new String(in.readAllBytes(), StandardCharsets.UTF_8);
}
System.out.println("=================");
System.out.printf("fileHandleId=%s fileName=%s contentType=%s size=%d bytes%n content=%s%n",
fh.getFileHandleId(), fh.getFileName(), fh.getContentType(), fh.getFileSize(), content);
System.out.println("=================");
}
}
describe() shows that the developer can read each handle's metadata + bytes through the same SDK
abstraction — the underlying download is lazy and cached on first getInputStream() call.
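A sketch of that lazy, cached path inside ManagedFileHandler; the field names are illustrative:
```java
// Sketch: download once, then serve fresh streams from the cached local copy
@Override
public InputStream getInputStream() throws IOException {
    if (localPath == null || !Files.exists(localPath)) {
        ensureDownloaded(); // fetch metadata, verify UPLOADED, download via presigned URL
    }
    return Files.newInputStream(localPath);
}
```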
Bootstrapping
The driver wires three Worker instances and one AnnotatedWorker (wrapping the bean's annotated method)
into the same TaskRunnerConfigurer. FileClient is constructed only to wire into the configurer —
the driver never calls fileClient.upload(...) itself.
public class MediaTranscoderApp {
public static void main(String[] args) throws Exception {
ConductorClient client = ConductorClient.builder()
.basePath(System.getenv().getOrDefault("CONDUCTOR_SERVER_URL", "http://localhost:8080/api"))
.build();
TaskClient taskClient = new TaskClient(client);
WorkflowClient workflowClient = new WorkflowClient(client);
MetadataClient metadataClient = new MetadataClient(client);
FileClient fileClient = new FileClient(client);
WorkflowDef workflowDef = registerWorkflowDef(metadataClient);
// Wrap the annotated bean's method as a Worker.
TranscodeWorker transcodeBean = new TranscodeWorker();
AnnotatedWorker transcodeWorker = new AnnotatedWorker(
"transcode_video",
TranscodeWorker.class.getMethod("transcode", TranscodeWorker.TranscodeInput.class),
transcodeBean);
List<Worker> workers = List.of(
new UploadPrimaryVideoWorker(),
transcodeWorker,
new ThumbnailWorker(),
new ManifestWorker());
new TaskRunnerConfigurer.Builder(taskClient, workers)
.withFileClient(fileClient) // required for task.getFileUploader() and auto-upload
.withThreadCount(4)
.build()
.init();
StartWorkflowRequest request = new StartWorkflowRequest();
request.setName(workflowDef.getName());
request.setVersion(workflowDef.getVersion());
request.setInput(Map.of());
String workflowId = workflowClient.startWorkflow(request);
// poll for completion …
}
}
FileHandler Interface — What the Developer Sees
FileHandler file = ...;
file.getFileHandleId(); // "conductor://file/d6a4e5f7-..." (null before upload)
file.getFileName(); // "primary_video.mov"
file.getContentType(); // "video/quicktime"
file.getFileSize(); // 104857600
// Lazy download — transparent to developer
try (InputStream in = file.getInputStream()) {
// read file content
}
// Second call — no re-download, returns new InputStream from cache
try (InputStream in2 = file.getInputStream()) {
// read again, instant
}
FileUploader Interface — What the Developer Sees
FileUploader uploader = ...; // WorkflowFileClient behind the interface
// accessed inside a worker as task.getFileUploader()
// From local file
FileHandler h1 = uploader.upload(Path.of("/data/output.mp4"));
FileHandler h2 = uploader.upload(Path.of("/data/thumb.png"), new FileUploadOptions().setContentType("image/png"));
// From InputStream
FileHandler h3 = uploader.upload(inputStream);
FileHandler h4 = uploader.upload(inputStream, new FileUploadOptions().setContentType("application/json"));
// Use the file ID in workflow input or task output
String handleId = h1.getFileHandleId(); // "conductor://file/..."