1use std::sync::atomic::{AtomicUsize, Ordering};
2
3use anyhow::{Result, bail};
4
/// Multiplier applied to the detected CPU parallelism when `get_threadpool` has no
/// explicitly configured thread count.
const DEFAULT_THREADS_PER_CPU: usize = 4;
/// Upper bound on the thread count `get_threadpool` derives from the CPU parallelism.
const DEFAULT_MAX_THREADS: usize = 16;
/// Upper bound on an explicitly configured thread count, whether it comes from
/// `set_num_threads` or from the `DVS_NUM_THREADS` environment variable.
const ENV_MAX_THREADS: usize = 32;
10
/// Thread-count override written by `set_num_threads` and read by `get_num_threads`.
/// A stored `0` means "no override": `get_num_threads` maps it to `None`.
static NUM_THREADS: AtomicUsize = AtomicUsize::new(0);
13
/// Records `n` as the thread-count override stored in `NUM_THREADS`.
///
/// Passing `0` clears the override, because `get_num_threads` reports a stored `0`
/// as `None`.
pub fn set_num_threads(n: usize) {
    // Plain atomic store with relaxed ordering; only the value itself is published.
    NUM_THREADS.store(n, Ordering::Relaxed);
}
19
20pub(crate) fn get_num_threads() -> Option<usize> {
22 match NUM_THREADS.load(Ordering::Relaxed) {
23 0 => None,
24 n => Some(n),
25 }
26}
27
// Binary (1024-based) byte multiples shared by `format_size` and `parse_size`.
/// Bytes in one kilobyte (1024).
const KB: u64 = 1_024;
/// Bytes in one megabyte (1024^2).
const MB: u64 = 1_024 * 1_024;
/// Bytes in one gigabyte (1024^3).
const GB: u64 = 1_024 * 1_024 * 1_024;
/// Bytes in one terabyte (1024^4).
const TB: u64 = 1_024 * 1_024 * 1_024 * 1_024;
32
33pub fn format_size(bytes: u64) -> String {
36 if bytes >= TB {
37 format!("{:.1} TB", bytes as f64 / TB as f64)
38 } else if bytes >= GB {
39 format!("{:.1} GB", bytes as f64 / GB as f64)
40 } else if bytes >= MB {
41 format!("{:.1} MB", bytes as f64 / MB as f64)
42 } else if bytes >= KB {
43 format!("{:.1} KB", bytes as f64 / KB as f64)
44 } else {
45 format!("{bytes} B")
46 }
47}
48
49pub fn parse_size(size: &str) -> Result<u64> {
51 let s = size.trim();
52
53 if s == "0" {
54 return Ok(0);
55 }
56
57 let num_end = s
58 .find(|c: char| !c.is_ascii_digit() && c != '.')
59 .unwrap_or(s.len());
60 let (num_str, unit_str) = s.split_at(num_end);
61 let num_str = num_str.trim();
62 let unit_str = unit_str.trim().to_ascii_uppercase();
63
64 if num_str.is_empty() {
65 bail!("Invalid size string: {s:?} (no number found)");
66 }
67
68 let num: f64 = num_str
69 .parse()
70 .map_err(|_| anyhow::anyhow!("Invalid number in size string: {num_str:?}"))?;
71
72 let multiplier = match unit_str.as_str() {
73 "" | "B" => 1u64,
74 "K" | "KB" | "KIB" => KB,
75 "M" | "MB" | "MIB" => MB,
76 "G" | "GB" | "GIB" => GB,
77 "T" | "TB" | "TIB" => TB,
78 _ => bail!("Unknown size unit: {unit_str:?}"),
79 };
80
81 Ok((num * multiplier as f64) as u64)
82}
83
84pub fn get_threadpool(work_items: usize) -> Result<rayon::ThreadPool> {
93 debug_assert_ne!(
94 work_items, 0,
95 "the thread pool should not be instantiated when there are no work items to process"
96 );
97 let available = std::thread::available_parallelism()
99 .map(|n| n.get())
100 .unwrap_or(1)
101 .max(1);
102 let env_threads = std::env::var("DVS_NUM_THREADS")
103 .ok()
104 .and_then(|v| v.parse::<usize>().ok())
105 .filter(|&n| n > 0);
106 let num_threads = {
108 let work_limit = work_items.max(1);
109
110 let configured = match get_num_threads().or(env_threads) {
111 Some(n) => n.min(ENV_MAX_THREADS),
112 None => available
113 .saturating_mul(DEFAULT_THREADS_PER_CPU)
114 .min(DEFAULT_MAX_THREADS),
115 };
116 configured.min(work_limit)
117 };
118
119 let pool = rayon::ThreadPoolBuilder::new()
120 .num_threads(num_threads)
121 .build()?;
122 Ok(pool)
123}
124
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `parse_size` against accepted inputs (plain zero, every unit, optional
    /// whitespace, lower-case units, fractional values) and rejected inputs (empty
    /// string, unit without a number, unknown unit).
    #[test]
    fn parse_size_basic() {
        let accepted = [
            ("0", 0),
            ("500MB", 500 * MB),
            ("500 MB", 500 * MB),
            ("1GB", GB),
            ("1 gb", GB),
            ("2TB", 2 * TB),
            ("100KB", 100 * KB),
            ("1024B", 1024),
            ("1.5GB", (1.5 * GB as f64) as u64),
        ];
        for (input, expected) in accepted {
            assert_eq!(parse_size(input).unwrap(), expected);
        }

        for input in ["", "MB", "500XB"] {
            assert!(parse_size(input).is_err());
        }
    }
}