1use std::path::{Path, PathBuf};
2use std::sync::Mutex;
3
4use crate::cache::{HashCache, try_open_cache};
5use crate::files::metadata::FileMetadata;
6use crate::paths::GetPathStatus;
7use crate::progress::OnFileStart;
8use crate::utils::get_threadpool;
9use crate::{Backend, Compression, DvsPaths, Outcome, cache};
10use anyhow::{Context, Result, bail};
11use fs_err as fs;
12use rayon::prelude::*;
13use serde::{Deserialize, Serialize};
14
15fn get_file(
16 backend: &dyn Backend,
17 paths: &DvsPaths,
18 relative_path: impl AsRef<Path>,
19 cache: Option<&Mutex<HashCache>>,
20 dry_run: bool,
21 on_bytes: Option<&(dyn Fn(u64) + Send + Sync)>,
22) -> Result<(Outcome, u64)> {
23 log::debug!("Retrieving file: {}", relative_path.as_ref().display());
24 let dvs_file_path = paths.metadata_path(relative_path.as_ref());
25 if !dvs_file_path.is_file() {
26 bail!(
27 "File {} is not tracked by DVS",
28 relative_path.as_ref().display()
29 );
30 }
31
32 let metadata: FileMetadata = serde_json::from_reader(fs::File::open(&dvs_file_path)?)?;
33 log::debug!(
34 "Read metadata for {}: {}",
35 relative_path.as_ref().display(),
36 metadata.hashes
37 );
38
39 if !backend.exists(&metadata.hashes)? {
40 bail!("Storage file missing for hash: {}", metadata.hashes);
41 }
42
43 let target_path = paths.file_path(relative_path.as_ref());
44 let rel_str = relative_path.as_ref().to_string_lossy();
45
46 if target_path.is_file() {
48 let (hashes, size) = cache::hashes_for_file(&target_path, &rel_str, cache)?;
49
50 if hashes == metadata.hashes && size == metadata.size {
51 log::debug!(
52 "File {} already present locally and matches",
53 relative_path.as_ref().display()
54 );
55 return Ok((Outcome::Present, metadata.size));
56 }
57 }
58
59 if dry_run {
60 return Ok((Outcome::Copied, metadata.size));
61 }
62
63 log::debug!(
65 "Copying {} from storage to {}",
66 metadata.hashes,
67 target_path.display()
68 );
69
70 backend
71 .retrieve(
72 &metadata.hashes,
73 &target_path,
74 metadata.compression,
75 on_bytes,
76 )
77 .with_context(|| format!("Failed to retrieve {}", relative_path.as_ref().display()))?;
78 let actual = FileMetadata::from_file(&target_path, Compression::None, None)?;
79 if actual.hashes != metadata.hashes {
80 fs::remove_file(&target_path)?;
81 bail!("Retrieved file does not match expected hash");
82 }
83
84 if let Some(mtx) = cache {
86 if let Ok(stat) = cache::FileStat::from_path(&target_path) {
87 if let Err(e) = mtx.lock().unwrap().insert(&rel_str, &stat, &actual.hashes) {
88 log::warn!("Cache store failed after get for {rel_str}: {e}");
89 }
90 }
91 }
92
93 Ok((Outcome::Copied, metadata.size))
94}
95
96#[derive(Debug, Clone, Serialize, Deserialize)]
98pub struct GetResult {
99 pub path: PathBuf,
100 #[serde(flatten)]
101 pub detail: GetDetail,
102}
103
104#[derive(Debug, Clone, Serialize, Deserialize)]
105#[serde(untagged)]
106pub enum GetDetail {
107 Success { outcome: Outcome, size: u64 },
108 Error { error: String },
109}
110
111pub fn get_files(
116 files: Vec<PathBuf>,
117 paths: &DvsPaths,
118 backend: &dyn Backend,
119 dry_run: bool,
120 on_file_start: Option<&OnFileStart>,
121) -> Result<Vec<GetResult>> {
122 let matched_paths = paths.validate_for_get(&files);
123 if matched_paths.is_empty() {
124 return Ok(Vec::new());
125 }
126 let pool = get_threadpool(matched_paths.len())?;
127 let cache = try_open_cache(paths);
128
129 let mut results: Vec<GetResult> = pool.install(|| {
130 matched_paths
131 .into_par_iter()
132 .map(|(relative_path, validation)| {
133 match validation {
134 GetPathStatus::NotFound => {
135 return GetResult {
136 path: relative_path,
137 detail: GetDetail::Error {
138 error: "file not found".to_string(),
139 },
140 };
141 }
142 GetPathStatus::NotTracked => {
143 return GetResult {
144 path: relative_path,
145 detail: GetDetail::Error {
146 error: "not tracked by DVS".to_string(),
147 },
148 };
149 }
150 GetPathStatus::Tracked => {}
151 }
152 let file_size = {
153 let meta_path = paths.metadata_path(&relative_path);
154 std::fs::File::open(&meta_path)
155 .ok()
156 .and_then(|f| serde_json::from_reader::<_, FileMetadata>(f).ok())
157 .map(|m| m.size)
158 .unwrap_or(0)
159 };
160 let file_progress = on_file_start.map(|f| f(&relative_path, file_size));
161 let on_bytes = file_progress.as_ref().map(|fp| &*fp.on_bytes);
162
163 match get_file(
164 backend,
165 paths,
166 &relative_path,
167 cache.as_ref(),
168 dry_run,
169 on_bytes,
170 ) {
171 Ok((outcome, size)) => {
172 log::info!(
173 "Successfully retrieved {} ({:?})",
174 relative_path.display(),
175 outcome
176 );
177 GetResult {
178 path: relative_path,
179 detail: GetDetail::Success { outcome, size },
180 }
181 }
182 Err(e) => {
183 log::warn!("Failed to get {}: {e}", relative_path.display());
184 GetResult {
185 path: relative_path,
186 detail: GetDetail::Error {
187 error: e.to_string(),
188 },
189 }
190 }
191 }
192 })
193 .collect()
194 });
195 results.sort_by(|a, b| a.path.cmp(&b.path));
196
197 Ok(results)
198}
199
#[cfg(test)]
mod tests {
    use super::*;
    use crate::add_files;
    use crate::files::add::AddDetail;
    use crate::files::status::get_status;
    use crate::testutil::{create_file, create_temp_git_repo, init_dvs_repo};
    use uuid::Uuid;

    /// Builds `DvsPaths` that use `root` as both repository root and working directory.
    fn make_paths(root: &Path, config: &crate::config::Config) -> DvsPaths {
        let metadata_folder = config.metadata_folder_name();
        DvsPaths::new(root.to_path_buf(), root.to_path_buf(), metadata_folder).unwrap()
    }

    /// Opens the hash-cache database (`dvs.db`) inside the repository's cache folder.
    fn make_cache(paths: &DvsPaths) -> Mutex<cache::HashCache> {
        let db_path = paths.cache_folder().join("dvs.db");
        Mutex::new(cache::HashCache::open(&db_path).unwrap())
    }

    #[test]
    fn get_file_retrieves_from_storage() {
        let (_tmp, root) = create_temp_git_repo();
        let (config, _dvs_dir) = init_dvs_repo(&root);
        let backend = config.backend();
        let paths = make_paths(&root, &config);
        let local = create_file(&root, "retrieve.txt", b"stored content");

        let recorded = FileMetadata::from_file(&local, Compression::Zstd, None).unwrap();
        recorded
            .save(Uuid::new_v4(), &local, backend, &paths, "retrieve.txt", None)
            .unwrap();

        // Remove the working copy so storage is the only place the content lives.
        fs::remove_file(&local).unwrap();
        assert!(!local.exists());

        let cache = make_cache(&paths);
        let (outcome, _) =
            get_file(backend, &paths, "retrieve.txt", Some(&cache), false, None).unwrap();

        assert_eq!(outcome, Outcome::Copied);
        assert!(local.exists());
        assert_eq!(fs::read(&local).unwrap(), b"stored content");
    }

    #[test]
    fn get_file_returns_present_when_already_current() {
        let (_tmp, root) = create_temp_git_repo();
        let (config, _dvs_dir) = init_dvs_repo(&root);
        let backend = config.backend();
        let paths = make_paths(&root, &config);
        let local = create_file(&root, "present.txt", b"content");

        FileMetadata::from_file(&local, Compression::Zstd, None)
            .unwrap()
            .save(Uuid::new_v4(), &local, backend, &paths, "present.txt", None)
            .unwrap();

        // The working copy is untouched, so no copy should happen.
        let cache = make_cache(&paths);
        let (outcome, _) =
            get_file(backend, &paths, "present.txt", Some(&cache), false, None).unwrap();
        assert_eq!(outcome, Outcome::Present);
    }

    #[test]
    fn get_file_fails_for_untracked_file() {
        let (_tmp, root) = create_temp_git_repo();
        let (config, _dvs_dir) = init_dvs_repo(&root);
        let backend = config.backend();
        let paths = make_paths(&root, &config);

        let cache = make_cache(&paths);
        let err = get_file(backend, &paths, "untracked.txt", Some(&cache), false, None)
            .expect_err("getting a file without metadata must fail");
        assert!(err.to_string().contains("not tracked"));
    }

    #[test]
    fn get_files_reports_not_found_per_file() {
        let (_tmp, root) = create_temp_git_repo();
        let (config, _dvs_dir) = init_dvs_repo(&root);
        let backend = config.backend();
        let paths = make_paths(&root, &config);

        create_file(&root, "a.txt", b"a");
        add_files(
            vec!["a.txt".into()],
            &paths,
            backend,
            None,
            Compression::Zstd,
            false,
            None,
        )
        .unwrap();

        let results =
            get_files(vec!["nonexistent.csv".into()], &paths, backend, false, None).unwrap();
        assert_eq!(results.len(), 1);
        match &results[0].detail {
            GetDetail::Error { error } => assert!(error.contains("not found")),
            other => panic!("expected a not-found error, got {other:?}"),
        }
    }

    #[test]
    fn get_files_reports_not_tracked_for_untracked_file() {
        let (_tmp, root) = create_temp_git_repo();
        let (config, _dvs_dir) = init_dvs_repo(&root);
        let backend = config.backend();
        let paths = make_paths(&root, &config);

        // Exists on disk but was never added to DVS.
        create_file(&root, "untracked.txt", b"hello");

        let results =
            get_files(vec!["untracked.txt".into()], &paths, backend, false, None).unwrap();
        assert_eq!(results.len(), 1);
        match &results[0].detail {
            GetDetail::Error { error } => assert!(error.contains("not tracked")),
            other => panic!("expected a not-tracked error, got {other:?}"),
        }
    }

    /// Adds `file_paths`, checks they are tracked, deletes them, then restores them
    /// with `get_files` and checks every name in `expected_files` is back on disk.
    fn run_add_get_roundtrip(file_paths: Vec<PathBuf>, expected_files: &[&str]) {
        let (_tmp, root) = create_temp_git_repo();
        let (config, _dvs_dir) = init_dvs_repo(&root);
        let backend = config.backend();
        let paths = make_paths(&root, &config);

        for (name, body) in [("a.txt", b"a"), ("b.txt", b"b"), ("c.csv", b"c")] {
            create_file(&root, name, body);
        }

        let added = add_files(
            file_paths.clone(),
            &paths,
            backend,
            None,
            Compression::Zstd,
            false,
            None,
        )
        .unwrap();
        assert_eq!(added.len(), expected_files.len());
        assert!(added.iter().all(|entry| matches!(
            entry.detail,
            AddDetail::Success {
                outcome: Outcome::Copied,
                ..
            }
        )));

        let statuses = get_status(&paths, None).unwrap();
        assert_eq!(statuses.len(), expected_files.len());
        let tracked_names: Vec<_> = statuses
            .iter()
            .map(|status| status.path.to_str().unwrap())
            .collect();
        for expected in expected_files {
            assert!(
                tracked_names.contains(expected),
                "Expected {expected} to be tracked"
            );
        }

        // Delete the working copies so the get has to pull everything from storage.
        expected_files
            .iter()
            .for_each(|name| fs::remove_file(root.join(name)).unwrap());

        let restored = get_files(file_paths, &paths, backend, false, None).unwrap();
        assert_eq!(restored.len(), expected_files.len());
        assert!(restored.iter().all(|entry| matches!(
            entry.detail,
            GetDetail::Success {
                outcome: Outcome::Copied,
                ..
            }
        )));

        for name in expected_files {
            assert!(root.join(name).exists(), "Expected {name} to be restored");
        }
    }

    #[test]
    fn add_get_roundtrip_with_explicit_paths() {
        run_add_get_roundtrip(vec!["a.txt".into(), "c.csv".into()], &["a.txt", "c.csv"]);
    }

    #[test]
    fn get_file_errors_on_corrupted_storage() {
        let (_tmp, root) = create_temp_git_repo();
        let (config, _dvs_dir) = init_dvs_repo(&root);
        let backend = config.backend();
        let paths = make_paths(&root, &config);

        let local = create_file(&root, "data.txt", b"original content");
        let recorded = FileMetadata::from_file(&local, Compression::Zstd, None).unwrap();
        recorded
            .save(Uuid::new_v4(), &local, backend, &paths, "data.txt", None)
            .unwrap();

        fs::remove_file(&local).unwrap();

        // Assumes the test backend stores blobs at `.storage/<first 2 hex>/<rest>`.
        let blake3 = &recorded.hashes.blake3;
        let storage_path = root
            .join(".storage")
            .join(&blake3[..2])
            .join(&blake3[2..]);

        // The stored blob is presumably read-only; make it writable so it can be corrupted.
        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            fs::set_permissions(&storage_path, std::fs::Permissions::from_mode(0o644)).unwrap();
        }
        #[cfg(not(unix))]
        {
            let mut perms = fs::metadata(&storage_path).unwrap().permissions();
            perms.set_readonly(false);
            fs::set_permissions(&storage_path, perms).unwrap();
        }
        fs::write(&storage_path, b"corrupted content").unwrap();

        let cache = make_cache(&paths);
        let attempt = get_file(backend, &paths, "data.txt", Some(&cache), false, None);
        assert!(attempt.is_err());
    }
}