Architecture¶
Rustvello is organized as a Rust workspace of 16 crates with focused responsibilities. This page describes how they fit together, the data model, core trait signatures, the invocation state machine, and the cross-language design.
Used from Pynenc¶
Rustvello serves as the execution engine for the Python distributed task orchestration framework Pynenc. While this page documents Rustvello’s internal Rust design, Python users interact with these concepts via Pynenc. For the Python perspective, see the Pynenc Architecture Docs.
Crate Dependency Graph¶
rustvello-proto Pure data types — DTOs, identifiers, config, status FSM
│
rustvello-core Trait definitions + business logic managers
│ Broker, Orchestrator, StateBackend, TriggerStore,
│ ClientDataStore, Task, DynTask, InvocationHandle,
│ TriggerManager, Context system
│
├── rustvello-mem In-memory implementations (dev/testing)
├── rustvello-sqlite SQLite implementations (single-host production)
├── rustvello-redis Redis implementations (distributed)
├── rustvello-postgres PostgreSQL trigger store
├── rustvello-mongo MongoDB implementations (driver v3)
├── rustvello-mongo3 MongoDB implementations (driver v2 — legacy)
├── rustvello-rabbitmq RabbitMQ broker
│
rustvello-macros #[rustvello::task] proc-macro — zero-cost compile-time
│ task registration via inventory crate
│
rustvello Application layer — RustvelloApp, RustvelloBuilder,
│ TaskRunner, TriggerBuilder, TaskRegistry,
│ auto-discover via inventory::collect!
│
├── rustvello-monitoring Web dashboard (Axum + Askama + HTMX + SVG)
├── rustvello-prometheus Prometheus EventEmitter implementation
├── rustvello-test-suite Macro-generated backend compliance tests
│
├── rustvello-python PyO3 #[pyclass] wrappers (Rust → Python bridge)
│
└── rustvello-cli CLI binary (rustvello run / status / list / purge / info / config)
py-rustvello/ (Python wheel — cdylib built with maturin)
└── rustvello Python package — exposes PyO3-wrapped Rust backends
pynenc-rustvello/ (separate package — pip install pynenc-rustvello)
└── pynenc_rustvello/ Stateless adapters: subclass pynenc ABCs, delegate to py-rustvello
Python Integration Architecture¶
Rustvello powers Python applications through a three-layer architecture. Each layer has a single responsibility:
┌─────────────────────────────────────────────┐
│ User Python Code │
│ app = PynencBuilder().rustvello_redis() │
│ .build() │
└──────────────────┬──────────────────────────┘
│ pynenc ABC interface
┌──────────────────▼──────────────────────────┐
│ pynenc-rustvello │
│ Python adapters (broker.py, orchestrator. │
│ py, state_backend.py, trigger.py, ...) │
│ Stateless bridges: type conversion only │
└──────────────────┬──────────────────────────┘
│ PyO3 bindings
┌──────────────────▼──────────────────────────┐
│ py-rustvello (rustvello wheel) │
│ PyO3-exposed Rust structs: │
│ RustMemBroker, RustRedisBroker, ... │
└──────────────────┬──────────────────────────┘
│ Rust trait calls
┌──────────────────▼──────────────────────────┐
│ rustvello-core / rustvello-mem / │
│ rustvello-redis / ... │
│ Pure Rust implementations │
└─────────────────────────────────────────────┘
Layer |
Package |
Knows about pynenc? |
Contains logic? |
|---|---|---|---|
Rust core |
|
No |
All logic lives here |
PyO3 bindings |
|
No |
Type conversion only |
Python adapters |
Yes — satisfies pynenc ABCs |
No — stateless bridge |
|
Framework |
|
No rustvello knowledge |
Plugin discovery only |
Data Model (rustvello-proto)¶
Identifiers¶
Type |
Structure |
Purpose |
|---|---|---|
|
|
Uniquely identifies a task definition; language defaults to |
|
|
Deterministic identity for task + args (SHA-256 of serialized args) |
|
newtype |
Unique execution instance |
|
newtype |
Identifies a runner process |
|
newtype |
Identifies a trigger condition |
|
newtype |
Identifies a trigger definition |
Invocation Status — Finite State Machine¶
graph TD
Start(( )) -->|init| Registered
Registered -->|schedule| Pending
Registered -->|concurrency check| CC[ConcurrencyControlled]
Registered -->|limit reached| CCFinal[ConcurrencyControlledFinal]
CC -->|re-queue| Rerouted
Rerouted -->|schedule| Pending
Rerouted -->|still controlled| CC
Pending -->|run| Running
Pending -->|crash / OOM| Killed
Pending -->|cycle control| Failed
Pending -->|re-route| Rerouted
Pending -->|timeout| PR[PendingRecovery]
PR -->|re-queue| Rerouted
Running -->|complete| Success
Running -->|error| Failed
Running -->|crash / OOM| Killed
Running -->|retry| Retry
Running -->|suspend| Paused
Running -->|timeout| RR[RunningRecovery]
RR -->|re-queue| Rerouted
Paused -->|continue| Resumed
Paused -->|crash / OOM| Killed
Resumed -->|complete| Success
Resumed -->|error| Failed
Resumed -->|crash / OOM| Killed
Resumed -->|retry| Retry
Resumed -->|suspend| Paused
Killed -->|re-queue| Rerouted
Retry -->|schedule| Pending
Success --> End(( ))
Failed --> End
CCFinal --> End
classDef available fill:#22863a,color:#fff,stroke:#1a6e2e,stroke-width:2px
classDef execution fill:#6f42c1,color:#fff,stroke:#5a32a3,stroke-width:2px
classDef recovery fill:#e36209,color:#fff,stroke:#c55404,stroke-width:2px
classDef queue fill:#0366d6,color:#fff,stroke:#025ab5,stroke-width:2px
classDef termFail fill:#cb2431,color:#fff,stroke:#a91d28,stroke-width:2px
classDef termSuccess fill:#28a745,color:#fff,stroke:#1e7e34,stroke-width:2px
classDef point fill:#586069,color:#fff,stroke:#586069
class Registered,Rerouted,Retry available
class Running,Paused,Resumed execution
class PR,RR,Killed recovery
class Pending,CC queue
class Failed,CCFinal termFail
class Success termSuccess
class Start,End point
Colour legend — 🟢 Green: available for run (Registered, Rerouted, Retry) · 🟣 Purple: execution (Running, Paused, Resumed) · 🟠 Orange: recovery / kill (PendingRecovery, RunningRecovery, Killed) · 🔵 Blue: queued (Pending, ConcurrencyControlled) · 🔴 Red: terminal failure (Failed, ConcurrencyControlledFinal) · ✅ Green: terminal success (Success)
14 states. Terminal states: Success, Failed, ConcurrencyControlledFinal.
Killed and Rerouted are not terminal — they re-enter the lifecycle via Rerouted → Pending.
Transitions are validated by InvocationStatus::valid_transitions() at runtime.
Mermaid source: _static/invocation-status-fsm.mmd
Configuration¶
Struct |
Key Fields |
|---|---|
|
|
|
|
|
|
Core Traits (rustvello-core)¶
All core traits are Send + Sync and use #[async_trait] for object safety with
Arc<dyn Trait> dispatch.
Broker¶
Routes invocations into queues and retrieves the next one for a worker.
pub trait Broker: Send + Sync {
async fn route_invocation(&self, invocation_id: &InvocationId) -> RustvelloResult<()>;
async fn retrieve_invocation(&self, task_id: Option<&TaskId>) -> RustvelloResult<Option<InvocationId>>;
async fn retrieve_invocation_for_language(
&self, language: &str, task_id: Option<&TaskId>
) -> RustvelloResult<Option<InvocationId>>;
async fn purge(&self) -> RustvelloResult<()>;
}
Orchestrator¶
Manages invocation lifecycle, concurrency control, heartbeats, and recovery.
pub trait Orchestrator: Send + Sync {
// Lifecycle
async fn register_invocation(&self, call: &CallDTO, id: &InvocationId) -> RustvelloResult<()>;
async fn set_invocation_status(
&self, id: &InvocationId, status: InvocationStatus, runner_id: Option<&RunnerId>
) -> RustvelloResult<()>;
async fn get_invocation_status(&self, id: &InvocationId) -> RustvelloResult<InvocationStatusRecord>;
// Concurrency control
async fn check_concurrency_control(
&self, task_id: &TaskId, config: &TaskConfig, cc_args: Option<&SerializedArguments>
) -> RustvelloResult<bool>;
// Recovery
async fn register_heartbeat(&self, runner_id: &RunnerId) -> RustvelloResult<()>;
async fn get_stale_pending_invocations(&self, max_pending_seconds: f64) -> RustvelloResult<Vec<InvocationId>>;
async fn get_stale_running_invocations(&self, runner_dead_after_seconds: f64) -> RustvelloResult<Vec<InvocationId>>;
}
StateBackend¶
Stores and retrieves task results and errors.
pub trait StateBackend: Send + Sync {
async fn set_result(&self, id: &InvocationId, result: &str) -> RustvelloResult<()>;
async fn get_result(&self, id: &InvocationId) -> RustvelloResult<Option<String>>;
async fn set_error(&self, id: &InvocationId, error: &str) -> RustvelloResult<()>;
async fn get_error(&self, id: &InvocationId) -> RustvelloResult<Option<String>>;
}
Application Layer (rustvello)¶
RustvelloApp¶
The central application object. Owns all subsystems and coordinates task registration,
invocation, and execution. Analogous to pynenc’s Pynenc class.
let app = Rustvello::builder()
.app_id("my-app")
.from_env() // RUSTVELLO__* env vars
.from_file("config.toml")
.auto_discover_tasks() // collects all #[rustvello::task] via inventory
.build().await?;
#[rustvello::task] Macro¶
The #[rustvello::task] attribute macro transforms a plain function into a typed,
serializable task with compile-time registration:
#[rustvello::task(max_retries = 3, concurrency = "arguments")]
fn process(data: String) -> String {
data.to_uppercase()
}
// Generates: ProcessParams { data: String }, ProcessTask (impl Task)
Supported attributes:
Attribute |
Type |
Description |
|---|---|---|
|
|
Retry attempts on failure |
|
|
Override the module component of |
|
|
|
|
|
Registration-time dedup mode |
|
|
Argument names used as concurrency key |
|
|
Cache results for identical args |
|
|
Always start a new workflow scope |
|
|
Reroute when hitting concurrency limits |
|
|
Run on a blocking thread (for CPU-bound work) |
TaskRunner¶
The TaskRunner spawns N async workers (configurable via num_workers) that
race to claim invocations from the broker and execute them concurrently. A separate
management loop handles heartbeats, recovery checks, and trigger evaluations.
TaskRunner (runner_id = "uuid")
├── management_loop() — heartbeats, recovery, trigger evaluation
├── worker_loop(0) — polls broker, executes tasks
├── worker_loop(1)
└── worker_loop(N-1)
Cross-Language Architecture¶
Rustvello supports multi-language deployments where Python (pynenc)
and Rust workers share the same broker and orchestrator under one app_id.
Language-Qualified TaskId¶
Each TaskId carries a language field:
rs::my_crate.add ← Rust task (handled by rustvello workers)
py::my_module.add ← Python task (handled by pynenc workers)
Routing¶
The broker maintains per-language queues. Each worker fetches only from its own
language queue via retrieve_invocation_for_language(). This keeps Rust workers
and Python workers isolated while sharing all other state.
Wire Format¶
Cross-language calls use a canonical BTreeMap<String, String> format (JSON-encoded
values, deterministic key order) that both languages produce and consume identically.
The SHA-256 args_id algorithm is identical in both runtimes.
Crate Descriptions¶
rustvello-mem¶
In-memory implementations using tokio::sync::Mutex<HashMap> and VecDeque.
Suitable for development, testing, and single-process use. Zero external dependencies.
rustvello-sqlite¶
SQLite-backed implementations via rusqlite. Suitable for single-host deployments
where data must outlive the process. Compiles libsqlite3 statically — no system
library required.
rustvello-redis¶
Redis-backed broker, orchestrator, and state backend via redis-rs. Uses pipelining
and MGET batching to minimize round trips. Suitable for distributed multi-host deployments.
rustvello-monitoring¶
Axum web server with Askama HTML templates and HTMX for live updates. Features:
SVG timeline — visualizes invocation schedules across runners and workers
Log explorer — full-text log search with cross-entity highlighting
Invocation tables — filterable by status, task, runner, time range
Workflow view — parent/child invocation trees
Prometheus endpoint —
/metricswhenrustvello-prometheusis active
rustvello-prometheus¶
Implements EventEmitter using the metrics crate facade. Bridges rustvello lifecycle
events to Prometheus counters and histograms without a hard runtime dependency.
rustvello-test-suite¶
A single macro call generates the full compliance test suite for any backend:
rustvello_test_suite::suite_all!(MyBroker, MyOrchestrator, MyStateBackend);
Covers broker routing, orchestrator FSM, concurrency control, recovery, and cross-language queue routing.
py-rustvello¶
The maturin-built cdylib that produces the actual rustvello Python module. It depends on rustvello-python and enables the extension-module feature.
Pynenc Integration¶
The pynenc/ directory in this repository contains the pure-Python pynenc framework. Pynenc provides:
Task decorators (
@app.task)Builder API (
PynencBuilder)Triggers and scheduling
Monitoring (pynmon)
Pynenc uses rustvello as one of its backend options. The bridge classes in py-rustvello/rustvello/pynenc/ adapt the Rust-backed Broker, Orchestrator, and StateBackend to pynenc’s abstract base classes.
Composite Operations¶
See also: Pynenc Docs
To see how these composites are used by the native Python orchestrator, see Pynenc Architecture: Composites.
Composite operations bundle multiple trait calls (orchestrator, state backend,
history, trigger store, waiter, autopurge) into a single method. They exist on the
OrchestratorComposite trait and are the mechanism that enables native-mode
orchestration in the pynenc Python binding.
Why Composites?¶
Without composites, each orchestration step requires a separate FFI call from Python → Rust, each acquiring and releasing the GIL. Composites reduce this to a single FFI call per orchestration operation:
Without composites (mixed mode):
Python ──FFI──▶ check_concurrency()
Python ──FFI──▶ set_status()
Python ──FFI──▶ update_history()
Python ──FFI──▶ evaluate_triggers()
Python ──FFI──▶ notify_waiters()
With composites (native mode):
Python ──FFI──▶ set_invocation_status_full() ← all 5 steps in one call
OrchestratorComposite Trait — Hot-Path Composites¶
The 5 hot-path composites cover the most frequently executed orchestration paths:
Method |
Description |
|---|---|
|
Register invocation + concurrency check + route to broker + record history |
|
Status change + history + trigger evaluation + waiter notification + autopurge |
|
Retrieve from broker + set to Running + record history |
|
Store result + set Success + history + triggers + waiters |
|
Store error + set Failed/Retry + history + triggers |
Extended Composites¶
Additional composites for less frequent but still critical operations:
Method |
Description |
|---|---|
|
Task submission: register + concurrency check + broker route in one call |
|
Retry handling: update status + re-enqueue + history |
|
Recovery: detect stale invocations + re-queue + trigger evaluation |
|
Evaluate all pending trigger conditions in one pass |
Dual-Mode Architecture¶
See also: Pynenc Docs
For details from the Python perspective, see Pynenc Architecture: Dual Mode.
Rustvello supports two orchestration modes when used from Python (pynenc):
Mixed Mode¶
Python’s BaseOrchestrator drives coordination. Each orchestration step is a
separate FFI call into the Rust engine. Use when mixing Python and Rust backends.
Native Mode¶
A single FFI call executes the entire coordination operation inside Rust using composites. The GIL is released for the entire duration. Use for production.
Mode Selection¶
Mode selection is configuration-driven:
Choosing a
*NativeOrchestratorclass (for exampleRustSqliteNativeOrchestrator) enables composite orchestration calls for hot paths.Runner-loop delegation uses
Pynenc.is_all_rust_native, which checks that all configured backend class names start withRust(orchestrator_cls,state_backend_cls,broker_cls,trigger_cls,client_data_store_cls).
See the pynenc architecture docs for details.
Class Hierarchy (Python side)¶
BaseOrchestrator (ABC)
└── _RustvelloOrchestrator (mixed mode — multiple FFI calls per operation)
└── _RustvelloNativeOrchestrator (native mode — single composite FFI call)
Trigger Condition Model¶
The trigger system supports 6 condition types, all evaluated natively in Rust:
Condition Types¶
Type |
Description |
Example |
|---|---|---|
|
Time-based schedule |
|
|
Fires when an invocation reaches a status |
|
|
Fires on a named application event |
|
|
JSON match against a task result |
|
|
Fires on one or more exception type names |
|
|
AND/OR combination of other conditions |
|
Argument and Result Filters¶
All filters are JSON-only — no callables, no pickle, no lambdas. This restriction enables Rust-native evaluation without Python callbacks:
argument_filter— static JSON values provided at trigger definition timeresult_filter— JSON object matched against the task result
CompositeCondition¶
Composite conditions combine multiple conditions with boolean logic:
CompositeCondition::All(vec![
Condition::Status(task_a, InvocationStatus::Success),
Condition::Status(task_b, InvocationStatus::Success),
])
CompositeCondition::Any(vec![
Condition::Status(task_a, InvocationStatus::Failed),
Condition::Event("manual_override".into()),
])
Evaluation Flow¶
Management loop calls
trigger_loop_iteration()(composite in native mode)Rust iterates all registered trigger conditions
Each condition is checked against current state
Matching triggers create new invocations with the specified static arguments
No Python callbacks during evaluation — all matching happens in Rust
Error Hierarchy¶
RustvelloError Enum¶
The RustvelloError enum defines all error variants in the Rust engine:
Variant |
Description |
|---|---|
|
Explicit retry requested |
|
Concurrency control requested a retry |
|
Task resolution and registry errors |
|
Invocation ID does not exist |
|
Status transition violates the FSM |
|
Runner ownership rules were violated |
|
Optimistic status write detected a race |
|
JSON serialization/deserialization failed |
|
Backend/infrastructure failure ( |
|
Configuration parsing or validation error |
|
Internal engine error |
PyO3 Exception Mapping¶
The PyO3 layer maps RustvelloError to typed Python exceptions in the
rustvello module (crates/rustvello-python/src/error.rs):
RustvelloError::ConcurrencyRetry → ConcurrencyRetryError
RustvelloError::InvocationNotFound → InvocationNotFoundError
RustvelloError::InvalidStatusTransition→ StatusTransitionError
RustvelloError::OwnershipViolation → StatusOwnershipError
RustvelloError::StatusRaceCondition → StatusRaceConditionError
RustvelloError::Serialization → SerializationError
RustvelloError::Infrastructure → StateBackendError or RunnerError
RustvelloError::Configuration → ConfigurationError
RustvelloError::Internal → InternalError
Adapter Error Translation¶
Backend adapters translate backend-specific errors (e.g., rusqlite::Error,
redis::RedisError) into RustvelloError variants. In pynenc-facing adapters,
status exceptions are further translated to pynenc’s exception classes (for example
InvocationStatusTransitionError / InvocationStatusOwnershipError) while preserving
structured fields such as invocation_id and allowed_statuses.
Rust-Driven Runner Architecture¶
See also: Pynenc Docs
To learn how runners are configured and deployed in Python, see Pynenc Runner Usage Guide.
In native mode, the Rust engine drives the runner loop:
┌─────────────────────────────────────────────┐
│ Rust Runner Loop │
│ │
│ poll broker → set RUNNING → callback Python │
│ ↑ ↓ │
│ └── set result ← task.func(*args) │
│ │
│ management loop: heartbeats, recovery, │
│ trigger evaluation │
└─────────────────────────────────────────────┘
The GIL is only held during task.func() execution. All coordination
(broker polling, status updates, heartbeats, recovery) runs GIL-free.
Signal Handling Across FFI¶
Signal |
First Time |
Second Time |
|---|---|---|
SIGINT |
Graceful shutdown — current invocation completes |
Immediate exit |
SIGTERM |
Graceful shutdown — current invocation completes |
Immediate exit |
Rust installs signal handlers that set an atomic flag. The runner loop checks this
flag between iterations. Python’s signal.signal() cooperates via registration at init.
Adding a New Backend¶
To add a new backend (e.g., Redis):
Create a new crate
crates/rustvello-redis/Implement the
Broker,Orchestrator, andStateBackendtraits fromrustvello-coreAdd it as an optional dependency in
crates/rustvello/Cargo.tomlbehind a feature flagWire it into
Appconstruction