1use std::collections::HashSet;
2use std::fs::OpenOptions;
3use std::io::{BufReader, Write};
4use std::path::{Path, PathBuf};
5use std::sync::Mutex;
6
7#[cfg(unix)]
8use anyhow::anyhow;
9use anyhow::{Result, bail};
10use fs_err as fs;
11use serde::{Deserialize, Serialize};
12
13use crate::Hashes;
14use crate::audit::{AuditEntry, parse_audit_log};
15use crate::backends::Backend;
16use crate::config::Compression;
17
/// File name of the audit log, resolved relative to the storage root.
/// Entries are appended to it as one JSON document per line.
const AUDIT_LOG_FILENAME: &str = "audit.log.jsonl";
/// Process-wide lock taken for the duration of `log_audit`, so appends from
/// threads of this process are serialized. Being a `static`, it is shared by
/// every backend instance in the process.
static AUDIT_LOG_LOCK: Mutex<()> = Mutex::new(());
21
22#[cfg(unix)]
24fn resolve_group(group_name: &str) -> Result<nix::unistd::Gid> {
25 use nix::unistd::Group;
26 let group =
27 Group::from_name(group_name)?.ok_or_else(|| anyhow!("Group '{}' not found", group_name))?;
28 Ok(nix::unistd::Gid::from_raw(group.gid.as_raw()))
29}
30
31#[cfg(not(unix))]
32fn resolve_group(_: &str) -> Result<()> {
33 Ok(())
34}
35
36#[cfg(unix)]
38fn detect_primary_group() -> Result<String> {
39 use nix::unistd::Group;
40 let gid = nix::unistd::getegid();
41 let group = Group::from_gid(gid)?
42 .ok_or_else(|| anyhow!("Could not resolve primary group for GID {}", gid))?;
43 Ok(group.name)
44}
45
/// Marks the file at `path` as read-only, keeping the rest of its permissions.
///
/// # Errors
/// Fails when the metadata cannot be read or the permissions cannot be written.
fn make_readonly(path: impl AsRef<Path>) -> Result<()> {
    let target = path.as_ref();
    let mut permissions = fs::metadata(target)?.permissions();
    permissions.set_readonly(true);
    Ok(fs::set_permissions(target, permissions)?)
}
52
// Modes used when the backend's `open` flag is not set.
/// Directories: setgid bit plus read/write/execute for owner and group, nothing for others.
const SHARED_DIRECTORY_MODE: u32 = 0o2770;
/// Stored blobs: read-only for owner and group, nothing for others.
const SHARED_BLOB_MODE: u32 = 0o0440;
/// Audit log: read/write for owner and group, nothing for others.
const SHARED_AUDIT_LOG_MODE: u32 = 0o0660;

// Modes used when the backend's `open` flag is set.
/// Directories: setgid bit plus read/write/execute for everyone.
const OPEN_DIRECTORY_MODE: u32 = 0o2777;
/// Audit log: read/write for everyone.
const OPEN_AUDIT_LOG_MODE: u32 = 0o0666;
59
/// Makes the permission bits of `path` equal to `mode`.
///
/// The current bits (masked with `0o7777`) are compared first, so nothing is
/// written when the path already has the requested mode.
///
/// # Errors
/// Fails when the metadata cannot be read or the permissions cannot be written.
#[cfg(unix)]
fn ensure_mode(path: impl AsRef<Path>, mode: u32) -> Result<()> {
    use std::os::unix::fs::PermissionsExt;

    let target = path.as_ref();
    let mut permissions = fs::metadata(target)?.permissions();
    if permissions.mode() & 0o7777 == mode {
        return Ok(());
    }
    permissions.set_mode(mode);
    Ok(fs::set_permissions(target, permissions)?)
}

/// Non-Unix stand-in: does nothing and always succeeds.
#[cfg(not(unix))]
fn ensure_mode(_path: impl AsRef<Path>, _mode: u32) -> Result<()> {
    Ok(())
}
77
/// Storage backend that keeps blobs and the audit log in a directory on the
/// local filesystem.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub struct LocalBackend {
    /// Root directory of the store; blob paths and the audit log are resolved
    /// relative to it.
    pub path: PathBuf,
    /// Name of the group applied to stored paths on Unix. With `None`, group
    /// ownership is left untouched.
    group: Option<String>,
    /// Selects the `OPEN_*` permission modes instead of the `SHARED_*` ones.
    /// Defaults to `false` when missing on deserialization and is omitted from
    /// serialized output while `false`.
    #[serde(default, skip_serializing_if = "core::ops::Not::not")]
    open: bool,
}
85
86impl LocalBackend {
87 pub fn new(path: impl AsRef<Path>, group: Option<String>) -> Result<Self> {
88 let group = match group {
89 Some(g) => {
90 resolve_group(&g)?;
91 Some(g)
92 }
93 None => {
94 #[cfg(unix)]
95 {
96 match detect_primary_group() {
97 Ok(g) => {
98 log::debug!("Auto-detected primary group: {}", g);
99 Some(g)
100 }
101 Err(e) => {
102 log::warn!("Could not detect primary group: {e}");
103 None
104 }
105 }
106 }
107 #[cfg(not(unix))]
108 {
109 None
110 }
111 }
112 };
113
114 Ok(Self {
115 path: path.as_ref().to_path_buf(),
116 group,
117 open: false,
118 })
119 }
120
121 #[cfg(unix)]
124 fn apply_group(&self, path: impl AsRef<Path>) -> Result<()> {
125 use std::os::unix::fs::MetadataExt;
126
127 use nix::unistd::chown;
128
129 if let Some(group_name) = &self.group {
130 let path = path.as_ref();
131 let gid = resolve_group(group_name)?;
132 if fs::metadata(path)?.gid() == gid.as_raw() {
133 return Ok(());
134 }
135
136 log::debug!("Setting group {} on {}", group_name, path.display());
137 chown(path, None, Some(gid))?;
138 }
139
140 Ok(())
141 }
142
143 #[cfg(not(unix))]
144 fn apply_group(&self, _path: impl AsRef<Path>) -> Result<()> {
145 Ok(())
146 }
147
148 fn ensure_group_and_mode(&self, path: impl AsRef<Path>, mode: u32) -> Result<()> {
149 if self.group.is_some() || self.open {
150 let path = path.as_ref();
151 self.apply_group(path)?;
152 ensure_mode(path, mode)?;
153 }
154 Ok(())
155 }
156
157 fn dir_mode(&self) -> u32 {
158 if self.open {
159 OPEN_DIRECTORY_MODE
160 } else {
161 SHARED_DIRECTORY_MODE
162 }
163 }
164
165 fn audit_mode(&self) -> u32 {
166 if self.open {
167 OPEN_AUDIT_LOG_MODE
168 } else {
169 SHARED_AUDIT_LOG_MODE
170 }
171 }
172
173 fn use_shared_blob_mode(&self) -> bool {
176 #[cfg(unix)]
177 {
178 self.group.is_some()
179 }
180 #[cfg(not(unix))]
181 {
182 false
183 }
184 }
185
186 fn hash_to_path(&self, hashes: &Hashes) -> Result<PathBuf> {
187 let hash = hashes.get_blake3();
188 if hash.len() < 3 || !hash.chars().all(|c| c.is_ascii_hexdigit()) {
189 bail!("Invalid hash: {hash}");
190 }
191 let (prefix, suffix) = hash.split_at(2);
192 Ok(self.path.join(prefix).join(suffix))
193 }
194}
195
196impl Backend for LocalBackend {
197 fn is_initialized(&self) -> Result<bool> {
198 Ok(self.path.join(AUDIT_LOG_FILENAME).exists())
199 }
200
201 fn init(&self) -> Result<()> {
202 log::debug!("Creating storage directory: {}", self.path.display());
203 fs::create_dir_all(&self.path)?;
204 self.ensure_group_and_mode(&self.path, self.dir_mode())?;
205 log::info!("Initialized local storage at {}", self.path.display());
206 Ok(())
207 }
208
209 fn store(
210 &self,
211 hash: &Hashes,
212 source: &Path,
213 compression: Compression,
214 on_bytes: Option<&(dyn Fn(u64) + Send + Sync)>,
215 ) -> Result<u64> {
216 let path = self.hash_to_path(hash)?;
217 if let Some(parent) = path.parent() {
218 fs::create_dir_all(parent)?;
219 self.ensure_group_and_mode(&self.path, self.dir_mode())?;
220 self.ensure_group_and_mode(parent, self.dir_mode())?;
221 }
222 let tmp_path = path.with_extension("tmp");
223 let stored_size = compression.compress(source, &tmp_path, on_bytes)?;
224
225 if self.use_shared_blob_mode() {
226 self.ensure_group_and_mode(&tmp_path, SHARED_BLOB_MODE)?;
227 } else {
228 make_readonly(&tmp_path)?;
229 }
230 fs::rename(&tmp_path, &path)?;
231 Ok(stored_size)
232 }
233
234 fn retrieve(
235 &self,
236 hash: &Hashes,
237 target: &Path,
238 compression: Compression,
239 on_bytes: Option<&(dyn Fn(u64) + Send + Sync)>,
240 ) -> Result<bool> {
241 let path = self.hash_to_path(hash)?;
242 if path.is_file() {
243 if let Some(parent) = target.parent() {
244 fs::create_dir_all(parent)?;
245 }
246 compression.decompress(&path, target, on_bytes)?;
247 Ok(true)
248 } else {
249 Ok(false)
250 }
251 }
252
253 fn exists(&self, hash: &Hashes) -> Result<bool> {
254 Ok(self.hash_to_path(hash)?.is_file())
255 }
256
257 fn remove(&self, hash: &Hashes) -> Result<()> {
258 let path = self.hash_to_path(hash)?;
259 if path.is_file() {
260 log::debug!("Removing {path:?} from storage");
261 fs::remove_file(path)?;
262 }
263 Ok(())
264 }
265
266 fn log_audit(&self, entry: &AuditEntry) -> Result<()> {
267 let _guard = AUDIT_LOG_LOCK
268 .lock()
269 .expect("audit log lock should not be poisoned");
270 log::debug!("Appending {entry:?} to audit log");
271
272 fs::create_dir_all(&self.path)?;
273 self.ensure_group_and_mode(&self.path, self.dir_mode())?;
274 let audit_path = self.path.join(AUDIT_LOG_FILENAME);
275 let mut file = OpenOptions::new()
276 .create(true)
277 .append(true)
278 .open(&audit_path)?;
279 self.ensure_group_and_mode(&audit_path, self.audit_mode())?;
280 let json = serde_json::to_string(entry)?;
281 writeln!(file, "{}", json)?;
282 Ok(())
283 }
284
285 fn read_audit_file(&self, files: &[PathBuf]) -> Result<Vec<AuditEntry>> {
286 let files_to_include: HashSet<_> = HashSet::from_iter(files.iter().cloned());
287 let audit_path = self.path.join(AUDIT_LOG_FILENAME);
288 let f = fs::File::open(&audit_path)?;
289 let entries = parse_audit_log(BufReader::new(f), &files_to_include)?;
290 Ok(entries)
291 }
292}
293
#[cfg(test)]
mod tests {
    use super::*;
    use crate::audit::{Action, AuditEntry, AuditFile, parse_audit_log};
    use crate::config::Compression;
    use crate::hashes::Hashes;
    use std::io::Cursor;

    /// Hash used by the tests that only need "some stored blob".
    const SAMPLE_HASH: &str = "abc123def456789012345678901234ab";

    /// Builds a `Hashes` value carrying only a blake3 digest.
    fn test_hash(hash: &str) -> Hashes {
        Hashes {
            blake3: hash.to_string(),
            md5: None,
        }
    }

    /// Creates a temp dir and a not-yet-initialized backend at `<tmp>/storage`.
    /// The `TempDir` is returned so the directory outlives the test body.
    fn new_backend(group: Option<String>) -> (tempfile::TempDir, PathBuf, LocalBackend) {
        let tmp = tempfile::tempdir().unwrap();
        let storage = tmp.path().join("storage");
        let backend = LocalBackend::new(&storage, group).unwrap();
        (tmp, storage, backend)
    }

    /// Like `new_backend(None)`, but with `init` already called.
    fn init_backend() -> (tempfile::TempDir, PathBuf, LocalBackend) {
        let fixture = new_backend(None);
        fixture.2.init().unwrap();
        fixture
    }

    /// Writes `content` to `<dir>/source.txt` and stores it uncompressed under `hash`.
    fn store_bytes(backend: &LocalBackend, dir: &Path, hash: &Hashes, content: &[u8]) {
        let source = dir.join("source.txt");
        fs::write(&source, content).unwrap();
        backend
            .store(hash, &source, Compression::None, None)
            .unwrap();
    }

    /// Builds an `Add` audit entry for `file` using `SAMPLE_HASH` and zstd compression.
    fn add_entry(operation_id: &str, timestamp: i64, user: &str, file: &str) -> AuditEntry {
        AuditEntry {
            operation_id: operation_id.to_string(),
            timestamp,
            user: user.to_string(),
            action: Action::Add {
                file: AuditFile {
                    path: PathBuf::from(file),
                    hashes: test_hash(SAMPLE_HASH),
                },
                compression: Compression::Zstd,
            },
        }
    }

    /// Parses every entry of the audit log found in `storage`.
    fn read_log(storage: &Path) -> Vec<AuditEntry> {
        let content = fs::read(storage.join("audit.log.jsonl")).unwrap();
        parse_audit_log(Cursor::new(content), &HashSet::new()).unwrap()
    }

    /// Raw Unix mode bits of `path`.
    #[cfg(unix)]
    fn mode_of(path: &Path) -> u32 {
        use std::os::unix::fs::PermissionsExt;
        fs::metadata(path).unwrap().permissions().mode()
    }

    #[test]
    fn hash_to_path_rejects_bad_hash() {
        let backend = LocalBackend::new("/tmp/storage", None).unwrap();

        for bad in ["../../etc/passwd", "../escape"] {
            assert!(backend.hash_to_path(&test_hash(bad)).is_err());
        }
        assert!(
            backend
                .hash_to_path(&test_hash("d41d8cd98f00b204e9800998ecf8427e"))
                .is_ok()
        );
    }

    #[test]
    fn init_creates_storage_directory() {
        let (_tmp, storage_path, backend) = new_backend(None);
        assert!(!storage_path.exists());

        backend.init().unwrap();
        assert!(storage_path.is_dir());
    }

    #[test]
    fn store_creates_hash_prefixed_path() {
        let (tmp, storage, backend) = init_backend();
        let hash = test_hash("d41d8cd98f00b204e9800998ecf8427e");

        store_bytes(&backend, tmp.path(), &hash, b"test content");

        let stored = storage.join("d4").join("1d8cd98f00b204e9800998ecf8427e");
        assert!(stored.is_file());
        assert_eq!(fs::read(&stored).unwrap(), b"test content");
    }

    #[test]
    fn retrieve_copies_to_target() {
        let (tmp, _storage, backend) = init_backend();
        let hash = test_hash(SAMPLE_HASH);
        store_bytes(&backend, tmp.path(), &hash, b"stored content");

        let target = tmp.path().join("retrieved.txt");
        let found = backend
            .retrieve(&hash, &target, Compression::None, None)
            .unwrap();

        assert!(found);
        assert_eq!(fs::read(&target).unwrap(), b"stored content");
    }

    #[test]
    fn retrieve_returns_false_when_missing() {
        let (tmp, _storage, backend) = init_backend();
        let missing = test_hash("1234567890123456789012");

        let target = tmp.path().join("target.txt");
        let found = backend
            .retrieve(&missing, &target, Compression::None, None)
            .unwrap();

        assert!(!found);
        assert!(!target.exists());
    }

    #[test]
    fn exists_returns_true_for_stored() {
        let (tmp, _storage, backend) = init_backend();
        let hash = test_hash(SAMPLE_HASH);

        assert!(!backend.exists(&hash).unwrap());
        store_bytes(&backend, tmp.path(), &hash, b"content");
        assert!(backend.exists(&hash).unwrap());
    }

    #[test]
    fn remove_deletes_stored_file() {
        let (tmp, _storage, backend) = init_backend();
        let hash = test_hash(SAMPLE_HASH);
        store_bytes(&backend, tmp.path(), &hash, b"content");
        assert!(backend.exists(&hash).unwrap());

        backend.remove(&hash).unwrap();
        assert!(!backend.exists(&hash).unwrap());
        // Removing a blob that is already gone must still succeed.
        backend.remove(&hash).unwrap();
    }

    #[test]
    fn stored_files_are_readonly() {
        let (tmp, storage, backend) = init_backend();
        store_bytes(&backend, tmp.path(), &test_hash(SAMPLE_HASH), b"content");

        let stored = storage.join("ab").join("c123def456789012345678901234ab");
        assert!(fs::metadata(&stored).unwrap().permissions().readonly());
    }

    #[test]
    fn log_audit_appends_to_jsonl() {
        let (_tmp, storage, backend) = init_backend();

        backend
            .log_audit(&add_entry("op-1", 1000000000, "alice", "file1.txt"))
            .unwrap();
        backend
            .log_audit(&add_entry("op-2", 2000000000, "bob", "file2.txt"))
            .unwrap();

        assert!(storage.join("audit.log.jsonl").is_file());

        let entries = read_log(&storage);
        assert_eq!(entries.len(), 2);

        let expected = [("op-1", 1000000000, "alice"), ("op-2", 2000000000, "bob")];
        for (entry, (operation_id, timestamp, user)) in entries.iter().zip(expected) {
            assert_eq!(entry.operation_id, operation_id);
            assert_eq!(entry.timestamp, timestamp);
            assert_eq!(entry.user, user);
        }
    }

    #[test]
    fn log_audit_is_valid_jsonl_under_concurrency() {
        let (_tmp, storage, backend) = init_backend();
        let workers = 4;
        let entries_per_worker = 64;

        std::thread::scope(|scope| {
            let backend = &backend;
            for worker in 0..workers {
                scope.spawn(move || {
                    for idx in 0..entries_per_worker {
                        let entry = add_entry(
                            &format!("op-{worker}-{idx}"),
                            (worker * entries_per_worker + idx) as i64,
                            &format!("user-{worker}"),
                            &format!("file-{worker}-{idx}.txt"),
                        );
                        backend.log_audit(&entry).unwrap();
                    }
                });
            }
        });

        // Every line must parse, and none may have been lost or merged.
        assert_eq!(read_log(&storage).len(), workers * entries_per_worker);
    }

    #[cfg(unix)]
    #[test]
    fn permissions_with_group_use_shared_modes() {
        use nix::unistd::{Group, getegid};

        let current_group_name = Group::from_gid(getegid()).unwrap().unwrap().name;
        let (tmp, storage, backend) = new_backend(Some(current_group_name));

        backend.init().unwrap();
        assert_eq!(mode_of(&storage) & 0o7777, SHARED_DIRECTORY_MODE);

        store_bytes(&backend, tmp.path(), &test_hash(SAMPLE_HASH), b"content");

        let prefix_dir = storage.join("ab");
        assert_eq!(mode_of(&prefix_dir) & 0o7777, SHARED_DIRECTORY_MODE);

        let stored = prefix_dir.join("c123def456789012345678901234ab");
        assert_eq!(mode_of(&stored) & 0o777, SHARED_BLOB_MODE);

        backend
            .log_audit(&add_entry("op-1", 1000000000, "alice", "file1.txt"))
            .unwrap();
        let audit_path = storage.join("audit.log.jsonl");
        assert_eq!(mode_of(&audit_path) & 0o777, SHARED_AUDIT_LOG_MODE);
    }
}