Skip to main content
← dvs documentation Rust API reference

dvs/backends/
local.rs

1use std::collections::HashSet;
2use std::fs::OpenOptions;
3use std::io::{BufReader, Write};
4use std::path::{Path, PathBuf};
5use std::sync::Mutex;
6
7#[cfg(unix)]
8use anyhow::anyhow;
9use anyhow::{Result, bail};
10use fs_err as fs;
11use serde::{Deserialize, Serialize};
12
13use crate::Hashes;
14use crate::audit::{AuditEntry, parse_audit_log};
15use crate::backends::Backend;
16use crate::config::Compression;
17
/// File name of the JSON Lines audit log; it is joined directly onto the storage root path.
const AUDIT_LOG_FILENAME: &str = "audit.log.jsonl";
/// Held by `log_audit` while it appends an entry, so appends from threads of this process
/// do not interleave.
/// Only protects the current dvs process, not concurrent dvs processes
static AUDIT_LOG_LOCK: Mutex<()> = Mutex::new(());
21
22/// Resolve a group name to its GID.
23#[cfg(unix)]
24fn resolve_group(group_name: &str) -> Result<nix::unistd::Gid> {
25    use nix::unistd::Group;
26    let group =
27        Group::from_name(group_name)?.ok_or_else(|| anyhow!("Group '{}' not found", group_name))?;
28    Ok(nix::unistd::Gid::from_raw(group.gid.as_raw()))
29}
30
/// Non-Unix counterpart of `resolve_group`: performs no lookup and accepts any group name.
#[cfg(not(unix))]
fn resolve_group(_: &str) -> Result<()> {
    Ok(())
}
35
36/// Detect the current user's primary group name.
37#[cfg(unix)]
38fn detect_primary_group() -> Result<String> {
39    use nix::unistd::Group;
40    let gid = nix::unistd::getegid();
41    let group = Group::from_gid(gid)?
42        .ok_or_else(|| anyhow!("Could not resolve primary group for GID {}", gid))?;
43    Ok(group.name)
44}
45
46fn make_readonly(path: impl AsRef<Path>) -> Result<()> {
47    let mut perms = fs::metadata(path.as_ref())?.permissions();
48    perms.set_readonly(true);
49    fs::set_permissions(path.as_ref(), perms)?;
50    Ok(())
51}
52
// Modes used when the storage is shared with a group (`open` is false).
/// Directories: setgid bit plus read/write/execute for owner and group, nothing for others.
const SHARED_DIRECTORY_MODE: u32 = 0o2770;
/// Stored blobs: read-only for owner and group, nothing for others.
const SHARED_BLOB_MODE: u32 = 0o0440;
/// Audit log: read/write for owner and group, nothing for others.
const SHARED_AUDIT_LOG_MODE: u32 = 0o0660;

// Modes used when the backend is configured as `open`.
/// Directories: setgid bit plus read/write/execute for everyone.
const OPEN_DIRECTORY_MODE: u32 = 0o2777;
/// Audit log: read/write for everyone.
const OPEN_AUDIT_LOG_MODE: u32 = 0o0666;
59
60#[cfg(unix)]
61fn ensure_mode(path: impl AsRef<Path>, mode: u32) -> Result<()> {
62    use std::os::unix::fs::PermissionsExt;
63
64    let path = path.as_ref();
65    let mut perms = fs::metadata(path)?.permissions();
66    if perms.mode() & 0o7777 != mode {
67        perms.set_mode(mode);
68        fs::set_permissions(path, perms)?;
69    }
70    Ok(())
71}
72
/// Non-Unix counterpart of `ensure_mode`: Unix mode bits do not apply, so this does nothing.
#[cfg(not(unix))]
fn ensure_mode(_path: impl AsRef<Path>, _mode: u32) -> Result<()> {
    Ok(())
}
77
/// Storage backend that keeps blobs and the audit log in a directory on the local filesystem.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub struct LocalBackend {
    /// Root directory of the storage.
    pub path: PathBuf,
    /// Name of the group applied to storage paths on Unix; with `None` group ownership is
    /// left untouched.
    group: Option<String>,
    /// When `true`, directories and the audit log use the `OPEN_*` modes instead of the
    /// `SHARED_*` ones. Defaults to `false` when missing from the serialized form and is
    /// omitted from the output while `false`.
    #[serde(default, skip_serializing_if = "core::ops::Not::not")]
    open: bool,
}
85
86impl LocalBackend {
87    pub fn new(path: impl AsRef<Path>, group: Option<String>) -> Result<Self> {
88        let group = match group {
89            Some(g) => {
90                resolve_group(&g)?;
91                Some(g)
92            }
93            None => {
94                #[cfg(unix)]
95                {
96                    match detect_primary_group() {
97                        Ok(g) => {
98                            log::debug!("Auto-detected primary group: {}", g);
99                            Some(g)
100                        }
101                        Err(e) => {
102                            log::warn!("Could not detect primary group: {e}");
103                            None
104                        }
105                    }
106                }
107                #[cfg(not(unix))]
108                {
109                    None
110                }
111            }
112        };
113
114        Ok(Self {
115            path: path.as_ref().to_path_buf(),
116            group,
117            open: false,
118        })
119    }
120
121    /// Apply configured group ownership to a path.
122    /// No-op on non-Unix or if no group is set.
123    #[cfg(unix)]
124    fn apply_group(&self, path: impl AsRef<Path>) -> Result<()> {
125        use std::os::unix::fs::MetadataExt;
126
127        use nix::unistd::chown;
128
129        if let Some(group_name) = &self.group {
130            let path = path.as_ref();
131            let gid = resolve_group(group_name)?;
132            if fs::metadata(path)?.gid() == gid.as_raw() {
133                return Ok(());
134            }
135
136            log::debug!("Setting group {} on {}", group_name, path.display());
137            chown(path, None, Some(gid))?;
138        }
139
140        Ok(())
141    }
142
143    #[cfg(not(unix))]
144    fn apply_group(&self, _path: impl AsRef<Path>) -> Result<()> {
145        Ok(())
146    }
147
148    fn ensure_group_and_mode(&self, path: impl AsRef<Path>, mode: u32) -> Result<()> {
149        if self.group.is_some() || self.open {
150            let path = path.as_ref();
151            self.apply_group(path)?;
152            ensure_mode(path, mode)?;
153        }
154        Ok(())
155    }
156
157    fn dir_mode(&self) -> u32 {
158        if self.open {
159            OPEN_DIRECTORY_MODE
160        } else {
161            SHARED_DIRECTORY_MODE
162        }
163    }
164
165    fn audit_mode(&self) -> u32 {
166        if self.open {
167            OPEN_AUDIT_LOG_MODE
168        } else {
169            SHARED_AUDIT_LOG_MODE
170        }
171    }
172
173    // On non-Unix, a configured group is intentionally a no-op. We still need
174    // blobs to become read-only eg on Windows, so this cannot be just `self.group.is_some()`.
175    fn use_shared_blob_mode(&self) -> bool {
176        #[cfg(unix)]
177        {
178            self.group.is_some()
179        }
180        #[cfg(not(unix))]
181        {
182            false
183        }
184    }
185
186    fn hash_to_path(&self, hashes: &Hashes) -> Result<PathBuf> {
187        let hash = hashes.get_blake3();
188        if hash.len() < 3 || !hash.chars().all(|c| c.is_ascii_hexdigit()) {
189            bail!("Invalid hash: {hash}");
190        }
191        let (prefix, suffix) = hash.split_at(2);
192        Ok(self.path.join(prefix).join(suffix))
193    }
194}
195
196impl Backend for LocalBackend {
197    fn is_initialized(&self) -> Result<bool> {
198        Ok(self.path.join(AUDIT_LOG_FILENAME).exists())
199    }
200
201    fn init(&self) -> Result<()> {
202        log::debug!("Creating storage directory: {}", self.path.display());
203        fs::create_dir_all(&self.path)?;
204        self.ensure_group_and_mode(&self.path, self.dir_mode())?;
205        log::info!("Initialized local storage at {}", self.path.display());
206        Ok(())
207    }
208
209    fn store(
210        &self,
211        hash: &Hashes,
212        source: &Path,
213        compression: Compression,
214        on_bytes: Option<&(dyn Fn(u64) + Send + Sync)>,
215    ) -> Result<u64> {
216        let path = self.hash_to_path(hash)?;
217        if let Some(parent) = path.parent() {
218            fs::create_dir_all(parent)?;
219            self.ensure_group_and_mode(&self.path, self.dir_mode())?;
220            self.ensure_group_and_mode(parent, self.dir_mode())?;
221        }
222        let tmp_path = path.with_extension("tmp");
223        let stored_size = compression.compress(source, &tmp_path, on_bytes)?;
224
225        if self.use_shared_blob_mode() {
226            self.ensure_group_and_mode(&tmp_path, SHARED_BLOB_MODE)?;
227        } else {
228            make_readonly(&tmp_path)?;
229        }
230        fs::rename(&tmp_path, &path)?;
231        Ok(stored_size)
232    }
233
234    fn retrieve(
235        &self,
236        hash: &Hashes,
237        target: &Path,
238        compression: Compression,
239        on_bytes: Option<&(dyn Fn(u64) + Send + Sync)>,
240    ) -> Result<bool> {
241        let path = self.hash_to_path(hash)?;
242        if path.is_file() {
243            if let Some(parent) = target.parent() {
244                fs::create_dir_all(parent)?;
245            }
246            compression.decompress(&path, target, on_bytes)?;
247            Ok(true)
248        } else {
249            Ok(false)
250        }
251    }
252
253    fn exists(&self, hash: &Hashes) -> Result<bool> {
254        Ok(self.hash_to_path(hash)?.is_file())
255    }
256
257    fn remove(&self, hash: &Hashes) -> Result<()> {
258        let path = self.hash_to_path(hash)?;
259        if path.is_file() {
260            log::debug!("Removing {path:?} from storage");
261            fs::remove_file(path)?;
262        }
263        Ok(())
264    }
265
266    fn log_audit(&self, entry: &AuditEntry) -> Result<()> {
267        let _guard = AUDIT_LOG_LOCK
268            .lock()
269            .expect("audit log lock should not be poisoned");
270        log::debug!("Appending {entry:?} to audit log");
271
272        fs::create_dir_all(&self.path)?;
273        self.ensure_group_and_mode(&self.path, self.dir_mode())?;
274        let audit_path = self.path.join(AUDIT_LOG_FILENAME);
275        let mut file = OpenOptions::new()
276            .create(true)
277            .append(true)
278            .open(&audit_path)?;
279        self.ensure_group_and_mode(&audit_path, self.audit_mode())?;
280        let json = serde_json::to_string(entry)?;
281        writeln!(file, "{}", json)?;
282        Ok(())
283    }
284
285    fn read_audit_file(&self, files: &[PathBuf]) -> Result<Vec<AuditEntry>> {
286        let files_to_include: HashSet<_> = HashSet::from_iter(files.iter().cloned());
287        let audit_path = self.path.join(AUDIT_LOG_FILENAME);
288        let f = fs::File::open(&audit_path)?;
289        let entries = parse_audit_log(BufReader::new(f), &files_to_include)?;
290        Ok(entries)
291    }
292}
293
#[cfg(test)]
mod tests {
    use super::*;
    use crate::audit::{Action, AuditEntry, AuditFile, parse_audit_log};
    use crate::config::Compression;
    use crate::hashes::Hashes;
    use std::io::Cursor;

    /// Hash used by most tests; its blob lives at `ab/c123def456789012345678901234ab`.
    const SAMPLE_HASH: &str = "abc123def456789012345678901234ab";

    fn test_hash(hash: &str) -> Hashes {
        Hashes {
            blake3: hash.to_string(),
            md5: None,
        }
    }

    /// Creates and initializes a backend at `<root>/storage` with no explicit group.
    fn init_backend(root: &Path) -> (PathBuf, LocalBackend) {
        let storage = root.join("storage");
        let backend = LocalBackend::new(&storage, None).unwrap();
        backend.init().unwrap();
        (storage, backend)
    }

    /// Writes `contents` to `<root>/source.txt` and returns that path.
    fn write_source(root: &Path, contents: &[u8]) -> PathBuf {
        let source = root.join("source.txt");
        fs::write(&source, contents).unwrap();
        source
    }

    /// Builds an `Add` audit entry for `file` using the sample hash and Zstd compression.
    fn add_entry(operation_id: &str, timestamp: i64, user: &str, file: &str) -> AuditEntry {
        AuditEntry {
            operation_id: operation_id.to_string(),
            timestamp,
            user: user.to_string(),
            action: Action::Add {
                file: AuditFile {
                    path: PathBuf::from(file),
                    hashes: test_hash(SAMPLE_HASH),
                },
                compression: Compression::Zstd,
            },
        }
    }

    /// Reads the audit log under `storage` and parses it with an empty file set.
    fn read_all_entries(storage: &Path) -> Vec<AuditEntry> {
        let content = fs::read(storage.join("audit.log.jsonl")).unwrap();
        parse_audit_log(Cursor::new(content), &HashSet::new()).unwrap()
    }

    #[test]
    fn hash_to_path_rejects_bad_hash() {
        let backend = LocalBackend::new("/tmp/storage", None).unwrap();

        // Path-traversal attempts must be rejected
        for bad in ["../../etc/passwd", "../escape"] {
            assert!(backend.hash_to_path(&test_hash(bad)).is_err());
        }

        let good = test_hash("d41d8cd98f00b204e9800998ecf8427e");
        assert!(backend.hash_to_path(&good).is_ok());
    }

    #[test]
    fn init_creates_storage_directory() {
        let tmp = tempfile::tempdir().unwrap();
        let storage_path = tmp.path().join("storage");
        let backend = LocalBackend::new(&storage_path, None).unwrap();

        // Constructing the backend alone must not touch the filesystem
        assert!(!storage_path.exists());

        backend.init().unwrap();
        assert!(storage_path.is_dir());
    }

    #[test]
    fn store_creates_hash_prefixed_path() {
        let tmp = tempfile::tempdir().unwrap();
        let (storage, backend) = init_backend(tmp.path());
        let source = write_source(tmp.path(), b"test content");

        backend
            .store(
                &test_hash("d41d8cd98f00b204e9800998ecf8427e"),
                &source,
                Compression::None,
                None,
            )
            .unwrap();

        let stored = storage.join("d4").join("1d8cd98f00b204e9800998ecf8427e");
        assert!(stored.is_file());
        assert_eq!(fs::read(&stored).unwrap(), b"test content");
    }

    #[test]
    fn retrieve_copies_to_target() {
        let tmp = tempfile::tempdir().unwrap();
        let (_storage, backend) = init_backend(tmp.path());

        // Put something into storage first
        let hash = test_hash(SAMPLE_HASH);
        let source = write_source(tmp.path(), b"stored content");
        backend
            .store(&hash, &source, Compression::None, None)
            .unwrap();

        // Then fetch it back to a different location
        let target = tmp.path().join("retrieved.txt");
        let copied = backend
            .retrieve(&hash, &target, Compression::None, None)
            .unwrap();

        // `true` signals that the blob was found and written to the target
        assert!(copied);
        assert_eq!(fs::read(&target).unwrap(), b"stored content");
    }

    #[test]
    fn retrieve_returns_false_when_missing() {
        let tmp = tempfile::tempdir().unwrap();
        let (_storage, backend) = init_backend(tmp.path());

        let target = tmp.path().join("target.txt");
        let missing = test_hash("1234567890123456789012");
        let copied = backend
            .retrieve(&missing, &target, Compression::None, None)
            .unwrap();

        assert!(!copied);
        assert!(!target.exists());
    }

    #[test]
    fn exists_returns_true_for_stored() {
        let tmp = tempfile::tempdir().unwrap();
        let (_storage, backend) = init_backend(tmp.path());
        let hash = test_hash(SAMPLE_HASH);

        assert!(!backend.exists(&hash).unwrap());

        let source = write_source(tmp.path(), b"content");
        backend
            .store(&hash, &source, Compression::None, None)
            .unwrap();

        assert!(backend.exists(&hash).unwrap());
    }

    #[test]
    fn remove_deletes_stored_file() {
        let tmp = tempfile::tempdir().unwrap();
        let (_storage, backend) = init_backend(tmp.path());
        let hash = test_hash(SAMPLE_HASH);

        let source = write_source(tmp.path(), b"content");
        backend
            .store(&hash, &source, Compression::None, None)
            .unwrap();
        assert!(backend.exists(&hash).unwrap());

        backend.remove(&hash).unwrap();
        assert!(!backend.exists(&hash).unwrap());

        // A second removal of the now-missing blob must be a noop
        backend.remove(&hash).unwrap();
    }

    #[test]
    fn stored_files_are_readonly() {
        let tmp = tempfile::tempdir().unwrap();
        let (storage, backend) = init_backend(tmp.path());

        let source = write_source(tmp.path(), b"content");
        backend
            .store(&test_hash(SAMPLE_HASH), &source, Compression::None, None)
            .unwrap();

        let stored = storage.join("ab").join("c123def456789012345678901234ab");
        let permissions = fs::metadata(&stored).unwrap().permissions();
        assert!(permissions.readonly());
    }

    #[test]
    fn log_audit_appends_to_jsonl() {
        let tmp = tempfile::tempdir().unwrap();
        let (storage, backend) = init_backend(tmp.path());

        let first = add_entry("op-1", 1000000000, "alice", "file1.txt");
        let second = add_entry("op-2", 2000000000, "bob", "file2.txt");
        backend.log_audit(&first).unwrap();
        backend.log_audit(&second).unwrap();

        assert!(storage.join("audit.log.jsonl").is_file());

        let entries = read_all_entries(&storage);
        assert_eq!(entries.len(), 2);

        assert_eq!(entries[0].operation_id, "op-1");
        assert_eq!(entries[0].timestamp, 1000000000);
        assert_eq!(entries[0].user, "alice");

        assert_eq!(entries[1].operation_id, "op-2");
        assert_eq!(entries[1].timestamp, 2000000000);
        assert_eq!(entries[1].user, "bob");
    }

    #[test]
    fn log_audit_is_valid_jsonl_under_concurrency() {
        let tmp = tempfile::tempdir().unwrap();
        let (storage, backend) = init_backend(tmp.path());

        let workers = 4;
        let entries_per_worker = 64;

        std::thread::scope(|scope| {
            let backend = &backend;
            for worker in 0..workers {
                scope.spawn(move || {
                    for idx in 0..entries_per_worker {
                        let entry = add_entry(
                            &format!("op-{worker}-{idx}"),
                            (worker * entries_per_worker + idx) as i64,
                            &format!("user-{worker}"),
                            &format!("file-{worker}-{idx}.txt"),
                        );
                        backend.log_audit(&entry).unwrap();
                    }
                });
            }
        });

        // Every line must parse, and none may be lost
        let entries = read_all_entries(&storage);
        assert_eq!(entries.len(), workers * entries_per_worker);
    }

    #[cfg(unix)]
    #[test]
    fn permissions_with_group_use_shared_modes() {
        use nix::unistd::{Group, getegid};
        use std::os::unix::fs::PermissionsExt;

        /// Raw mode bits of `path` as reported by the filesystem.
        fn mode_of(path: &Path) -> u32 {
            fs::metadata(path).unwrap().permissions().mode()
        }

        let current_group_name = Group::from_gid(getegid()).unwrap().unwrap().name;

        let tmp = tempfile::tempdir().unwrap();
        let storage = tmp.path().join("storage");
        let backend = LocalBackend::new(&storage, Some(current_group_name)).unwrap();

        backend.init().unwrap();
        assert_eq!(mode_of(&storage) & 0o7777, SHARED_DIRECTORY_MODE);

        let source = write_source(tmp.path(), b"content");
        backend
            .store(&test_hash(SAMPLE_HASH), &source, Compression::None, None)
            .unwrap();

        let prefix_dir = storage.join("ab");
        assert_eq!(mode_of(&prefix_dir) & 0o7777, SHARED_DIRECTORY_MODE);

        let stored = prefix_dir.join("c123def456789012345678901234ab");
        assert_eq!(mode_of(&stored) & 0o777, SHARED_BLOB_MODE);

        backend
            .log_audit(&add_entry("op-1", 1000000000, "alice", "file1.txt"))
            .unwrap();
        let audit_path = storage.join("audit.log.jsonl");
        assert_eq!(mode_of(&audit_path) & 0o777, SHARED_AUDIT_LOG_MODE);
    }
}