Module pump

WorkerPump<T> — safe main/worker thread coordination for FFI bodies.

§Overview

WorkerPump manages the common pattern of running a CPU-bound worker on a background thread while the main R thread drives a “pump” loop — processing progress events, rendering output, or any other operation that must happen on R’s main thread (e.g. calls into R’s C API).

It wraps std::thread::scope so the worker is always joined before run returns, and exposes a builder API for common knobs (channel capacity, log-drain cadence).
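The pattern described above can be sketched with the standard library alone. This is a minimal illustration, not the crate's implementation: `run_with_pump` and its `capacity` parameter are hypothetical names chosen for this sketch, and the real builder API may differ.

```rust
use std::sync::mpsc::{sync_channel, SyncSender};
use std::thread;

/// Hypothetical stand-in for the worker/pump pattern (not the crate's code).
/// Runs `worker` on a scoped background thread and calls `pump` on the
/// calling thread for every message the worker sends.
fn run_with_pump<T, R, W, P>(capacity: usize, worker: W, mut pump: P) -> R
where
    T: Send,
    R: Send,
    W: FnOnce(SyncSender<T>) -> R + Send,
    P: FnMut(T),
{
    thread::scope(|s| {
        let (tx, rx) = sync_channel::<T>(capacity);
        // The worker owns `tx`; when it returns, `tx` is dropped and the
        // receive loop below ends.
        let handle = s.spawn(move || worker(tx));
        // Pump loop: runs on the calling ("main") thread.
        for msg in rx {
            pump(msg);
        }
        // The scope would join the worker anyway; joining here retrieves its result.
        handle.join().expect("worker panicked")
    })
}

fn main() {
    let mut seen = Vec::new();
    let total = run_with_pump(
        4,
        |tx: SyncSender<u64>| {
            let mut total = 0u64;
            for n in 1..=5u64 {
                tx.send(n).ok(); // ignore a disconnected pump
                total += n;
            }
            total
        },
        |n| seen.push(n),
    );
    assert_eq!(total, 15);
    assert_eq!(seen, vec![1, 2, 3, 4, 5]);
}
```

A bounded channel is used here so that a fast worker blocks instead of queuing an unbounded number of progress events.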

§Longjmp-safety contract

WorkerPump::run must be called from inside an #[miniextendr] FFI body (or any code already wrapped by with_r_unwind_protect).

The reason: the pump closure may call into R’s API (e.g. to render a progress bar), and R can longjmp out of those calls at any time — on interrupt, on allocation failure, etc. miniextendr’s macro layer wraps every #[miniextendr] body in R_UnwindProtect via run_r_unwind_protect.

The R_UnwindProtect strategy used by miniextendr converts R longjmps into Rust panics (via cleanup_handler → std::panic::panic_any), which are then caught by an outer catch_unwind. Because the panic travels through normal Rust stack unwinding, all Drop glue runs on the way out, and thread::scope joins the worker before the scope exits. The worker sees the channel disconnected (rx is dropped as the pump side unwinds), so any blocked tx.send returns Err and the worker can exit gracefully. The panic is then re-raised as an R error via R_ContinueUnwind.

If you call WorkerPump::run outside of an #[miniextendr] body and the pump triggers an R longjmp, the longjmp will bypass Rust destructors entirely and the worker thread will be leaked.
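The unwinding path described in this section can be simulated without R. The sketch below is an assumption-laden stand-in: an ordinary `panic_any` plays the role of the converted longjmp, and `simulate_unwind` is a hypothetical helper, not part of miniextendr. It only demonstrates the panic path; a real longjmp that is not converted would skip all of this cleanup.

```rust
use std::panic::{self, AssertUnwindSafe};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::sync_channel;
use std::thread;

/// Hypothetical helper: returns (panic_was_caught, worker_saw_disconnect).
fn simulate_unwind() -> (bool, bool) {
    let worker_saw_disconnect = AtomicBool::new(false);
    let flag = &worker_saw_disconnect;

    // Outer catch_unwind, standing in for the one in the FFI wrapper.
    let outcome = panic::catch_unwind(AssertUnwindSafe(|| {
        thread::scope(|s| {
            let (tx, rx) = sync_channel::<u64>(1);
            s.spawn(move || {
                // Worker: keep sending until the receiver disappears.
                let mut n = 0u64;
                while tx.send(n).is_ok() {
                    n += 1;
                }
                // `send` returned Err: the pump side is gone, exit gracefully.
                flag.store(true, Ordering::SeqCst);
            });
            // Pump: handle a few messages, then panic as if R had longjmp'd.
            for (i, _msg) in rx.iter().enumerate() {
                if i == 2 {
                    panic::panic_any("simulated R longjmp");
                }
            }
            // Unwinding drops `rx`; the scope then waits for the worker
            // before the panic continues outward.
        })
    }));

    (outcome.is_err(), worker_saw_disconnect.load(Ordering::SeqCst))
}

fn main() {
    let (caught, saw_disconnect) = simulate_unwind();
    assert!(caught);
    assert!(saw_disconnect);
}
```

Because the scope joins the worker before the panic propagates, the flag is already set by the time `catch_unwind` returns.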

§Error type

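WorkerPump::run returns Result<R, Box<dyn Error + Send + Sync>> (see the WorkerError alias below). Inside the worker, ? converts both anyhow::Error and std::io::Error into this boxed type, so it composes with anyhow::Result and std::io::Result without requiring a hard dependency on any error-handling crate.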
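As a rough illustration of how a boxed error of this shape absorbs different error types through `?`, consider the standard-library-only sketch below. `BoxedError`, `read_input`, `parse_len`, and `load_len` are hypothetical names introduced for this example; they are not part of the crate.

```rust
use std::error::Error;
use std::io;

/// Hypothetical local alias with the same shape as the boxed error above.
type BoxedError = Box<dyn Error + Send + Sync>;

/// Stand-in for an I/O step that fails.
fn read_input() -> io::Result<String> {
    Err(io::Error::new(io::ErrorKind::NotFound, "input not found"))
}

/// `?` converts ParseIntError, and `.into()` converts a plain message.
fn parse_len(text: &str) -> Result<usize, BoxedError> {
    let n: usize = text.trim().parse()?;
    if n == 0 {
        return Err("length must be non-zero".into());
    }
    Ok(n)
}

/// `?` converts io::Error into the same boxed type.
fn load_len() -> Result<usize, BoxedError> {
    let text = read_input()?;
    parse_len(&text)
}

fn main() {
    assert_eq!(parse_len(" 42 ").unwrap(), 42);
    assert!(parse_len("abc").is_err());

    let err = load_len().unwrap_err();
    assert_eq!(err.to_string(), "input not found");
    // The concrete error type is still recoverable by downcasting.
    assert!(err.downcast_ref::<io::Error>().is_some());
}
```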

§Example

use miniextendr_api::pump::WorkerPump;
use std::sync::mpsc::SyncSender;

// `compress_one` and `render_progress` are application-defined helpers, not shown here.

#[miniextendr]
fn compress_files(paths: Vec<String>) -> i64 {
    WorkerPump::new()
        .run(
            // worker: runs off-main-thread
            |tx: SyncSender<u64>| -> Result<i64, Box<dyn std::error::Error + Send + Sync>> {
                let mut total = 0i64;
                for path in &paths {
                    let bytes = compress_one(path)?;
                    tx.send(bytes).ok();
                    total += bytes as i64;
                }
                Ok(total)
            },
            // pump: runs on main thread, may call R API
            |bytes| render_progress(bytes),
        )
        .expect("compression failed")
}

Structs§

WorkerPump
Runs a worker thread in parallel with a main-thread pump loop.

Type Aliases§

WorkerError
Boxed, thread-safe error type used by WorkerPump::run.