miniextendr_api/worker.rs
1//! Worker thread infrastructure for safe Rust-R FFI.
2//!
3//! ## Public API
4//!
5//! - [`with_r_thread`] — Execute a closure on R's main thread
6//! - [`is_r_main_thread`] — Check if the current thread is R's main thread
7//!
8//! ## Feature gate: `worker-thread`
9//!
10//! Without the `worker-thread` cargo feature, all calls execute inline on
11//! R's main thread:
12//! - `with_r_thread(f)` runs `f()` directly (panics if not on main thread)
13//! - `run_on_worker(f)` runs `f()` directly, returns `Ok(f())`
14//!
15//! With the feature enabled, a dedicated worker thread is spawned at init time.
16//! `with_r_thread` routes calls from the worker back to main, and `run_on_worker`
17//! dispatches closures to the worker with bidirectional communication.
18//!
19//! ## Initialization
20//!
21//! [`miniextendr_runtime_init`] must be called from R's main thread before any
22//! R FFI APIs. Typically done in `R_init_<pkgname>()`.
23
24use std::sync::OnceLock;
25use std::thread;
26
27use crate::ffi::{self, SEXP};
28
29static R_MAIN_THREAD_ID: OnceLock<thread::ThreadId> = OnceLock::new();
30
31// region: Public API
32
/// Wrapper to mark values as Send for main-thread routing.
///
/// Only safe if the value is not accessed on the worker thread and is
/// used exclusively on the main thread.
///
/// The wrapper is `#[repr(transparent)]` and exposes its payload as the
/// public field `.0`.
#[doc(hidden)]
#[repr(transparent)]
#[derive(Clone, Copy)]
pub struct Sendable<T>(pub T);

// SAFETY: this impl is unconditional (no `T: Send` bound), so the compiler
// verifies nothing here. Soundness rests entirely on every user of
// `Sendable` upholding the contract documented on the type: the wrapped
// value may be *moved* across threads, but must only be used on R's main
// thread.
unsafe impl<T> Send for Sendable<T> {}
43
44/// Check if the current thread is R's main thread.
45///
46/// Returns `true` if called from the main R thread, `false` otherwise.
47/// Before `miniextendr_runtime_init()` is called, always returns `false`.
48#[inline(always)]
49pub fn is_r_main_thread() -> bool {
50 R_MAIN_THREAD_ID
51 .get()
52 .map(|&id| id == std::thread::current().id())
53 .unwrap_or(false)
54}
55
/// Execute a closure on R's main thread, returning the result.
///
/// This function can be called from any thread:
/// - From the main thread: executes the closure directly (re-entrant)
/// - From the worker thread (during `run_on_worker`): sends the work to
///   the main thread and blocks until completion
///
/// Note that `F` is deliberately not required to be `Send`: only the result
/// type `R` carries a `Send` bound.
///
/// # Panics
///
/// - If `miniextendr_runtime_init()` hasn't been called yet
/// - If called from a non-main thread without the `worker-thread` feature
/// - If called from a non-main thread outside of a `run_on_worker` context
///   (even with the `worker-thread` feature)
///
/// # Example
///
/// ```ignore
/// use miniextendr_api::with_r_thread;
///
/// // From worker thread, safely call R APIs:
/// let sexp = with_r_thread(|| {
///     // This runs on R's main thread
///     SEXP::nil()
/// });
/// ```
pub fn with_r_thread<F, R>(f: F) -> R
where
    F: FnOnce() -> R + 'static,
    R: Send + 'static,
{
    // Fail loudly (with setup instructions) if the main thread ID was never
    // recorded; every check below depends on it.
    assert_runtime_initialized();

    // Fast path: already on the main thread, so just run the closure here.
    if is_r_main_thread() {
        return f();
    }

    // Not on main thread — need worker-thread feature for routing
    //
    // Exactly one of the two cfg-gated blocks below survives compilation and
    // becomes the function's tail expression.
    #[cfg(not(feature = "worker-thread"))]
    {
        panic!(
            "with_r_thread called from a non-main thread without the `worker-thread` feature.\n\
             \n\
             Without `worker-thread`, R API calls can only happen on the R main thread.\n\
             Either:\n\
             - Enable the `worker-thread` cargo feature to route calls from background threads, or\n\
             - Ensure this code only runs on the R main thread."
        );
    }

    #[cfg(feature = "worker-thread")]
    {
        // Hands `f` to the main thread over the per-job channels and blocks
        // until the response arrives.
        worker_channel::route_to_main_thread(f)
    }
}
110// endregion
111
112// region: #[doc(hidden)] items for macro-generated code
113
114/// Raise an R error from a panic message. Does not return.
115///
116/// If `call` is `Some(sexp)`, uses `Rf_errorcall` to include call context.
117#[doc(hidden)]
118pub fn panic_message_to_r_error(msg: String, call: Option<SEXP>) -> ! {
119 let c_msg = std::ffi::CString::new(msg)
120 .unwrap_or_else(|_| std::ffi::CString::new("Rust panic (invalid message)").unwrap());
121 unsafe {
122 match call {
123 Some(call) => ffi::Rf_errorcall_unchecked(call, c"%s".as_ptr(), c_msg.as_ptr()),
124 None => ffi::Rf_error_unchecked(c"%s".as_ptr(), c_msg.as_ptr()),
125 }
126 }
127}
128
/// Execute `f` as a worker job and report how it ended.
///
/// With the `worker-thread` feature the closure is dispatched to the
/// dedicated worker; `Ok(T)` carries its return value and `Err(String)`
/// carries the message of a failure reported by the dispatch layer. Every
/// such error is also forwarded to panic telemetry, tagged as coming from
/// the worker, before being handed back to the caller (who decides whether
/// to surface it as a tagged error value or via `Rf_errorcall`).
///
/// Without the `worker-thread` feature the closure simply runs inline on the
/// current thread and its value is wrapped in `Ok`.
#[doc(hidden)]
pub fn run_on_worker<F, T>(f: F) -> Result<T, String>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    #[cfg(feature = "worker-thread")]
    {
        match worker_channel::dispatch_to_worker(f) {
            Ok(value) => Ok(value),
            Err(msg) => {
                crate::panic_telemetry::fire(&msg, crate::panic_telemetry::PanicSource::Worker);
                Err(msg)
            }
        }
    }

    #[cfg(not(feature = "worker-thread"))]
    {
        Ok(f())
    }
}
155
156/// Initialize the miniextendr runtime.
157///
158/// Records the main thread ID and (with `worker-thread`) spawns the worker.
159/// Must be called from R's main thread, typically from `R_init_<pkgname>`.
160#[doc(hidden)]
161#[unsafe(no_mangle)]
162pub extern "C-unwind" fn miniextendr_runtime_init() {
163 static RUN_ONCE: std::sync::Once = std::sync::Once::new();
164
165 #[cfg(feature = "worker-thread")]
166 {
167 RUN_ONCE.call_once_force(|x| {
168 if x.is_poisoned() {
169 eprintln!(
170 "warning: miniextendr worker init is retrying after a previous failed attempt"
171 );
172 }
173
174 let current_id = std::thread::current().id();
175 if let Some(&existing_id) = R_MAIN_THREAD_ID.get() {
176 if existing_id != current_id {
177 panic!(
178 "miniextendr_runtime_init called from thread {:?}, but R_MAIN_THREAD_ID \
179 was already set to {:?}. This indicates incorrect initialization order.",
180 current_id, existing_id
181 );
182 }
183 } else {
184 let _ = R_MAIN_THREAD_ID.set(current_id);
185 }
186
187 worker_channel::init_worker();
188
189 // NB: no libc `atexit` registration.
190 //
191 // `atexit` stores a function pointer into the DLL's code. If the
192 // package is unloaded (e.g. via `library.dynam.unload` / dyn.unload
193 // / devtools::load_all(reset = TRUE)) before libc runs its atexit
194 // registry at process exit, the handler jumps to an unmapped
195 // address and tears down the process's SEH state on Windows —
196 // which manifests as "fatal runtime error: failed to initiate
197 // panic, error 5" in the next DLL that tries to unwind. #277.
198 //
199 // The normal path (package unload → `R_unload_<pkg>` →
200 // `miniextendr_runtime_shutdown`) already joins the worker
201 // cleanly. The abnormal path (process exit without unload, e.g.
202 // `q("no")`) relies on the OS to reap the worker thread, which
203 // it does — we don't need graceful shutdown for a dying process.
204 });
205 }
206
207 #[cfg(not(feature = "worker-thread"))]
208 {
209 RUN_ONCE.call_once(|| {
210 let _ = R_MAIN_THREAD_ID.set(std::thread::current().id());
211 });
212 }
213}
214
/// Shut down the miniextendr worker thread synchronously.
///
/// Called from `R_unload_<pkg>` (generated by `miniextendr_init!`). Sends a
/// `Shutdown` message to the worker, drops the sender, and blocks on
/// `JoinHandle::join()` until the worker thread has fully exited. Must block:
/// `library.dynam.unload` unmaps the DLL's code pages as soon as this returns,
/// and a still-live worker would resume execution in freed memory (see #277).
///
/// Idempotent. After the first call, the join handle is taken and subsequent
/// calls are no-ops. Safe to call from any thread, though R only ever calls it
/// from the main thread.
///
/// Additionally uninstalls the process panic hook that this DLL registered
/// (also in DLL code — see `backtrace::miniextendr_panic_hook_uninstall`).
///
/// Without the `worker-thread` feature this is (mostly) a no-op: only the
/// panic hook uninstall runs.
#[doc(hidden)]
#[unsafe(no_mangle)]
pub extern "C-unwind" fn miniextendr_runtime_shutdown() {
    // Step 1 (feature-gated): stop and join the worker thread.
    #[cfg(feature = "worker-thread")]
    {
        worker_channel::shutdown();
    }
    // Step 2 (always): remove the panic hook, which runs in both feature
    // configurations.
    crate::backtrace::miniextendr_panic_hook_uninstall();
}
241// endregion
242
243// region: pub(crate) internals
244
/// Report whether the calling thread currently has worker routing set up.
///
/// With the `worker-thread` feature this defers to the worker channel
/// module; without it there is no worker, so the answer is always `false`.
pub(crate) fn has_worker_context() -> bool {
    #[cfg(feature = "worker-thread")]
    let routed = worker_channel::has_context();

    #[cfg(not(feature = "worker-thread"))]
    let routed = false;

    routed
}
256
257/// Panic if the runtime hasn't been initialized.
258fn assert_runtime_initialized() {
259 if R_MAIN_THREAD_ID.get().is_none() {
260 panic!(
261 "miniextendr_runtime_init() must be called before using R FFI APIs.\n\
262 \n\
263 This is typically done in R_init_<pkgname>() via:\n\
264 \n\
265 void R_init_pkgname(DllInfo *dll) {{\n\
266 miniextendr_runtime_init();\n\
267 R_registerRoutines(dll, NULL, CallEntries, NULL, NULL);\n\
268 }}\n\
269 \n\
270 If you're embedding R in Rust, call miniextendr_runtime_init() from the main thread \
271 before any R API calls."
272 );
273 }
274}
275// endregion
276
277// region: Worker channel infrastructure (only with worker-thread feature)
278
279#[cfg(feature = "worker-thread")]
280mod worker_channel {
281 use std::any::Any;
282 use std::cell::RefCell;
283 use std::panic::{AssertUnwindSafe, catch_unwind};
284 use std::sync::Mutex;
285 use std::sync::mpsc::{self, Receiver, SyncSender};
286 use std::thread;
287
288 use super::Sendable;
289 use crate::ffi::{self, Rboolean, SEXP};
290
291 type AnyJob = Box<dyn FnOnce() + Send>;
292
    /// Tagged messages on the main→worker channel.
    ///
    /// Plain `Box<AnyJob>` transport with an atomic shutdown flag + `recv_timeout`
    /// polling used to sit here. That shape left the worker asleep in
    /// `recv_timeout` when `R_unload_<pkg>` fired, so `library.dynam.unload`
    /// unmapped the DLL's code pages while the worker was still about to wake
    /// up inside them — producing the "failed to initiate panic, error 5"
    /// SEH corruption documented in #277.
    ///
    /// `Shutdown` is a proper message instead: the worker blocks on `recv()`
    /// (no timeout, no polling), `shutdown()` delivers the message, and
    /// `recv()` returns immediately. Combined with dropping the sender and
    /// a blocking `JoinHandle::join()`, this makes `R_unload_<pkg>`
    /// synchronous — the DLL can't unmap until the worker thread has truly
    /// exited.
    enum WorkerMsg {
        /// A type-erased closure for the worker loop to invoke.
        Job(AnyJob),
        /// Request that the worker loop stop receiving and exit.
        Shutdown,
    }
312
    /// Single place that owns the worker's lifetime.
    ///
    /// `Mutex<Option<...>>` (rather than `OnceLock`) because `shutdown()`
    /// needs to `.take()` both the sender (to drop it, closing the channel as
    /// a second-path wake-up) and the join handle (to `.join()` it). After
    /// `shutdown()`, `dispatch_to_worker` sees `None` and returns a
    /// structured "worker shut down" error instead of relying on the old
    /// `send` returning `SendError`.
    struct WorkerState {
        /// Sending half of the main→worker channel; cloned by each dispatch.
        tx: SyncSender<WorkerMsg>,
        /// Join handle of the spawned worker thread, consumed by `shutdown()`.
        handle: thread::JoinHandle<()>,
    }

    /// The running worker, if any: `None` before `init_worker()` and again
    /// after `shutdown()`.
    static WORKER: Mutex<Option<WorkerState>> = Mutex::new(None);
327
328 /// Shut the worker down synchronously.
329 ///
330 /// Send `Shutdown`, drop the sender (so `recv()` returns `Err` even if
331 /// the `Shutdown` message is somehow missed — defense in depth), then
332 /// block on `JoinHandle::join()`. No timeout: if the worker wedges, we
333 /// want the hang to surface directly rather than mask it with an
334 /// arbitrary deadline that then races DLL unmap. Idempotent — after the
335 /// first call, `WORKER` is `None` and the function is a no-op.
336 pub(super) fn shutdown() {
337 let Some(state) = WORKER.lock().unwrap().take() else {
338 return;
339 };
340 // If the worker already exited (e.g. it panicked), `send` errors —
341 // the drop below is what matters in that case.
342 let _ = state.tx.send(WorkerMsg::Shutdown);
343 drop(state.tx);
344 // We're being called from `R_unload_<pkg>`, which is `extern
345 // "C-unwind"` — so a panic here would unwind through R's dyn.unload
346 // handler. If the worker itself panicked in a way we can't catch,
347 // logging the payload and continuing is safer than re-raising
348 // across the R FFI boundary. We still join (unwrap the Err) so the
349 // handle is consumed and OS thread resources are released.
350 if let Err(payload) = state.handle.join() {
351 let msg = crate::unwind_protect::panic_payload_to_string(&*payload);
352 eprintln!("miniextendr: worker thread panicked during shutdown: {msg}");
353 }
354 }
355
    // Type-erased main thread work: closure that returns boxed result.
    // The closure itself is not `Send`; it is wrapped in `Sendable` so it can
    // travel over a channel to the thread that will run it.
    type MainThreadWork = Sendable<Box<dyn FnOnce() -> Box<dyn Any + Send> + 'static>>;

    // Response from main thread: Ok(result) or Err(panic_message)
    type MainThreadResponse = Result<Box<dyn Any + Send>, String>;

    /// Messages from worker to main thread
    enum WorkerMessage<T> {
        /// Worker requests main thread to execute some work, then send response back
        WorkRequest(MainThreadWork),
        /// Worker is done, here's the final result
        Done(Result<T, String>),
    }

    // `WorkerMessage` with the job's result type erased to `Box<dyn Any + Send>`.
    type TypeErasedWorkerMessage = WorkerMessage<Box<dyn Any + Send>>;
    // Slot types for the two thread-locals below; `None` means "no job running
    // on this thread".
    type WorkerToMainSender = RefCell<Option<SyncSender<TypeErasedWorkerMessage>>>;
    type MainResponseReceiver = RefCell<Option<Receiver<MainThreadResponse>>>;

    // Thread-local channels for worker -> main communication during run_on_worker
    thread_local! {
        // Sender used to post work requests to the dispatching thread.
        static WORKER_TO_MAIN_TX: WorkerToMainSender = const { RefCell::new(None) };
        // Receiver on which the answer to each work request arrives.
        static MAIN_RESPONSE_RX: MainResponseReceiver = const { RefCell::new(None) };
    }
379
380 pub(super) fn has_context() -> bool {
381 WORKER_TO_MAIN_TX.with(|tx_cell| tx_cell.borrow().is_some())
382 }
383
384 /// Route a closure from the worker thread to the main thread.
385 pub(super) fn route_to_main_thread<F, R>(f: F) -> R
386 where
387 F: FnOnce() -> R + 'static,
388 R: Send + 'static,
389 {
390 WORKER_TO_MAIN_TX.with(|tx_cell| {
391 let tx = tx_cell
392 .borrow()
393 .as_ref()
394 .expect("`with_r_thread` called outside of `run_on_worker` context")
395 .clone();
396
397 let work: MainThreadWork =
398 Sendable(Box::new(move || Box::new(f()) as Box<dyn Any + Send>));
399
400 tx.send(WorkerMessage::WorkRequest(work))
401 .expect("main thread channel closed");
402 });
403
404 MAIN_RESPONSE_RX.with(|rx_cell| {
405 let rx = rx_cell.borrow();
406 let rx = rx.as_ref().expect("response channel not set");
407 let response = rx.recv().expect("main thread response channel closed");
408 match response {
409 Ok(boxed) => *boxed
410 .downcast::<R>()
411 .expect("type mismatch in `with_r_thread` response"),
412 Err(panic_msg) => panic!("panic in `with_r_thread`: {}", panic_msg),
413 }
414 })
415 }
416
417 /// Dispatch a closure to the worker thread.
418 /// Returns Ok(T) or Err(panic_message).
419 pub(super) fn dispatch_to_worker<F, T>(f: F) -> Result<T, String>
420 where
421 F: FnOnce() -> T + Send + 'static,
422 T: Send + 'static,
423 {
424 /// Marker type for R errors caught by R_UnwindProtect's cleanup handler.
425 struct RErrorMarker;
426
427 // Re-entry guard: if we're already on the worker thread (inside a
428 // run_on_worker job), a nested run_on_worker would deadlock because the
429 // single worker thread can't pick up a new job while running the current one.
430 if has_context() {
431 panic!(
432 "run_on_worker called re-entrantly from within a worker context.\n\
433 \n\
434 The single worker thread is already executing a job, so a nested \
435 run_on_worker would deadlock. To call R APIs from worker code, \
436 use with_r_thread() instead."
437 );
438 }
439
440 // Clone the worker's sender while holding the mutex briefly. The
441 // clone outlives the lock, so sends happen without blocking other
442 // callers on the mutex. If `WORKER` is `None`, the package has
443 // already been unloaded (or never initialized) — return a
444 // structured error instead of panicking.
445 let job_tx = {
446 let guard = WORKER.lock().unwrap();
447 match guard.as_ref() {
448 Some(state) => state.tx.clone(),
449 None => {
450 return Err(
451 "miniextendr worker is not running (runtime not initialized, \
452 or package has been unloaded)"
453 .to_string(),
454 );
455 }
456 }
457 };
458
459 // Single channel for worker -> main (work requests + final result).
460 // Capacity 1: each run_on_worker sends exactly one request at a time and blocks
461 // for a response, so no accumulation is possible. The extra slot ensures the
462 // worker's final Done message doesn't block if the main thread longjmped away.
463 let (worker_tx, worker_rx) = mpsc::sync_channel::<TypeErasedWorkerMessage>(1);
464
465 // Channel for main -> worker responses to work requests.
466 // Capacity 1: the worker blocks on recv after each with_r_thread call, so at most
467 // one response is in flight. The extra slot lets the cleanup handler send an error
468 // without blocking (it runs mid-longjmp and cannot wait).
469 let (response_tx, response_rx) = mpsc::sync_channel::<MainThreadResponse>(1);
470
471 let job: AnyJob = Box::new(move || {
472 // Set up thread-local channels for with_r_thread
473 WORKER_TO_MAIN_TX.with(|tx_cell| {
474 *tx_cell.borrow_mut() = Some(worker_tx.clone());
475 });
476 MAIN_RESPONSE_RX.with(|rx_cell| {
477 *rx_cell.borrow_mut() = Some(response_rx);
478 });
479
480 let result = catch_unwind(AssertUnwindSafe(f));
481
482 // Clear thread-locals
483 WORKER_TO_MAIN_TX.with(|tx_cell| {
484 *tx_cell.borrow_mut() = None;
485 });
486 MAIN_RESPONSE_RX.with(|rx_cell| {
487 *rx_cell.borrow_mut() = None;
488 });
489
490 // Send final result back to the main thread's recv loop. The capacity-1
491 // buffer ensures this doesn't block even if the main thread already exited
492 // the loop (e.g., after an R longjmp consumed the last WorkRequest).
493 let to_send: Result<Box<dyn Any + Send>, String> = match result {
494 Ok(val) => Ok(Box::new(val)),
495 Err(payload) => {
496 Err(crate::unwind_protect::panic_payload_to_string(&*payload).into_owned())
497 }
498 };
499 let _ = worker_tx.send(WorkerMessage::Done(to_send));
500 });
501
502 job_tx
503 .send(WorkerMsg::Job(job))
504 .expect("worker thread dead");
505
506 // Main thread event loop: processes WorkRequest messages (from with_r_thread)
507 // until a Done message arrives. Invariant: each WorkRequest produces exactly
508 // one response_tx.send, and the worker blocks until it receives that response.
509 loop {
510 match worker_rx
511 .recv()
512 .expect("worker channel closed unexpectedly")
513 {
514 WorkerMessage::WorkRequest(work) => {
515 // Execute work on main thread with R_UnwindProtect so we can:
516 // 1. Catch Rust panics and send them as errors to the worker
517 // 2. Catch R errors (longjmp) via cleanup handler and send error to worker
518 // before R continues unwinding (function never returns in that case)
519
520 struct CallData {
521 work: Option<MainThreadWork>,
522 result: Option<Box<dyn Any + Send>>,
523 panic_payload: Option<Box<dyn Any + Send>>,
524 response_tx_ptr: *const SyncSender<MainThreadResponse>,
525 }
526
527 unsafe extern "C-unwind" fn trampoline(data: *mut std::ffi::c_void) -> SEXP {
528 assert!(!data.is_null(), "trampoline: data pointer is null");
529 let data = unsafe { &mut *data.cast::<CallData>() };
530 let work = data
531 .work
532 .take()
533 .expect("trampoline: work already consumed")
534 .0;
535
536 match catch_unwind(AssertUnwindSafe(work)) {
537 Ok(result) => {
538 data.result = Some(result);
539 SEXP::nil()
540 }
541 Err(payload) => {
542 data.panic_payload = Some(payload);
543 SEXP::nil()
544 }
545 }
546 }
547
548 unsafe extern "C-unwind" fn cleanup_handler(
549 data: *mut std::ffi::c_void,
550 jump: Rboolean,
551 ) {
552 if jump != Rboolean::FALSE {
553 // R is about to longjmp. We MUST send an error response to the worker
554 // before continuing the unwind—the worker is blocked on response_rx.recv()
555 // and would deadlock if we don't send something.
556 assert!(!data.is_null(), "cleanup_handler: data pointer is null");
557 let data = unsafe { &*data.cast::<CallData>() };
558 let response_tx = unsafe { &*data.response_tx_ptr };
559
560 #[cfg(feature = "nonapi")]
561 let error_msg = unsafe {
562 let buf = ffi::R_curErrorBuf();
563 if buf.is_null() {
564 "R error occurred".to_string()
565 } else {
566 std::ffi::CStr::from_ptr(buf).to_string_lossy().into_owned()
567 }
568 };
569 #[cfg(not(feature = "nonapi"))]
570 let error_msg = "R error occurred".to_string();
571
572 let _ = response_tx.send(Err(error_msg));
573 std::panic::panic_any(RErrorMarker);
574 }
575 }
576
577 let response: MainThreadResponse = unsafe {
578 let token = crate::unwind_protect::get_continuation_token();
579
580 let data = Box::into_raw(Box::new(CallData {
581 work: Some(work),
582 result: None,
583 panic_payload: None,
584 response_tx_ptr: std::ptr::from_ref(&response_tx),
585 }));
586
587 let panic_result = catch_unwind(AssertUnwindSafe(|| {
588 ffi::R_UnwindProtect_C_unwind(
589 Some(trampoline),
590 data.cast(),
591 Some(cleanup_handler),
592 data.cast(),
593 token,
594 )
595 }));
596
597 let mut data = Box::from_raw(data);
598
599 match panic_result {
600 Ok(_) => {
601 // Check if trampoline caught a panic
602 if let Some(payload) = data.panic_payload.take() {
603 Err(crate::unwind_protect::panic_payload_to_string(&*payload)
604 .into_owned())
605 } else {
606 // Normal completion - return the result
607 Ok(data
608 .result
609 .take()
610 .expect("result not set after successful completion"))
611 }
612 }
613 Err(payload) => {
614 // Check if this was an R error (cleanup handler already sent response)
615 if payload.downcast_ref::<RErrorMarker>().is_some() {
616 drop(data);
617 ffi::R_ContinueUnwind(token);
618 }
619 // Rust panic - return as error response
620 Err(crate::unwind_protect::panic_payload_to_string(&*payload)
621 .into_owned())
622 }
623 }
624 };
625
626 // Exactly one send per WorkRequest: either here (normal/panic) or
627 // in cleanup_handler (R error). Never both—R error path diverges
628 // via R_ContinueUnwind above and never reaches this line.
629 response_tx
630 .send(response)
631 .expect("worker response channel closed");
632 }
633 WorkerMessage::Done(result) => {
634 return match result {
635 Ok(boxed) => Ok(*boxed
636 .downcast::<T>()
637 .expect("type mismatch in run_on_worker result")),
638 Err(msg) => Err(msg),
639 };
640 }
641 }
642 }
643 }
644
645 /// Spawn the worker thread and install it as the global `WORKER`.
646 ///
647 /// Idempotent — if the worker is already running, this is a no-op. We
648 /// intentionally do NOT call this from a `OnceLock`: after `shutdown()`
649 /// the slot is cleared, and a subsequent `dyn.load` on the same DLL
650 /// (same statics, unchanged addresses) should be able to spawn a fresh
651 /// worker. `std::sync::Once` would forbid that.
652 pub(super) fn init_worker() {
653 let mut guard = WORKER.lock().unwrap();
654 if guard.is_some() {
655 return;
656 }
657 // Capacity 0 (rendezvous): the main thread blocks until the worker picks
658 // up the job, ensuring at most one job is in flight at a time.
659 let (tx, rx) = mpsc::sync_channel::<WorkerMsg>(0);
660 let handle = thread::Builder::new()
661 .name("miniextendr-worker".into())
662 .spawn(move || worker_loop(rx))
663 .expect("failed to spawn worker thread");
664 *guard = Some(WorkerState { tx, handle });
665 }
666
667 /// Worker thread body: blocking `recv()` loop.
668 ///
669 /// `recv()` blocks the thread in the OS until either a message arrives
670 /// or the sender is dropped — no polling, no timeouts, no sleeps. On
671 /// `Job`, run it. On `Shutdown` or `Err` (sender dropped), exit.
672 fn worker_loop(rx: Receiver<WorkerMsg>) {
673 while let Ok(msg) = rx.recv() {
674 match msg {
675 WorkerMsg::Job(job) => job(),
676 WorkerMsg::Shutdown => break,
677 }
678 }
679 }
680}
681// endregion
682
683// region: Tests
684
685#[cfg(test)]
686mod tests {
687 use super::*;
688
689 #[test]
690 fn sendable_is_send() {
691 fn assert_send<T: Send>() {}
692 assert_send::<Sendable<*const u8>>();
693 }
694
695 #[test]
696 fn with_r_thread_panics_before_init() {
697 // If another test already called miniextendr_runtime_init (via Once),
698 // we can't test the pre-init path. Verify at least panics from wrong thread.
699 if R_MAIN_THREAD_ID.get().is_some() {
700 let handle = std::thread::spawn(|| std::panic::catch_unwind(|| with_r_thread(|| 42)));
701 let result = handle.join().expect("thread panicked outside catch_unwind");
702 assert!(
703 result.is_err(),
704 "with_r_thread should panic from non-main thread"
705 );
706 return;
707 }
708 let result = std::panic::catch_unwind(|| {
709 with_r_thread(|| 42);
710 });
711 assert!(result.is_err());
712 let payload = result.unwrap_err();
713 let msg = crate::unwind_protect::panic_payload_to_string(payload.as_ref());
714 assert!(
715 msg.contains("miniextendr_runtime_init"),
716 "expected init error message, got: {msg}"
717 );
718 }
719
    /// A plain test thread is not running a dispatched worker job, so it must
    /// report no worker routing context (in either feature configuration).
    #[test]
    fn has_worker_context_false_outside_worker() {
        assert!(!has_worker_context());
    }
724
725 // region: Feature-gated tests: worker-thread
726
    /// Tests that only make sense when a real worker thread exists.
    #[cfg(feature = "worker-thread")]
    mod worker_tests {
        use super::*;

        /// Calling `run_on_worker` from within worker code (re-entry) must be
        /// detected and panic, not deadlock.
        ///
        /// The dispatch runs on a helper thread and reports back over a
        /// channel, so that a deadlock shows up as a receive timeout instead
        /// of hanging the whole test run.
        #[test]
        fn run_on_worker_reentry_panics_not_deadlocks() {
            miniextendr_runtime_init();

            // Ok(msg): re-entry was rejected with `msg`; Err(msg): it was not.
            let (tx, rx) = std::sync::mpsc::sync_channel::<Result<String, String>>(1);

            std::thread::spawn(move || {
                let result = run_on_worker(|| {
                    // Re-entry: this is on the worker thread already.
                    run_on_worker(|| 42).unwrap();
                });
                match result {
                    // The inner panic comes back as the outer job's error.
                    Err(msg) => {
                        let _ = tx.send(Ok(msg));
                    }
                    Ok(()) => {
                        let _ = tx.send(Err("re-entry was not detected".into()));
                    }
                }
            });

            match rx.recv_timeout(std::time::Duration::from_secs(5)) {
                Ok(Ok(msg)) => {
                    assert!(
                        msg.contains("re-entr") || msg.contains("Re-entr"),
                        "expected re-entry error, got: {msg}"
                    );
                }
                Ok(Err(msg)) => panic!("{msg}"),
                // Nothing arrived within the deadline: treat as a deadlock.
                Err(_) => {
                    panic!("DEADLOCK: run_on_worker re-entry caused the test to hang for 5 seconds")
                }
            }
        }
    }
768 // endregion
769
770 // region: Feature-gated tests: no worker-thread (stubs)
771
    /// Tests for the inline (no worker thread) behavior.
    #[cfg(not(feature = "worker-thread"))]
    mod stub_tests {
        use super::*;

        /// On the recorded main thread, `with_r_thread` runs the closure
        /// inline and returns its value.
        #[test]
        fn stub_with_r_thread_inline() {
            miniextendr_runtime_init();
            // If another parallel test already set R_MAIN_THREAD_ID to a
            // different thread (OnceLock), we won't be "main" and with_r_thread
            // will rightfully panic. Skip in that case.
            if !is_r_main_thread() {
                return;
            }
            let result = with_r_thread(|| 42);
            assert_eq!(result, 42);
        }

        /// Without a worker, `run_on_worker` runs the closure inline and
        /// wraps its value in `Ok`; no initialization is needed for that.
        #[test]
        fn stub_run_on_worker_inline() {
            let result = run_on_worker(|| 123);
            assert_eq!(result, Ok(123));
        }

        /// Without `worker-thread`, `with_r_thread` must panic when called from
        /// a non-main thread.
        #[test]
        fn stub_with_r_thread_panics_on_wrong_thread() {
            miniextendr_runtime_init();

            // A freshly spawned thread can never be the recorded main thread.
            let handle = std::thread::spawn(|| {
                std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| with_r_thread(|| 42)))
            });

            let result = handle.join().expect("thread panicked outside catch_unwind");
            assert!(
                result.is_err(),
                "with_r_thread should panic when called from a non-main thread \
                 without the worker-thread feature, but it ran inline silently"
            );
        }
    }
813 // endregion
814}
815// endregion