Skip to main content

miniextendr_api/
pump.rs

1//! [`WorkerPump<T>`] — safe main/worker thread coordination for FFI bodies.
2//!
3//! # Overview
4//!
5//! `WorkerPump` manages the common pattern of running a CPU-bound worker on a
6//! background thread while the main R thread drives a "pump" loop — processing
7//! progress events, rendering output, or any other operation that must happen
8//! on R's main thread (e.g. calls into R's C API).
9//!
10//! It wraps [`std::thread::scope`] so the worker is always joined before
11//! `run` returns, and exposes a builder API for common knobs (channel capacity,
12//! log-drain cadence).
13//!
14//! # Longjmp-safety contract
15//!
16//! **`WorkerPump::run` must be called from inside an `#[miniextendr]` FFI
17//! body** (or any code already wrapped by `with_r_unwind_protect`).
18//!
19//! The reason: the pump closure may call into R's API (e.g. to render a
20//! progress bar), and R can `longjmp` out of those calls at any time — on
21//! interrupt, on allocation failure, etc.  miniextendr's macro layer wraps
22//! every `#[miniextendr]` body in `R_UnwindProtect` via
23//! `run_r_unwind_protect`.
24//!
25//! The `R_UnwindProtect` strategy used by miniextendr converts R longjmps
26//! into Rust panics (via `cleanup_handler` → `std::panic::panic_any`) which
//! are then caught by an outer `catch_unwind`.  Because the panic travels
//! through normal Rust stack-unwinding, **all `Drop` glue runs on the way
//! out**.  Unwinding out of the pump loop drops the receiver `rx`, and
//! `thread::scope` (which has no `Drop` of its own — it catches the panic,
//! waits for every spawned thread, then resumes unwinding) joins the worker
//! before the panic leaves `run`.  With `rx` gone, any blocked or later
//! `tx.send` in the worker returns `Err`, so the worker can exit gracefully.
//! The panic is then re-raised as an R error via `R_ContinueUnwind`.
34//!
35//! If you call `WorkerPump::run` *outside* of an `#[miniextendr]` body and
36//! the pump triggers an R longjmp, the longjmp will bypass Rust destructors
37//! entirely and the worker thread will be leaked.
38//!
39//! # Error type
40//!
41//! `WorkerPump::run` uses `Result<R, Box<dyn Error + Send + Sync>>` so it
42//! composes naturally with both `anyhow::Result` (via `?`) and `std::io::Error`
43//! without requiring a hard dependency on any error-handling crate.
44//!
45//! # Example
46//!
47//! ```rust,ignore
48//! use miniextendr_api::pump::WorkerPump;
49//! use std::sync::mpsc::SyncSender;
50//!
51//! #[miniextendr]
52//! fn compress_files(paths: Vec<String>) -> i64 {
53//!     WorkerPump::new()
54//!         .run(
55//!             // worker: runs off-main-thread
56//!             |tx: SyncSender<u64>| -> Result<i64, Box<dyn std::error::Error + Send + Sync>> {
57//!                 let mut total = 0i64;
58//!                 for path in &paths {
59//!                     let bytes = compress_one(path)?;
60//!                     tx.send(bytes).ok();
61//!                     total += bytes as i64;
62//!                 }
63//!                 Ok(total)
64//!             },
65//!             // pump: runs on main thread, may call R API
66//!             |bytes| render_progress(bytes),
67//!         )
68//!         .expect("compression failed")
69//! }
70//! ```
71
72use std::error::Error;
73use std::marker::PhantomData;
74use std::sync::mpsc::{self, SyncSender};
75use std::thread;
76
/// Boxed, thread-safe error type returned by [`WorkerPump::run`].
///
/// Alias for `Box<dyn Error + Send + Sync>`. Compatible with `anyhow::Error`
/// via `?` and with standard library error types without requiring extra
/// dependencies.
pub type WorkerError = Box<dyn Error + Send + Sync>;

/// Runs a worker thread in parallel with a main-thread pump loop.
///
/// `T` is the message type the worker sends to the pump.  See
/// [the module documentation][self] for the longjmp-safety contract and a
/// usage example.
pub struct WorkerPump<T> {
    /// Capacity of the bounded MPSC channel between worker and pump.
    capacity: usize,
    /// Whether to drain the cross-thread log queue on every pump tick.
    drain_logs_each_tick: bool,
    /// Ties the builder to its message type without storing a `T`.
    _marker: PhantomData<fn() -> T>,
}

// The builder methods never touch a `T`, so they carry no bounds; only `run`
// (below) needs `T: Send` to move messages across the thread boundary.
impl<T> WorkerPump<T> {
    /// Create a new `WorkerPump` with default settings.
    ///
    /// Defaults:
    /// - channel capacity: 64
    /// - `drain_logs_each_tick`: `true`
    #[must_use]
    pub fn new() -> Self {
        Self {
            capacity: 64,
            drain_logs_each_tick: true,
            _marker: PhantomData,
        }
    }

    /// Set the capacity of the bounded MPSC channel.
    ///
    /// The default is 64.  A larger capacity allows the worker to get further
    /// ahead of the pump; a capacity of 0 makes every send synchronous
    /// (rendezvous channel).
    ///
    /// When the channel is full the worker blocks on `tx.send` until the pump
    /// drains a slot.  If the pump panics (which is also how an R longjmp
    /// surfaces inside an `#[miniextendr]` body), unwinding out of the pump
    /// loop drops `rx`, which unblocks `tx.send` with an `Err` and lets the
    /// worker exit cleanly.
    #[must_use]
    pub fn channel_capacity(mut self, n: usize) -> Self {
        self.capacity = n;
        self
    }

    /// Control whether the cross-thread log queue is drained on every pump tick.
    ///
    /// Default: `true`.  Set to `false` if the consumer manages its own log
    /// drain cadence (e.g. it calls `drain_log_queue()` explicitly at
    /// coarser granularity).
    ///
    /// Has no effect when the `log` feature is disabled.
    #[must_use]
    pub fn drain_logs_each_tick(mut self, on: bool) -> Self {
        self.drain_logs_each_tick = on;
        self
    }
}

impl<T: Send> WorkerPump<T> {
    /// Run the worker/pump pair and return the worker's result.
    ///
    /// - `worker` runs on a scoped background thread.  It receives a
    ///   [`SyncSender<T>`] and sends messages to the pump.  When `worker`
    ///   returns (success or error) it should drop `tx`; the pump's receive
    ///   loop then terminates naturally.
    /// - `pump` is called on the **current (main R) thread** for every message
    ///   the worker sends, in the order they were sent.
    ///
    /// When log draining is enabled (and the `log` feature is on), the log
    /// queue is drained immediately *before* each `pump` call.
    // NOTE(review): records the worker logs after its final message are not
    // drained by `run` itself — confirm that something downstream drains them.
    ///
    /// # Errors
    ///
    /// Returns the worker's own `Err` unchanged.  If the worker thread
    /// panics, returns an error whose message starts with
    /// `"WorkerPump worker panicked"`, followed by `": <panic message>"` when
    /// the panic payload is a string.
    ///
    /// # Panics
    ///
    /// If the pump closure panics, unwinding drops the receiver,
    /// `thread::scope` waits for the worker to finish, and the panic then
    /// continues out of `run`.  When called from inside an `#[miniextendr]`
    /// body the outer `R_UnwindProtect` catches it and converts it to an R
    /// error.
    pub fn run<R, W, P>(self, worker: W, mut pump: P) -> Result<R, WorkerError>
    where
        R: Send,
        W: FnOnce(SyncSender<T>) -> Result<R, WorkerError> + Send,
        P: FnMut(T),
    {
        let Self {
            capacity,
            drain_logs_each_tick,
            ..
        } = self;

        thread::scope(|scope| {
            let (tx, rx) = mpsc::sync_channel(capacity);
            // `tx` is moved into the worker, so the receive loop below ends
            // as soon as the worker (and any clones it made) lets go of it.
            let handle = scope.spawn(move || worker(tx));

            for msg in rx {
                if drain_logs_each_tick {
                    #[cfg(feature = "log")]
                    crate::optionals::log_impl::drain_log_queue();
                }
                pump(msg);
            }

            // Joining explicitly consumes a worker panic here, so the scope
            // does not re-panic on exit and the caller gets an `Err` instead.
            match handle.join() {
                Ok(outcome) => outcome,
                Err(payload) => Err(worker_panic_error(&*payload)),
            }
        })
    }
}

/// Build the error reported when the worker thread panicked, keeping the
/// panic message when the payload is a `&str` or `String`.
fn worker_panic_error(payload: &(dyn std::any::Any + Send)) -> WorkerError {
    let detail = payload
        .downcast_ref::<&'static str>()
        .copied()
        .or_else(|| payload.downcast_ref::<String>().map(String::as_str));

    match detail {
        Some(msg) => format!("WorkerPump worker panicked: {msg}").into(),
        None => "WorkerPump worker panicked".into(),
    }
}

impl<T> Default for WorkerPump<T> {
    /// Equivalent to [`WorkerPump::new`].
    fn default() -> Self {
        Self::new()
    }
}
186
187// region: Tests
188
#[cfg(test)]
mod tests {
    use super::*;
    use std::panic::{AssertUnwindSafe, catch_unwind};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::time::Duration;

    // The pump closure runs on the calling thread and has no `Send` or
    // `'static` bound, so most tests record what it saw in plain locals.

    /// Worker sends N messages, the pump receives all of them in order, and
    /// `run` returns the worker's `Ok` value.
    #[test]
    fn happy_path() {
        let mut received = Vec::<i64>::new();

        let result = WorkerPump::new()
            .drain_logs_each_tick(false)
            .run(
                |tx| {
                    for i in 0..5i64 {
                        tx.send(i).unwrap();
                    }
                    Ok(42i64)
                },
                |msg| received.push(msg),
            )
            .expect("run failed");

        assert_eq!(result, 42);
        assert_eq!(received, vec![0, 1, 2, 3, 4]);
    }

    /// Worker returns Err early; pump exits cleanly; run returns the worker's Err.
    #[test]
    fn worker_returns_err() {
        let mut pump_calls = 0usize;

        let result: Result<(), WorkerError> = WorkerPump::new().drain_logs_each_tick(false).run(
            // `_tx` is dropped when the closure returns; no messages are sent.
            |_tx| Err("deliberate worker error".into()),
            |_msg: ()| pump_calls += 1,
        );

        let msg = result
            .expect_err("worker error must surface from run")
            .to_string();
        assert!(
            msg.contains("deliberate worker error"),
            "unexpected error: {msg}"
        );
        assert_eq!(pump_calls, 0, "pump must not run if worker sends nothing");
    }

    /// Worker panics; `run` reports it as an `Err` instead of panicking.
    #[test]
    fn worker_panics() {
        let result: Result<(), WorkerError> = WorkerPump::<u8>::new()
            .drain_logs_each_tick(false)
            .run(|_tx| panic!("deliberate worker panic"), |_msg| {});

        let msg = result
            .expect_err("worker panic must surface as Err")
            .to_string();
        assert!(
            msg.contains("WorkerPump worker panicked"),
            "unexpected error: {msg}"
        );
    }

    /// Pump callback panics; worker is joined (no thread leak).
    ///
    /// The worker holds a `Drop` guard that flips an `AtomicBool`; seeing the
    /// flag set after `run` unwinds shows the worker ran to completion first.
    #[test]
    fn pump_panics() {
        struct DropGuard(Arc<AtomicBool>);
        impl Drop for DropGuard {
            fn drop(&mut self) {
                self.0.store(true, Ordering::Relaxed);
            }
        }

        let worker_dropped = Arc::new(AtomicBool::new(false));
        let flag_for_worker = Arc::clone(&worker_dropped);

        let result = catch_unwind(AssertUnwindSafe(|| {
            WorkerPump::<u8>::new().drain_logs_each_tick(false).run(
                move |tx| {
                    let _guard = DropGuard(flag_for_worker);
                    // The first message makes the pump fire (and panic).
                    let _ = tx.send(1u8);
                    // The second send either lands in the buffer or returns
                    // `Err` once `rx` is dropped; it must not block forever.
                    let _ = tx.send(2u8);
                    Ok(())
                },
                |_msg: u8| {
                    panic!("pump panic");
                },
            )
        }));

        // The outer catch_unwind should see the pump panic propagate.
        assert!(result.is_err(), "expected panic to propagate");
        // The worker's DropGuard must have run — confirms worker was joined.
        assert!(
            worker_dropped.load(Ordering::Relaxed),
            "worker Drop did not run — possible thread leak"
        );
    }

    /// Fill the bounded channel; abort the pump early; assert worker exits.
    ///
    /// The worker tries to send more messages than the channel can hold.  The
    /// pump panics on the first message (simulating an early exit), which
    /// drops `rx`.  The worker's blocked `tx.send` must return `Err` (not
    /// deadlock), and the worker must finish.
    ///
    /// Rust's test harness does not kill hanging tests, so `run` is executed
    /// on a helper thread and awaited with a timeout: a deadlock fails the
    /// test instead of hanging it.
    #[test]
    fn bounded_channel_no_deadlock_on_scope_drop() {
        let (done_tx, done_rx) = std::sync::mpsc::channel();

        let runner = std::thread::spawn(move || {
            let outcome = catch_unwind(AssertUnwindSafe(|| {
                // Capacity 1 so the worker blocks quickly.
                WorkerPump::<u8>::new()
                    .channel_capacity(1)
                    .drain_logs_each_tick(false)
                    .run(
                        |tx| {
                            // Try to send many messages — blocks once the slot fills.
                            for i in 0u8..=10 {
                                if tx.send(i).is_err() {
                                    // rx was dropped (pump aborted), exit cleanly.
                                    break;
                                }
                            }
                            Ok(())
                        },
                        |_msg: u8| {
                            // Simulate early pump exit by panicking on the first message.
                            panic!("early pump exit");
                        },
                    )
            }));
            let _ = done_tx.send(outcome.is_err());
        });

        let pump_panicked = done_rx
            .recv_timeout(Duration::from_secs(10))
            .expect("WorkerPump::run did not finish within 10s — probable deadlock");
        assert!(pump_panicked, "expected pump panic");
        runner.join().expect("runner thread panicked");
    }

    /// With the `log` feature enabled: a log record from the worker is flushed
    /// to the render sink by the time the pump processes the next message.
    ///
    /// Without the `log` feature this test reduces to a basic send/recv check.
    #[test]
    fn drain_logs_each_tick_default_on() {
        #[cfg(feature = "log")]
        {
            use crate::optionals::log_impl::{
                LOG_TEST_LOCK, install_r_logger, set_fake_main_thread, set_log_level, take_rendered,
            };

            // Acquire the same lock that log_impl tests use so we don't race on
            // the shared QUEUE / DROPPED / TEST_SINK globals.
            let _guard = LOG_TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner());

            install_r_logger();
            set_log_level("trace");
            // Clear residual state from prior tests.
            take_rendered();

            let mut rendered_before_pump: Vec<String> = Vec::new();

            // The pump loop (and drain_log_queue inside it) runs on *this* thread.
            // Mark this thread as main so drain_log_queue() actually drains.
            // The worker is a real scoped thread whose FAKE_IS_MAIN is None,
            // so is_main() → is_r_main_thread() → false → records go to queue.
            set_fake_main_thread(Some(true));

            WorkerPump::<u8>::new()
                .drain_logs_each_tick(true)
                .run(
                    |tx| {
                        // Runs on a real background thread; is_main() == false
                        // → log record is buffered in QUEUE, not rendered yet.
                        log::info!("hello from worker");
                        tx.send(1u8).ok();
                        Ok(())
                    },
                    |_msg: u8| {
                        // WorkerPump calls drain_log_queue() BEFORE calling us.
                        // Since is_main() == true on the pump thread, the queue
                        // was already drained → record is in TEST_SINK.
                        rendered_before_pump.extend(take_rendered());
                    },
                )
                .expect("run failed");

            // Restore state.
            set_fake_main_thread(None);
            take_rendered();

            // The drain happened before pump(msg), so the record must have
            // appeared in the captured snapshot.
            assert!(
                rendered_before_pump
                    .iter()
                    .any(|m| m.contains("hello from worker")),
                "log record was not drained before pump tick; got: {rendered_before_pump:?}"
            );
        }

        // When `log` feature is disabled: just verify run completes.
        #[cfg(not(feature = "log"))]
        {
            let result: Result<u8, WorkerError> = WorkerPump::new().drain_logs_each_tick(true).run(
                |tx| {
                    tx.send(99u8).ok();
                    Ok(99u8)
                },
                |_| {},
            );
            assert_eq!(result.unwrap(), 99u8);
        }
    }
}
429
430// endregion