Skip to main content
← dvs documentation Rust API reference

dvs/
utils.rs

1use std::sync::atomic::{AtomicUsize, Ordering};
2
3use anyhow::{Result, bail};
4
/// Default number of pool threads per logical CPU when no explicit count is configured.
const DEFAULT_THREADS_PER_CPU: usize = 4;
/// Upper bound on the automatically derived thread count, used when neither
/// [`set_num_threads`] nor `DVS_NUM_THREADS` provides a value.
const DEFAULT_MAX_THREADS: usize = 16;
/// Upper bound on an explicitly requested thread count, whether it was supplied via
/// [`set_num_threads`] or via the `DVS_NUM_THREADS` environment variable.
const ENV_MAX_THREADS: usize = 32;

/// Process-wide thread count override. `0` means "not set": fall back to the
/// `DVS_NUM_THREADS` environment variable, then to the automatic default.
static NUM_THREADS: AtomicUsize = AtomicUsize::new(0);

/// Sets the number of threads used by DVS parallel operations.
///
/// The override takes precedence over the `DVS_NUM_THREADS` environment variable.
/// Values above 32 are capped when the thread pool is built. Passing `0` clears the
/// override, reverting to `DVS_NUM_THREADS` or automatic detection.
pub fn set_num_threads(n: usize) {
    // Relaxed suffices: the value is a standalone setting and does not publish other data.
    NUM_THREADS.store(n, Ordering::Relaxed);
}

/// Returns the thread count set via [`set_num_threads`], or `None` if no override is active.
pub(crate) fn get_num_threads() -> Option<usize> {
    Some(NUM_THREADS.load(Ordering::Relaxed)).filter(|&n| n != 0)
}
27
const KB: u64 = 1_024;
const MB: u64 = KB * 1_024;
const GB: u64 = MB * 1_024;
const TB: u64 = GB * 1_024;

/// Formats a byte count into a human-readable string (e.g. "10.5 MB").
///
/// Uses base-1024 divisors to match `ls -h` / `du -h` output. Counts below 1 KB are
/// printed as whole bytes; larger counts use one decimal place.
///
/// The unit is chosen so that the *printed* value stays below 1024. For example,
/// `1_048_575` bytes is shown as `"1.0 MB"` rather than `"1024.0 KB"`. TB is the
/// largest unit, so very large counts may print values of 1024 TB or more.
pub fn format_size(bytes: u64) -> String {
    if bytes < KB {
        return format!("{bytes} B");
    }

    const SMALLER_UNITS: [(u64, &str); 3] = [(KB, "KB"), (MB, "MB"), (GB, "GB")];
    for (divisor, label) in SMALLER_UNITS {
        let value = bytes as f64 / divisor as f64;
        // `{:.1}` rounds to one decimal; if that would print 1024.0 or more, the next
        // larger unit gives the correct reading instead.
        if (value * 10.0).round() < 10_240.0 {
            return format!("{value:.1} {label}");
        }
    }

    format!("{:.1} TB", bytes as f64 / TB as f64)
}
48
49/// Takes a human formatted byte size and convert it to the number of bytes
50pub fn parse_size(size: &str) -> Result<u64> {
51    let s = size.trim();
52
53    if s == "0" {
54        return Ok(0);
55    }
56
57    let num_end = s
58        .find(|c: char| !c.is_ascii_digit() && c != '.')
59        .unwrap_or(s.len());
60    let (num_str, unit_str) = s.split_at(num_end);
61    let num_str = num_str.trim();
62    let unit_str = unit_str.trim().to_ascii_uppercase();
63
64    if num_str.is_empty() {
65        bail!("Invalid size string: {s:?} (no number found)");
66    }
67
68    let num: f64 = num_str
69        .parse()
70        .map_err(|_| anyhow::anyhow!("Invalid number in size string: {num_str:?}"))?;
71
72    let multiplier = match unit_str.as_str() {
73        "" | "B" => 1u64,
74        "K" | "KB" | "KIB" => KB,
75        "M" | "MB" | "MIB" => MB,
76        "G" | "GB" | "GIB" => GB,
77        "T" | "TB" | "TIB" => TB,
78        _ => bail!("Unknown size unit: {unit_str:?}"),
79    };
80
81    Ok((num * multiplier as f64) as u64)
82}
83
84/// Creates a rayon thread pool.
85///
86/// Thread count priority (highest to lowest):
87/// 1. Global override via [`set_num_threads`] (capped at 32)
88/// 2. `DVS_NUM_THREADS` environment variable (capped at 32)
89/// 3. Default: `available_cpus * 4` (capped at 16)
90///
91/// The result is always clamped to the number of work items.
92pub fn get_threadpool(work_items: usize) -> Result<rayon::ThreadPool> {
93    debug_assert_ne!(
94        work_items, 0,
95        "the thread pool should not be instantiated when there are no work items to process"
96    );
97    // a proxy for available logical cpu cores
98    let available = std::thread::available_parallelism()
99        .map(|n| n.get())
100        .unwrap_or(1)
101        .max(1);
102    let env_threads = std::env::var("DVS_NUM_THREADS")
103        .ok()
104        .and_then(|v| v.parse::<usize>().ok())
105        .filter(|&n| n > 0);
106    // Priority: set_num_threads() > DVS_NUM_THREADS env var > default
107    let num_threads = {
108        let work_limit = work_items.max(1);
109
110        let configured = match get_num_threads().or(env_threads) {
111            Some(n) => n.min(ENV_MAX_THREADS),
112            None => available
113                .saturating_mul(DEFAULT_THREADS_PER_CPU)
114                .min(DEFAULT_MAX_THREADS),
115        };
116        configured.min(work_limit)
117    };
118
119    let pool = rayon::ThreadPoolBuilder::new()
120        .num_threads(num_threads)
121        .build()?;
122    Ok(pool)
123}
124
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parse_size_basic() {
        assert_eq!(parse_size("0").unwrap(), 0);
        assert_eq!(parse_size("500MB").unwrap(), 500 * MB);
        assert_eq!(parse_size("500 MB").unwrap(), 500 * MB);
        assert_eq!(parse_size("1GB").unwrap(), GB);
        assert_eq!(parse_size("1 gb").unwrap(), GB);
        assert_eq!(parse_size("2TB").unwrap(), 2 * TB);
        assert_eq!(parse_size("100KB").unwrap(), 100 * KB);
        assert_eq!(parse_size("1024B").unwrap(), 1024);
        assert_eq!(parse_size("1.5GB").unwrap(), (1.5 * GB as f64) as u64);
        assert!(parse_size("").is_err());
        assert!(parse_size("MB").is_err());
        assert!(parse_size("500XB").is_err());
    }

    #[test]
    fn parse_size_accepts_unit_aliases_and_whitespace() {
        assert_eq!(parse_size("  42  ").unwrap(), 42);
        assert_eq!(parse_size("2m").unwrap(), 2 * MB);
        assert_eq!(parse_size("1 KiB").unwrap(), KB);
        assert_eq!(parse_size("3 tib").unwrap(), 3 * TB);
        assert_eq!(parse_size("0.5K").unwrap(), 512);
    }

    #[test]
    fn parse_size_rejects_malformed_numbers() {
        assert!(parse_size("1.2.3MB").is_err());
        assert!(parse_size("-1MB").is_err());
        assert!(parse_size(".").is_err());
    }

    #[test]
    fn format_size_picks_unit_by_magnitude() {
        assert_eq!(format_size(0), "0 B");
        assert_eq!(format_size(1023), "1023 B");
        assert_eq!(format_size(KB), "1.0 KB");
        assert_eq!(format_size(KB + KB / 2), "1.5 KB");
        assert_eq!(format_size(10 * MB + MB / 2), "10.5 MB");
        assert_eq!(format_size(GB), "1.0 GB");
        assert_eq!(format_size(2 * TB), "2.0 TB");
    }
}