Skip to main content

miniextendr_api/
worker.rs

1//! Worker thread infrastructure for safe Rust-R FFI.
2//!
3//! ## Public API
4//!
5//! - [`with_r_thread`] — Execute a closure on R's main thread
6//! - [`is_r_main_thread`] — Check if the current thread is R's main thread
7//!
8//! ## Feature gate: `worker-thread`
9//!
10//! Without the `worker-thread` cargo feature, all calls execute inline on
11//! R's main thread:
12//! - `with_r_thread(f)` runs `f()` directly (panics if not on main thread)
13//! - `run_on_worker(f)` runs `f()` directly, returns `Ok(f())`
14//!
15//! With the feature enabled, a dedicated worker thread is spawned at init time.
16//! `with_r_thread` routes calls from the worker back to main, and `run_on_worker`
17//! dispatches closures to the worker with bidirectional communication.
18//!
19//! ## Initialization
20//!
21//! [`miniextendr_runtime_init`] must be called from R's main thread before any
22//! R FFI APIs. Typically done in `R_init_<pkgname>()`.
23
24use std::sync::OnceLock;
25use std::thread;
26
27use crate::ffi::{self, SEXP};
28
29static R_MAIN_THREAD_ID: OnceLock<thread::ThreadId> = OnceLock::new();
30
31// region: Public API
32
/// Wrapper to mark values as Send for main-thread routing.
///
/// Only safe if the value is not accessed on the worker thread and is
/// used exclusively on the main thread.
///
/// The wrapper is `#[repr(transparent)]` over `T` and exposes the wrapped
/// value through its public tuple field.
#[doc(hidden)]
#[repr(transparent)]
#[derive(Clone, Copy)]
pub struct Sendable<T>(pub T);

// SAFETY: this impl carries no bound on `T`, so the compiler cannot check it.
// Soundness rests entirely on callers honoring the contract documented on
// `Sendable`: the value may be moved across threads, but must only be used
// on the main thread.
unsafe impl<T> Send for Sendable<T> {}
43
44/// Check if the current thread is R's main thread.
45///
46/// Returns `true` if called from the main R thread, `false` otherwise.
47/// Before `miniextendr_runtime_init()` is called, always returns `false`.
48#[inline(always)]
49pub fn is_r_main_thread() -> bool {
50    R_MAIN_THREAD_ID
51        .get()
52        .map(|&id| id == std::thread::current().id())
53        .unwrap_or(false)
54}
55
56/// Execute a closure on R's main thread, returning the result.
57///
58/// This function can be called from any thread:
59/// - From the main thread: executes the closure directly (re-entrant)
60/// - From the worker thread (during `run_on_worker`): sends the work to
61///   the main thread and blocks until completion
62///
63/// # Panics
64///
65/// - If `miniextendr_runtime_init()` hasn't been called yet
66/// - If called from a non-main thread without the `worker-thread` feature
67/// - If called from a non-main thread outside of a `run_on_worker` context
68///   (even with the `worker-thread` feature)
69///
70/// # Example
71///
72/// ```ignore
73/// use miniextendr_api::with_r_thread;
74///
75/// // From worker thread, safely call R APIs:
76/// let sexp = with_r_thread(|| {
77///     // This runs on R's main thread
78///     SEXP::nil()
79/// });
80/// ```
81pub fn with_r_thread<F, R>(f: F) -> R
82where
83    F: FnOnce() -> R + 'static,
84    R: Send + 'static,
85{
86    assert_runtime_initialized();
87
88    if is_r_main_thread() {
89        return f();
90    }
91
92    // Not on main thread — need worker-thread feature for routing
93    #[cfg(not(feature = "worker-thread"))]
94    {
95        panic!(
96            "with_r_thread called from a non-main thread without the `worker-thread` feature.\n\
97             \n\
98             Without `worker-thread`, R API calls can only happen on the R main thread.\n\
99             Either:\n\
100             - Enable the `worker-thread` cargo feature to route calls from background threads, or\n\
101             - Ensure this code only runs on the R main thread."
102        );
103    }
104
105    #[cfg(feature = "worker-thread")]
106    {
107        worker_channel::route_to_main_thread(f)
108    }
109}
110// endregion
111
112// region: #[doc(hidden)] items for macro-generated code
113
114/// Raise an R error from a panic message. Does not return.
115///
116/// If `call` is `Some(sexp)`, uses `Rf_errorcall` to include call context.
117#[doc(hidden)]
118pub fn panic_message_to_r_error(msg: String, call: Option<SEXP>) -> ! {
119    let c_msg = std::ffi::CString::new(msg)
120        .unwrap_or_else(|_| std::ffi::CString::new("Rust panic (invalid message)").unwrap());
121    unsafe {
122        match call {
123            Some(call) => ffi::Rf_errorcall_unchecked(call, c"%s".as_ptr(), c_msg.as_ptr()),
124            None => ffi::Rf_error_unchecked(c"%s".as_ptr(), c_msg.as_ptr()),
125        }
126    }
127}
128
/// Execute `f` via the worker thread, reporting a panic as an error string.
///
/// With the `worker-thread` feature the closure is dispatched to the worker;
/// an `Err(String)` result is reported to panic telemetry (tagged as coming
/// from the worker) before being returned unchanged. The caller decides how
/// to surface the error (tagged error value or `Rf_errorcall`).
///
/// Without the `worker-thread` feature the closure simply runs inline on the
/// current thread and its value is wrapped in `Ok`.
#[doc(hidden)]
pub fn run_on_worker<F, T>(f: F) -> Result<T, String>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    #[cfg(feature = "worker-thread")]
    {
        match worker_channel::dispatch_to_worker(f) {
            Ok(value) => Ok(value),
            Err(msg) => {
                crate::panic_telemetry::fire(&msg, crate::panic_telemetry::PanicSource::Worker);
                Err(msg)
            }
        }
    }

    #[cfg(not(feature = "worker-thread"))]
    {
        let value = f();
        Ok(value)
    }
}
155
156/// Initialize the miniextendr runtime.
157///
158/// Records the main thread ID and (with `worker-thread`) spawns the worker.
159/// Must be called from R's main thread, typically from `R_init_<pkgname>`.
160#[doc(hidden)]
161#[unsafe(no_mangle)]
162pub extern "C-unwind" fn miniextendr_runtime_init() {
163    static RUN_ONCE: std::sync::Once = std::sync::Once::new();
164
165    #[cfg(feature = "worker-thread")]
166    {
167        RUN_ONCE.call_once_force(|x| {
168            if x.is_poisoned() {
169                eprintln!(
170                    "warning: miniextendr worker init is retrying after a previous failed attempt"
171                );
172            }
173
174            let current_id = std::thread::current().id();
175            if let Some(&existing_id) = R_MAIN_THREAD_ID.get() {
176                if existing_id != current_id {
177                    panic!(
178                        "miniextendr_runtime_init called from thread {:?}, but R_MAIN_THREAD_ID \
179                         was already set to {:?}. This indicates incorrect initialization order.",
180                        current_id, existing_id
181                    );
182                }
183            } else {
184                let _ = R_MAIN_THREAD_ID.set(current_id);
185            }
186
187            worker_channel::init_worker();
188
189            // NB: no libc `atexit` registration.
190            //
191            // `atexit` stores a function pointer into the DLL's code. If the
192            // package is unloaded (e.g. via `library.dynam.unload` / dyn.unload
193            // / devtools::load_all(reset = TRUE)) before libc runs its atexit
194            // registry at process exit, the handler jumps to an unmapped
195            // address and tears down the process's SEH state on Windows —
196            // which manifests as "fatal runtime error: failed to initiate
197            // panic, error 5" in the next DLL that tries to unwind. #277.
198            //
199            // The normal path (package unload → `R_unload_<pkg>` →
200            // `miniextendr_runtime_shutdown`) already joins the worker
201            // cleanly. The abnormal path (process exit without unload, e.g.
202            // `q("no")`) relies on the OS to reap the worker thread, which
203            // it does — we don't need graceful shutdown for a dying process.
204        });
205    }
206
207    #[cfg(not(feature = "worker-thread"))]
208    {
209        RUN_ONCE.call_once(|| {
210            let _ = R_MAIN_THREAD_ID.set(std::thread::current().id());
211        });
212    }
213}
214
215/// Shut down the miniextendr worker thread synchronously.
216///
217/// Called from `R_unload_<pkg>` (generated by `miniextendr_init!`). Sends a
218/// `Shutdown` message to the worker, drops the sender, and blocks on
219/// `JoinHandle::join()` until the worker thread has fully exited. Must block:
220/// `library.dynam.unload` unmaps the DLL's code pages as soon as this returns,
221/// and a still-live worker would resume execution in freed memory (see #277).
222///
223/// Idempotent. After the first call, the join handle is taken and subsequent
224/// calls are no-ops. Safe to call from any thread, though R only ever calls it
225/// from the main thread.
226///
227/// Additionally uninstalls the process panic hook that this DLL registered
228/// (also in DLL code — see `backtrace::miniextendr_panic_hook_uninstall`).
229///
230/// Without the `worker-thread` feature this is (mostly) a no-op: only the
231/// panic hook uninstall runs.
232#[doc(hidden)]
233#[unsafe(no_mangle)]
234pub extern "C-unwind" fn miniextendr_runtime_shutdown() {
235    #[cfg(feature = "worker-thread")]
236    {
237        worker_channel::shutdown();
238    }
239    crate::backtrace::miniextendr_panic_hook_uninstall();
240}
241// endregion
242
243// region: pub(crate) internals
244
/// Report whether the current thread has a worker routing context.
///
/// With the `worker-thread` feature this asks `worker_channel::has_context`;
/// without it there is no worker, so the answer is always `false`.
pub(crate) fn has_worker_context() -> bool {
    #[cfg(feature = "worker-thread")]
    let in_worker_context = worker_channel::has_context();

    #[cfg(not(feature = "worker-thread"))]
    let in_worker_context = false;

    in_worker_context
}
256
257/// Panic if the runtime hasn't been initialized.
258fn assert_runtime_initialized() {
259    if R_MAIN_THREAD_ID.get().is_none() {
260        panic!(
261            "miniextendr_runtime_init() must be called before using R FFI APIs.\n\
262             \n\
263             This is typically done in R_init_<pkgname>() via:\n\
264             \n\
265             void R_init_pkgname(DllInfo *dll) {{\n\
266             miniextendr_runtime_init();\n\
267             R_registerRoutines(dll, NULL, CallEntries, NULL, NULL);\n\
268             }}\n\
269             \n\
270             If you're embedding R in Rust, call miniextendr_runtime_init() from the main thread \
271             before any R API calls."
272        );
273    }
274}
275// endregion
276
277// region: Worker channel infrastructure (only with worker-thread feature)
278
279#[cfg(feature = "worker-thread")]
280mod worker_channel {
281    use std::any::Any;
282    use std::cell::RefCell;
283    use std::panic::{AssertUnwindSafe, catch_unwind};
284    use std::sync::Mutex;
285    use std::sync::mpsc::{self, Receiver, SyncSender};
286    use std::thread;
287
288    use super::Sendable;
289    use crate::ffi::{self, Rboolean, SEXP};
290
    /// Type-erased unit of work sent to the worker thread: a boxed, sendable
    /// closure that takes no arguments and returns nothing.
    type AnyJob = Box<dyn FnOnce() + Send>;

    /// Tagged messages on the main→worker channel.
    ///
    /// Plain `Box<AnyJob>` transport with an atomic shutdown flag + `recv_timeout`
    /// polling used to sit here. That shape left the worker asleep in
    /// `recv_timeout` when `R_unload_<pkg>` fired, so `library.dynam.unload`
    /// unmapped the DLL's code pages while the worker was still about to wake
    /// up inside them — producing the "failed to initiate panic, error 5"
    /// SEH corruption documented in #277.
    ///
    /// `Shutdown` is a proper message instead: the worker blocks on `recv()`
    /// (no timeout, no polling), `shutdown()` delivers the message, and
    /// `recv()` returns immediately. Combined with dropping the sender and
    /// a blocking `JoinHandle::join()`, this makes `R_unload_<pkg>`
    /// synchronous — the DLL can't unmap until the worker thread has truly
    /// exited.
    enum WorkerMsg {
        /// A job to execute; `worker_loop` runs it and then waits for the
        /// next message.
        Job(AnyJob),
        /// Request to stop; `worker_loop` leaves its receive loop when it
        /// sees this.
        Shutdown,
    }
312
    /// Single place that owns the worker's lifetime.
    ///
    /// `Mutex<Option<...>>` (rather than `OnceLock`) because `shutdown()`
    /// needs to `.take()` both the sender (to drop it, closing the channel as
    /// a second-path wake-up) and the join handle (to `.join()` it). After
    /// `shutdown()`, `dispatch_to_worker` sees `None` and returns a
    /// structured "worker shut down" error instead of relying on the old
    /// `send` returning `SendError`.
    struct WorkerState {
        /// Sending half of the main→worker channel; `dispatch_to_worker`
        /// clones it to submit jobs and `shutdown()` uses it to deliver
        /// `WorkerMsg::Shutdown`.
        tx: SyncSender<WorkerMsg>,
        /// Handle of the spawned worker thread, joined by `shutdown()`.
        handle: thread::JoinHandle<()>,
    }

    /// Global worker slot: `Some` while a worker is installed (set by
    /// `init_worker`), `None` before initialization and after `shutdown()`.
    static WORKER: Mutex<Option<WorkerState>> = Mutex::new(None);
327
328    /// Shut the worker down synchronously.
329    ///
330    /// Send `Shutdown`, drop the sender (so `recv()` returns `Err` even if
331    /// the `Shutdown` message is somehow missed — defense in depth), then
332    /// block on `JoinHandle::join()`. No timeout: if the worker wedges, we
333    /// want the hang to surface directly rather than mask it with an
334    /// arbitrary deadline that then races DLL unmap. Idempotent — after the
335    /// first call, `WORKER` is `None` and the function is a no-op.
336    pub(super) fn shutdown() {
337        let Some(state) = WORKER.lock().unwrap().take() else {
338            return;
339        };
340        // If the worker already exited (e.g. it panicked), `send` errors —
341        // the drop below is what matters in that case.
342        let _ = state.tx.send(WorkerMsg::Shutdown);
343        drop(state.tx);
344        // We're being called from `R_unload_<pkg>`, which is `extern
345        // "C-unwind"` — so a panic here would unwind through R's dyn.unload
346        // handler. If the worker itself panicked in a way we can't catch,
347        // logging the payload and continuing is safer than re-raising
348        // across the R FFI boundary. We still join (unwrap the Err) so the
349        // handle is consumed and OS thread resources are released.
350        if let Err(payload) = state.handle.join() {
351            let msg = crate::unwind_protect::panic_payload_to_string(&*payload);
352            eprintln!("miniextendr: worker thread panicked during shutdown: {msg}");
353        }
354    }
355
    // Type-erased main thread work: closure that returns boxed result.
    // The closure itself carries no `Send` bound; the `Sendable` wrapper is
    // what allows it to be placed in a message on the worker→main channel.
    type MainThreadWork = Sendable<Box<dyn FnOnce() -> Box<dyn Any + Send> + 'static>>;

    // Response from main thread: Ok(result) or Err(message). The error text
    // is a panic message, or the R error text sent by the cleanup handler in
    // `dispatch_to_worker` when R unwinds.
    type MainThreadResponse = Result<Box<dyn Any + Send>, String>;

    /// Messages from worker to main thread
    enum WorkerMessage<T> {
        /// Worker requests main thread to execute some work, then send response back
        WorkRequest(MainThreadWork),
        /// Worker is done, here's the final result
        Done(Result<T, String>),
    }

    // `WorkerMessage` whose final result is boxed as `dyn Any`, so one channel
    // type serves every job regardless of its concrete return type.
    type TypeErasedWorkerMessage = WorkerMessage<Box<dyn Any + Send>>;
    // Slot types for the thread-locals below; `None` means no job has
    // installed its channels on this thread.
    type WorkerToMainSender = RefCell<Option<SyncSender<TypeErasedWorkerMessage>>>;
    type MainResponseReceiver = RefCell<Option<Receiver<MainThreadResponse>>>;

    // Thread-local channels for worker -> main communication during run_on_worker.
    // They are populated at the start of each job built by `dispatch_to_worker`
    // and reset to `None` once the job's closure has finished.
    thread_local! {
        static WORKER_TO_MAIN_TX: WorkerToMainSender = const { RefCell::new(None) };
        static MAIN_RESPONSE_RX: MainResponseReceiver = const { RefCell::new(None) };
    }
379
380    pub(super) fn has_context() -> bool {
381        WORKER_TO_MAIN_TX.with(|tx_cell| tx_cell.borrow().is_some())
382    }
383
384    /// Route a closure from the worker thread to the main thread.
385    pub(super) fn route_to_main_thread<F, R>(f: F) -> R
386    where
387        F: FnOnce() -> R + 'static,
388        R: Send + 'static,
389    {
390        WORKER_TO_MAIN_TX.with(|tx_cell| {
391            let tx = tx_cell
392                .borrow()
393                .as_ref()
394                .expect("`with_r_thread` called outside of `run_on_worker` context")
395                .clone();
396
397            let work: MainThreadWork =
398                Sendable(Box::new(move || Box::new(f()) as Box<dyn Any + Send>));
399
400            tx.send(WorkerMessage::WorkRequest(work))
401                .expect("main thread channel closed");
402        });
403
404        MAIN_RESPONSE_RX.with(|rx_cell| {
405            let rx = rx_cell.borrow();
406            let rx = rx.as_ref().expect("response channel not set");
407            let response = rx.recv().expect("main thread response channel closed");
408            match response {
409                Ok(boxed) => *boxed
410                    .downcast::<R>()
411                    .expect("type mismatch in `with_r_thread` response"),
412                Err(panic_msg) => panic!("panic in `with_r_thread`: {}", panic_msg),
413            }
414        })
415    }
416
417    /// Dispatch a closure to the worker thread.
418    /// Returns Ok(T) or Err(panic_message).
419    pub(super) fn dispatch_to_worker<F, T>(f: F) -> Result<T, String>
420    where
421        F: FnOnce() -> T + Send + 'static,
422        T: Send + 'static,
423    {
424        /// Marker type for R errors caught by R_UnwindProtect's cleanup handler.
425        struct RErrorMarker;
426
427        // Re-entry guard: if we're already on the worker thread (inside a
428        // run_on_worker job), a nested run_on_worker would deadlock because the
429        // single worker thread can't pick up a new job while running the current one.
430        if has_context() {
431            panic!(
432                "run_on_worker called re-entrantly from within a worker context.\n\
433                 \n\
434                 The single worker thread is already executing a job, so a nested \
435                 run_on_worker would deadlock. To call R APIs from worker code, \
436                 use with_r_thread() instead."
437            );
438        }
439
440        // Clone the worker's sender while holding the mutex briefly. The
441        // clone outlives the lock, so sends happen without blocking other
442        // callers on the mutex. If `WORKER` is `None`, the package has
443        // already been unloaded (or never initialized) — return a
444        // structured error instead of panicking.
445        let job_tx = {
446            let guard = WORKER.lock().unwrap();
447            match guard.as_ref() {
448                Some(state) => state.tx.clone(),
449                None => {
450                    return Err(
451                        "miniextendr worker is not running (runtime not initialized, \
452                         or package has been unloaded)"
453                            .to_string(),
454                    );
455                }
456            }
457        };
458
459        // Single channel for worker -> main (work requests + final result).
460        // Capacity 1: each run_on_worker sends exactly one request at a time and blocks
461        // for a response, so no accumulation is possible. The extra slot ensures the
462        // worker's final Done message doesn't block if the main thread longjmped away.
463        let (worker_tx, worker_rx) = mpsc::sync_channel::<TypeErasedWorkerMessage>(1);
464
465        // Channel for main -> worker responses to work requests.
466        // Capacity 1: the worker blocks on recv after each with_r_thread call, so at most
467        // one response is in flight. The extra slot lets the cleanup handler send an error
468        // without blocking (it runs mid-longjmp and cannot wait).
469        let (response_tx, response_rx) = mpsc::sync_channel::<MainThreadResponse>(1);
470
471        let job: AnyJob = Box::new(move || {
472            // Set up thread-local channels for with_r_thread
473            WORKER_TO_MAIN_TX.with(|tx_cell| {
474                *tx_cell.borrow_mut() = Some(worker_tx.clone());
475            });
476            MAIN_RESPONSE_RX.with(|rx_cell| {
477                *rx_cell.borrow_mut() = Some(response_rx);
478            });
479
480            let result = catch_unwind(AssertUnwindSafe(f));
481
482            // Clear thread-locals
483            WORKER_TO_MAIN_TX.with(|tx_cell| {
484                *tx_cell.borrow_mut() = None;
485            });
486            MAIN_RESPONSE_RX.with(|rx_cell| {
487                *rx_cell.borrow_mut() = None;
488            });
489
490            // Send final result back to the main thread's recv loop. The capacity-1
491            // buffer ensures this doesn't block even if the main thread already exited
492            // the loop (e.g., after an R longjmp consumed the last WorkRequest).
493            let to_send: Result<Box<dyn Any + Send>, String> = match result {
494                Ok(val) => Ok(Box::new(val)),
495                Err(payload) => {
496                    Err(crate::unwind_protect::panic_payload_to_string(&*payload).into_owned())
497                }
498            };
499            let _ = worker_tx.send(WorkerMessage::Done(to_send));
500        });
501
502        job_tx
503            .send(WorkerMsg::Job(job))
504            .expect("worker thread dead");
505
506        // Main thread event loop: processes WorkRequest messages (from with_r_thread)
507        // until a Done message arrives. Invariant: each WorkRequest produces exactly
508        // one response_tx.send, and the worker blocks until it receives that response.
509        loop {
510            match worker_rx
511                .recv()
512                .expect("worker channel closed unexpectedly")
513            {
514                WorkerMessage::WorkRequest(work) => {
515                    // Execute work on main thread with R_UnwindProtect so we can:
516                    // 1. Catch Rust panics and send them as errors to the worker
517                    // 2. Catch R errors (longjmp) via cleanup handler and send error to worker
518                    //    before R continues unwinding (function never returns in that case)
519
520                    struct CallData {
521                        work: Option<MainThreadWork>,
522                        result: Option<Box<dyn Any + Send>>,
523                        panic_payload: Option<Box<dyn Any + Send>>,
524                        response_tx_ptr: *const SyncSender<MainThreadResponse>,
525                    }
526
527                    unsafe extern "C-unwind" fn trampoline(data: *mut std::ffi::c_void) -> SEXP {
528                        assert!(!data.is_null(), "trampoline: data pointer is null");
529                        let data = unsafe { &mut *data.cast::<CallData>() };
530                        let work = data
531                            .work
532                            .take()
533                            .expect("trampoline: work already consumed")
534                            .0;
535
536                        match catch_unwind(AssertUnwindSafe(work)) {
537                            Ok(result) => {
538                                data.result = Some(result);
539                                SEXP::nil()
540                            }
541                            Err(payload) => {
542                                data.panic_payload = Some(payload);
543                                SEXP::nil()
544                            }
545                        }
546                    }
547
548                    unsafe extern "C-unwind" fn cleanup_handler(
549                        data: *mut std::ffi::c_void,
550                        jump: Rboolean,
551                    ) {
552                        if jump != Rboolean::FALSE {
553                            // R is about to longjmp. We MUST send an error response to the worker
554                            // before continuing the unwind—the worker is blocked on response_rx.recv()
555                            // and would deadlock if we don't send something.
556                            assert!(!data.is_null(), "cleanup_handler: data pointer is null");
557                            let data = unsafe { &*data.cast::<CallData>() };
558                            let response_tx = unsafe { &*data.response_tx_ptr };
559
560                            #[cfg(feature = "nonapi")]
561                            let error_msg = unsafe {
562                                let buf = ffi::R_curErrorBuf();
563                                if buf.is_null() {
564                                    "R error occurred".to_string()
565                                } else {
566                                    std::ffi::CStr::from_ptr(buf).to_string_lossy().into_owned()
567                                }
568                            };
569                            #[cfg(not(feature = "nonapi"))]
570                            let error_msg = "R error occurred".to_string();
571
572                            let _ = response_tx.send(Err(error_msg));
573                            std::panic::panic_any(RErrorMarker);
574                        }
575                    }
576
577                    let response: MainThreadResponse = unsafe {
578                        let token = crate::unwind_protect::get_continuation_token();
579
580                        let data = Box::into_raw(Box::new(CallData {
581                            work: Some(work),
582                            result: None,
583                            panic_payload: None,
584                            response_tx_ptr: std::ptr::from_ref(&response_tx),
585                        }));
586
587                        let panic_result = catch_unwind(AssertUnwindSafe(|| {
588                            ffi::R_UnwindProtect_C_unwind(
589                                Some(trampoline),
590                                data.cast(),
591                                Some(cleanup_handler),
592                                data.cast(),
593                                token,
594                            )
595                        }));
596
597                        let mut data = Box::from_raw(data);
598
599                        match panic_result {
600                            Ok(_) => {
601                                // Check if trampoline caught a panic
602                                if let Some(payload) = data.panic_payload.take() {
603                                    Err(crate::unwind_protect::panic_payload_to_string(&*payload)
604                                        .into_owned())
605                                } else {
606                                    // Normal completion - return the result
607                                    Ok(data
608                                        .result
609                                        .take()
610                                        .expect("result not set after successful completion"))
611                                }
612                            }
613                            Err(payload) => {
614                                // Check if this was an R error (cleanup handler already sent response)
615                                if payload.downcast_ref::<RErrorMarker>().is_some() {
616                                    drop(data);
617                                    ffi::R_ContinueUnwind(token);
618                                }
619                                // Rust panic - return as error response
620                                Err(crate::unwind_protect::panic_payload_to_string(&*payload)
621                                    .into_owned())
622                            }
623                        }
624                    };
625
626                    // Exactly one send per WorkRequest: either here (normal/panic) or
627                    // in cleanup_handler (R error). Never both—R error path diverges
628                    // via R_ContinueUnwind above and never reaches this line.
629                    response_tx
630                        .send(response)
631                        .expect("worker response channel closed");
632                }
633                WorkerMessage::Done(result) => {
634                    return match result {
635                        Ok(boxed) => Ok(*boxed
636                            .downcast::<T>()
637                            .expect("type mismatch in run_on_worker result")),
638                        Err(msg) => Err(msg),
639                    };
640                }
641            }
642        }
643    }
644
645    /// Spawn the worker thread and install it as the global `WORKER`.
646    ///
647    /// Idempotent — if the worker is already running, this is a no-op. We
648    /// intentionally do NOT call this from a `OnceLock`: after `shutdown()`
649    /// the slot is cleared, and a subsequent `dyn.load` on the same DLL
650    /// (same statics, unchanged addresses) should be able to spawn a fresh
651    /// worker. `std::sync::Once` would forbid that.
652    pub(super) fn init_worker() {
653        let mut guard = WORKER.lock().unwrap();
654        if guard.is_some() {
655            return;
656        }
657        // Capacity 0 (rendezvous): the main thread blocks until the worker picks
658        // up the job, ensuring at most one job is in flight at a time.
659        let (tx, rx) = mpsc::sync_channel::<WorkerMsg>(0);
660        let handle = thread::Builder::new()
661            .name("miniextendr-worker".into())
662            .spawn(move || worker_loop(rx))
663            .expect("failed to spawn worker thread");
664        *guard = Some(WorkerState { tx, handle });
665    }
666
667    /// Worker thread body: blocking `recv()` loop.
668    ///
669    /// `recv()` blocks the thread in the OS until either a message arrives
670    /// or the sender is dropped — no polling, no timeouts, no sleeps. On
671    /// `Job`, run it. On `Shutdown` or `Err` (sender dropped), exit.
672    fn worker_loop(rx: Receiver<WorkerMsg>) {
673        while let Ok(msg) = rx.recv() {
674            match msg {
675                WorkerMsg::Job(job) => job(),
676                WorkerMsg::Shutdown => break,
677            }
678        }
679    }
680}
681// endregion
682
683// region: Tests
684
#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-time check that `Sendable` is `Send` even around a raw pointer.
    #[test]
    fn sendable_is_send() {
        fn requires_send<S: Send>() {}
        requires_send::<Sendable<*const u8>>();
    }

    /// `with_r_thread` must panic when the runtime is not ready for the caller.
    #[test]
    fn with_r_thread_panics_before_init() {
        if R_MAIN_THREAD_ID.get().is_none() {
            // Pre-init path: the panic must point at the missing init call.
            let result = std::panic::catch_unwind(|| {
                with_r_thread(|| 42);
            });
            assert!(result.is_err());
            let payload = result.unwrap_err();
            let msg = crate::unwind_protect::panic_payload_to_string(payload.as_ref());
            assert!(
                msg.contains("miniextendr_runtime_init"),
                "expected init error message, got: {msg}"
            );
        } else {
            // Another test already ran miniextendr_runtime_init (via Once), so
            // the pre-init path is gone. At least verify that a call from a
            // freshly spawned (hence non-main) thread panics.
            let joined =
                std::thread::spawn(|| std::panic::catch_unwind(|| with_r_thread(|| 42))).join();
            let result = joined.expect("thread panicked outside catch_unwind");
            assert!(
                result.is_err(),
                "with_r_thread should panic from non-main thread"
            );
        }
    }

    /// A plain test thread is never inside a worker job.
    #[test]
    fn has_worker_context_false_outside_worker() {
        assert!(!has_worker_context());
    }

    // region: Feature-gated tests: worker-thread

    #[cfg(feature = "worker-thread")]
    mod worker_tests {
        use super::*;

        /// A nested `run_on_worker` issued from worker code must be detected
        /// and turned into a panic rather than hanging forever.
        #[test]
        fn run_on_worker_reentry_panics_not_deadlocks() {
            miniextendr_runtime_init();

            let (report_tx, report_rx) =
                std::sync::mpsc::sync_channel::<Result<String, String>>(1);

            // Run the scenario on a helper thread so a deadlock shows up as a
            // timeout below instead of hanging the test runner.
            std::thread::spawn(move || {
                let outcome = run_on_worker(|| {
                    // Re-entry: this is on the worker thread already.
                    run_on_worker(|| 42).unwrap();
                });
                let report: Result<String, String> = match outcome {
                    Err(msg) => Ok(msg),
                    Ok(()) => Err("re-entry was not detected".into()),
                };
                let _ = report_tx.send(report);
            });

            match report_rx.recv_timeout(std::time::Duration::from_secs(5)) {
                Ok(Ok(msg)) => {
                    assert!(
                        msg.contains("re-entr") || msg.contains("Re-entr"),
                        "expected re-entry error, got: {msg}"
                    );
                }
                Ok(Err(msg)) => panic!("{msg}"),
                Err(_) => {
                    panic!("DEADLOCK: run_on_worker re-entry caused the test to hang for 5 seconds")
                }
            }
        }
    }
    // endregion

    // region: Feature-gated tests: no worker-thread (stubs)

    #[cfg(not(feature = "worker-thread"))]
    mod stub_tests {
        use super::*;

        /// On the main thread, `with_r_thread` runs the closure inline.
        #[test]
        fn stub_with_r_thread_inline() {
            miniextendr_runtime_init();
            // A parallel test may already have pinned R_MAIN_THREAD_ID to a
            // different thread (OnceLock). Then we are not "main",
            // with_r_thread would rightfully panic, and there is nothing to
            // check here.
            if is_r_main_thread() {
                let result = with_r_thread(|| 42);
                assert_eq!(result, 42);
            }
        }

        /// Without a worker, `run_on_worker` just wraps the value in `Ok`.
        #[test]
        fn stub_run_on_worker_inline() {
            let result = run_on_worker(|| 123);
            assert_eq!(result, Ok(123));
        }

        /// Without `worker-thread`, `with_r_thread` must panic when called from
        /// a non-main thread.
        #[test]
        fn stub_with_r_thread_panics_on_wrong_thread() {
            miniextendr_runtime_init();

            let joined = std::thread::spawn(|| {
                std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| with_r_thread(|| 42)))
            })
            .join();

            let result = joined.expect("thread panicked outside catch_unwind");
            assert!(
                result.is_err(),
                "with_r_thread should panic when called from a non-main thread \
                 without the worker-thread feature, but it ran inline silently"
            );
        }
    }
    // endregion
}
815// endregion