miniextendr_api/pump.rs
1//! [`WorkerPump<T>`] — safe main/worker thread coordination for FFI bodies.
2//!
3//! # Overview
4//!
5//! `WorkerPump` manages the common pattern of running a CPU-bound worker on a
6//! background thread while the main R thread drives a "pump" loop — processing
7//! progress events, rendering output, or any other operation that must happen
8//! on R's main thread (e.g. calls into R's C API).
9//!
10//! It wraps [`std::thread::scope`] so the worker is always joined before
11//! `run` returns, and exposes a builder API for common knobs (channel capacity,
12//! log-drain cadence).
13//!
14//! # Longjmp-safety contract
15//!
16//! **`WorkerPump::run` must be called from inside an `#[miniextendr]` FFI
17//! body** (or any code already wrapped by `with_r_unwind_protect`).
18//!
19//! The reason: the pump closure may call into R's API (e.g. to render a
20//! progress bar), and R can `longjmp` out of those calls at any time — on
21//! interrupt, on allocation failure, etc. miniextendr's macro layer wraps
22//! every `#[miniextendr]` body in `R_UnwindProtect` via
23//! `run_r_unwind_protect`.
24//!
25//! The `R_UnwindProtect` strategy used by miniextendr converts R longjmps
26//! into Rust panics (via `cleanup_handler` → `std::panic::panic_any`) which
27//! are then caught by an outer `catch_unwind`. Because the panic travels
28//! through normal Rust stack-unwinding, **all `Drop` glue runs on the way
//! out**, and `thread::scope` joins the worker before letting the panic
//! continue. Unwinding drops `rx` first, so the worker sees the channel
//! disconnect: any blocked `tx.send` returns `Err` and the worker can
//! exit gracefully. The panic is then re-raised as an R error via
33//! `R_ContinueUnwind`.
34//!
35//! If you call `WorkerPump::run` *outside* of an `#[miniextendr]` body and
36//! the pump triggers an R longjmp, the longjmp will bypass Rust destructors
37//! entirely and the worker thread will be leaked.
38//!
39//! # Error type
40//!
41//! `WorkerPump::run` uses `Result<R, Box<dyn Error + Send + Sync>>` so it
42//! composes naturally with both `anyhow::Result` (via `?`) and `std::io::Error`
43//! without requiring a hard dependency on any error-handling crate.
44//!
45//! # Example
46//!
47//! ```rust,ignore
48//! use miniextendr_api::pump::WorkerPump;
49//! use std::sync::mpsc::SyncSender;
50//!
51//! #[miniextendr]
52//! fn compress_files(paths: Vec<String>) -> i64 {
53//! WorkerPump::new()
54//! .run(
55//! // worker: runs off-main-thread
56//! |tx: SyncSender<u64>| -> Result<i64, Box<dyn std::error::Error + Send + Sync>> {
57//! let mut total = 0i64;
58//! for path in &paths {
59//! let bytes = compress_one(path)?;
60//! tx.send(bytes).ok();
61//! total += bytes as i64;
62//! }
63//! Ok(total)
64//! },
65//! // pump: runs on main thread, may call R API
66//! |bytes| render_progress(bytes),
67//! )
68//! .expect("compression failed")
69//! }
70//! ```
71
72use std::error::Error;
73use std::marker::PhantomData;
74use std::sync::mpsc::{self, SyncSender};
75use std::thread;
76
/// Boxed, thread-safe error type used by [`WorkerPump::run`].
///
/// Alias for `Box<dyn Error + Send + Sync>`. Compatible with `anyhow::Error`
/// via `?` and with standard library error types without requiring extra
/// dependencies.
pub type WorkerError = Box<dyn Error + Send + Sync>;

/// Runs a worker thread in parallel with a main-thread pump loop.
///
/// `T` is the type of the messages the worker sends to the pump. Because the
/// worker runs on a *scoped* thread, `T` only has to be `Send`; it may borrow
/// from the caller's stack (e.g. `&str`).
///
/// See [the module documentation][self] for the longjmp-safety contract and a
/// usage example.
pub struct WorkerPump<T> {
    /// Capacity of the bounded MPSC channel between worker and pump.
    capacity: usize,
    /// Whether to drain the cross-thread log queue on every pump tick.
    drain_logs_each_tick: bool,
    /// Ties the builder to its message type without storing a `T`.
    /// `fn() -> T` keeps the builder `Send`/`Sync` whatever `T` is.
    _marker: PhantomData<fn() -> T>,
}

impl<T: Send> WorkerPump<T> {
    /// Create a new `WorkerPump` with default settings.
    ///
    /// Defaults:
    /// - channel capacity: 64
    /// - `drain_logs_each_tick`: `true`
    pub fn new() -> Self {
        Self {
            capacity: 64,
            drain_logs_each_tick: true,
            _marker: PhantomData,
        }
    }

    /// Set the capacity of the bounded MPSC channel.
    ///
    /// The default is 64. A larger capacity allows the worker to get further
    /// ahead of the pump; a capacity of 0 makes every send synchronous
    /// (rendezvous channel).
    ///
    /// When the channel is full the worker blocks on `tx.send` until the pump
    /// drains a slot. If the pump panics (which is also how an R longjmp
    /// surfaces inside an `#[miniextendr]` body), `rx` is dropped while the
    /// stack unwinds, which unblocks `tx.send` with an `Err` and lets the
    /// worker exit cleanly.
    #[must_use]
    pub fn channel_capacity(mut self, n: usize) -> Self {
        self.capacity = n;
        self
    }

    /// Control whether the cross-thread log queue is drained on every pump tick.
    ///
    /// Default: `true`. Set to `false` if the consumer manages its own log
    /// drain cadence (e.g. it calls `drain_log_queue()` explicitly at
    /// coarser granularity).
    ///
    /// Has no effect when the `log` feature is disabled.
    #[must_use]
    pub fn drain_logs_each_tick(mut self, on: bool) -> Self {
        self.drain_logs_each_tick = on;
        self
    }

    /// Run the worker/pump pair and return the worker's result.
    ///
    /// - `worker` runs on a scoped background thread. It receives a
    ///   [`SyncSender<T>`] and sends messages to the pump. The sender is
    ///   moved into the worker, so it is dropped when `worker` returns
    ///   (success or error) and the pump's receive loop then terminates
    ///   naturally.
    /// - `pump` is called on the **current (main R) thread** for every message
    ///   the worker sends. When log draining is enabled, the log queue is
    ///   drained immediately *before* each `pump` call.
    ///
    /// The log queue is only drained when a message arrives. Records the
    /// worker logs after its final `send` are left in the queue for the
    /// caller's next drain.
    ///
    /// # Errors
    ///
    /// - Returns the worker's own `Err` unchanged if the worker fails.
    /// - Returns `Err("WorkerPump worker panicked")` if the worker thread
    ///   panics.
    ///
    /// # Panics
    ///
    /// If the pump closure panics, `rx` is dropped during unwinding,
    /// `thread::scope` joins the worker, and the panic then propagates out of
    /// `run`. When called from inside an `#[miniextendr]` body the outer
    /// `R_UnwindProtect` catches it and converts it to an R error.
    pub fn run<R, W, P>(self, worker: W, mut pump: P) -> Result<R, WorkerError>
    where
        R: Send,
        W: FnOnce(SyncSender<T>) -> Result<R, WorkerError> + Send,
        P: FnMut(T),
    {
        let Self {
            capacity,
            drain_logs_each_tick,
            ..
        } = self;

        thread::scope(|scope| {
            let (tx, rx) = mpsc::sync_channel(capacity);
            let handle = scope.spawn(move || worker(tx));

            // `rx` is owned by this loop: it ends when the worker drops `tx`,
            // and a panicking `pump` drops `rx` before the scope joins.
            for msg in rx {
                if drain_logs_each_tick {
                    #[cfg(feature = "log")]
                    crate::optionals::log_impl::drain_log_queue();
                }
                pump(msg);
            }

            // Joining explicitly turns a worker panic into an `Err` instead of
            // letting the scope re-panic on the calling thread.
            match handle.join() {
                Ok(outcome) => outcome,
                Err(_) => Err("WorkerPump worker panicked".into()),
            }
        })
    }
}

impl<T: Send> Default for WorkerPump<T> {
    /// Equivalent to [`WorkerPump::new`].
    fn default() -> Self {
        Self::new()
    }
}
186
187// region: Tests
188
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    // -------------------------------------------------------------------------
    // happy_path
    // -------------------------------------------------------------------------

    /// Worker sends N messages, pump receives all in order, worker returns Ok.
    #[test]
    fn happy_path() {
        // The pump runs on this thread, so it can push into a plain local.
        let mut received = Vec::new();

        let result = WorkerPump::new()
            .drain_logs_each_tick(false)
            .run(
                |tx| {
                    for i in 0..5i64 {
                        tx.send(i).unwrap();
                    }
                    Ok(42i64)
                },
                |msg| received.push(msg),
            )
            .expect("run failed");

        assert_eq!(result, 42);
        assert_eq!(received, vec![0, 1, 2, 3, 4]);
    }

    // -------------------------------------------------------------------------
    // worker_returns_err
    // -------------------------------------------------------------------------

    /// Worker returns Err without sending; pump never runs; run returns the
    /// worker's Err.
    #[test]
    fn worker_returns_err() {
        let mut pump_calls = 0usize;

        let result: Result<(), WorkerError> = WorkerPump::new().drain_logs_each_tick(false).run(
            // `tx` is dropped as soon as the worker returns; nothing is sent.
            |_tx| Err("deliberate worker error".into()),
            |_msg: ()| pump_calls += 1,
        );

        let err = result.expect_err("worker error must surface from run");
        let msg = err.to_string();
        assert!(
            msg.contains("deliberate worker error"),
            "unexpected error: {msg}"
        );
        assert_eq!(pump_calls, 0, "pump must not run if worker sends nothing");
    }

    // -------------------------------------------------------------------------
    // pump_panics
    // -------------------------------------------------------------------------

    /// Pump callback panics; worker is joined (no thread leak).
    ///
    /// The worker holds a `Drop` guard that flips an `AtomicBool`. Seeing the
    /// flag set after `run` unwinds confirms the worker ran to completion
    /// before the panic left the scope.
    #[test]
    fn pump_panics() {
        struct DropGuard(Arc<AtomicBool>);
        impl Drop for DropGuard {
            fn drop(&mut self) {
                // Relaxed is enough: joining the worker orders this store
                // before the load below.
                self.0.store(true, Ordering::Relaxed);
            }
        }

        let worker_finished = Arc::new(AtomicBool::new(false));
        let flag_for_worker = Arc::clone(&worker_finished);

        let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            WorkerPump::<u8>::new().drain_logs_each_tick(false).run(
                move |tx| {
                    let _guard = DropGuard(flag_for_worker);
                    // First message makes sure the pump fires at least once.
                    let _ = tx.send(1u8);
                    // With the default capacity this send does not block: it
                    // is either buffered or fails because the panicking pump
                    // already dropped `rx`. Both results are ignored.
                    let _ = tx.send(2u8);
                    Ok(())
                },
                |_msg: u8| {
                    panic!("pump panic");
                },
            )
        }));

        // The outer catch_unwind should see the pump panic propagate.
        assert!(outcome.is_err(), "expected panic to propagate");
        // The worker's DropGuard must have run, which confirms it was joined.
        assert!(
            worker_finished.load(Ordering::Relaxed),
            "worker Drop did not run — possible thread leak"
        );
    }

    // -------------------------------------------------------------------------
    // bounded_channel_no_deadlock_on_scope_drop
    // -------------------------------------------------------------------------

    /// Fill the bounded channel; abort the pump early; assert worker exits.
    ///
    /// The worker tries to send more messages than the channel can hold. The
    /// pump panics on the first message (simulating an early exit), which
    /// drops `rx`. The worker's blocked `tx.send` must return `Err` (not
    /// deadlock), and the worker must finish.
    ///
    /// `run` cannot return until the worker has been joined, so a deadlock
    /// would show up as this test hanging rather than failing an assertion.
    #[test]
    fn bounded_channel_no_deadlock_on_scope_drop() {
        // Capacity 1 so the worker blocks almost immediately.
        let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            WorkerPump::<u8>::new()
                .channel_capacity(1)
                .drain_logs_each_tick(false)
                .run(
                    |tx| {
                        // Far more messages than the channel can buffer.
                        for i in 0u8..=10 {
                            if tx.send(i).is_err() {
                                // rx was dropped (pump panicked), exit cleanly.
                                break;
                            }
                        }
                        Ok(())
                    },
                    |_msg: u8| {
                        // Simulate early pump exit on the first message.
                        panic!("early pump exit");
                    },
                )
        }));

        // Reaching this line at all means the worker was unblocked and joined.
        assert!(outcome.is_err(), "expected pump panic");
    }

    // -------------------------------------------------------------------------
    // drain_logs_each_tick_default_on
    // -------------------------------------------------------------------------

    /// With the `log` feature enabled: a log record from the worker is flushed
    /// to the render sink by the time the pump processes the next message.
    ///
    /// Without the `log` feature this test reduces to a basic send/recv check.
    #[test]
    fn drain_logs_each_tick_default_on() {
        #[cfg(feature = "log")]
        {
            use crate::optionals::log_impl::{
                LOG_TEST_LOCK, install_r_logger, set_fake_main_thread, set_log_level, take_rendered,
            };

            /// Clears the fake-main-thread override and empties the test sink
            /// when dropped, so a panic in this test cannot leak the override
            /// into other tests.
            struct RestoreLogState;
            impl Drop for RestoreLogState {
                fn drop(&mut self) {
                    set_fake_main_thread(None);
                    take_rendered();
                }
            }

            // Acquire the same lock that log_impl tests use so we don't race on
            // the shared QUEUE / DROPPED / TEST_SINK globals.
            let _guard = LOG_TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner());

            install_r_logger();
            set_log_level("trace");
            // Clear residual state from prior tests.
            take_rendered();

            // Declared after `_guard`, so on unwind it is dropped first and the
            // restore still happens while the lock is held.
            let restore = RestoreLogState;

            // The pump loop (and drain_log_queue inside it) runs on *this* thread.
            // Mark this thread as main so drain_log_queue() actually drains.
            // The worker is a real scoped thread that is not marked as main,
            // so its records go to the queue instead of being rendered.
            set_fake_main_thread(Some(true));

            // Snapshot of what had been rendered when the pump was called.
            let mut rendered: Vec<String> = Vec::new();

            WorkerPump::<u8>::new()
                .drain_logs_each_tick(true)
                .run(
                    |tx| {
                        // Runs on a real background thread, so the record is
                        // buffered in the queue rather than rendered.
                        log::info!("hello from worker");
                        tx.send(1u8).ok();
                        Ok(())
                    },
                    // WorkerPump calls drain_log_queue() BEFORE calling the
                    // pump, so the record is already in the test sink here.
                    |_msg: u8| rendered.extend(take_rendered()),
                )
                .expect("run failed");

            // Restore state before asserting (same order as the success path
            // has always used); the guard also covers the panic path above.
            drop(restore);

            // The drain happened before pump(msg), so the record must have
            // appeared in the captured snapshot.
            assert!(
                rendered.iter().any(|m| m.contains("hello from worker")),
                "log record was not drained before pump tick; got: {rendered:?}"
            );
        }

        // When `log` feature is disabled: just verify run completes.
        #[cfg(not(feature = "log"))]
        {
            let result: Result<u8, WorkerError> = WorkerPump::new().drain_logs_each_tick(true).run(
                |tx| {
                    tx.send(99u8).ok();
                    Ok(99u8)
                },
                |_| {},
            );
            assert_eq!(result.unwrap(), 99u8);
        }
    }
}
429
430// endregion