Skip to main content
← dvs documentation Rust API reference

dvs/files/
get.rs

1use std::path::{Path, PathBuf};
2use std::sync::Mutex;
3
4use crate::cache::{HashCache, try_open_cache};
5use crate::files::metadata::FileMetadata;
6use crate::paths::GetPathStatus;
7use crate::progress::OnFileStart;
8use crate::utils::get_threadpool;
9use crate::{Backend, Compression, DvsPaths, Outcome, cache};
10use anyhow::{Context, Result, bail};
11use fs_err as fs;
12use rayon::prelude::*;
13use serde::{Deserialize, Serialize};
14
15fn get_file(
16    backend: &dyn Backend,
17    paths: &DvsPaths,
18    relative_path: impl AsRef<Path>,
19    cache: Option<&Mutex<HashCache>>,
20    dry_run: bool,
21    on_bytes: Option<&(dyn Fn(u64) + Send + Sync)>,
22) -> Result<(Outcome, u64)> {
23    log::debug!("Retrieving file: {}", relative_path.as_ref().display());
24    let dvs_file_path = paths.metadata_path(relative_path.as_ref());
25    if !dvs_file_path.is_file() {
26        bail!(
27            "File {} is not tracked by DVS",
28            relative_path.as_ref().display()
29        );
30    }
31
32    let metadata: FileMetadata = serde_json::from_reader(fs::File::open(&dvs_file_path)?)?;
33    log::debug!(
34        "Read metadata for {}: {}",
35        relative_path.as_ref().display(),
36        metadata.hashes
37    );
38
39    if !backend.exists(&metadata.hashes)? {
40        bail!("Storage file missing for hash: {}", metadata.hashes);
41    }
42
43    let target_path = paths.file_path(relative_path.as_ref());
44    let rel_str = relative_path.as_ref().to_string_lossy();
45
46    // Check if target already exists and matches
47    if target_path.is_file() {
48        let (hashes, size) = cache::hashes_for_file(&target_path, &rel_str, cache)?;
49
50        if hashes == metadata.hashes && size == metadata.size {
51            log::debug!(
52                "File {} already present locally and matches",
53                relative_path.as_ref().display()
54            );
55            return Ok((Outcome::Present, metadata.size));
56        }
57    }
58
59    if dry_run {
60        return Ok((Outcome::Copied, metadata.size));
61    }
62
63    // Retrieve from backend to target path
64    log::debug!(
65        "Copying {} from storage to {}",
66        metadata.hashes,
67        target_path.display()
68    );
69
70    backend
71        .retrieve(
72            &metadata.hashes,
73            &target_path,
74            metadata.compression,
75            on_bytes,
76        )
77        .with_context(|| format!("Failed to retrieve {}", relative_path.as_ref().display()))?;
78    let actual = FileMetadata::from_file(&target_path, Compression::None, None)?;
79    if actual.hashes != metadata.hashes {
80        fs::remove_file(&target_path)?;
81        bail!("Retrieved file does not match expected hash");
82    }
83
84    // Store retrieved file's hashes in cache
85    if let Some(mtx) = cache {
86        if let Ok(stat) = cache::FileStat::from_path(&target_path) {
87            if let Err(e) = mtx.lock().unwrap().insert(&rel_str, &stat, &actual.hashes) {
88                log::warn!("Cache store failed after get for {rel_str}: {e}");
89            }
90        }
91    }
92
93    Ok((Outcome::Copied, metadata.size))
94}
95
/// Result of getting a single file.
///
/// `get_files` returns one of these per validated path, whether the get
/// succeeded or failed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetResult {
    /// Path this result refers to.
    pub path: PathBuf,
    /// Success or error payload. Because of `#[serde(flatten)]` its fields are
    /// serialized at the same level as `path` instead of under a `detail` key.
    #[serde(flatten)]
    pub detail: GetDetail,
}
103
/// Payload of a `GetResult`: either the get succeeded or an error was recorded.
///
/// `#[serde(untagged)]` means no variant name is written when serializing. When
/// deserializing, serde tries the variants in declaration order and keeps the
/// first one that matches, so the order of the variants below matters.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum GetDetail {
    /// The get succeeded. `outcome` is what `get_file` reported and `size` is
    /// the size recorded in the file's metadata.
    Success { outcome: Outcome, size: u64 },
    /// The get failed. `error` is the message text for the failure.
    Error { error: String },
}
110
111/// Gets files matching a glob pattern from DVS storage.
112///
113/// The pattern is matched against tracked files (paths in metadata folder).
114/// The pattern is adjusted based on cwd relative to repo root.
115pub fn get_files(
116    files: Vec<PathBuf>,
117    paths: &DvsPaths,
118    backend: &dyn Backend,
119    dry_run: bool,
120    on_file_start: Option<&OnFileStart>,
121) -> Result<Vec<GetResult>> {
122    let matched_paths = paths.validate_for_get(&files);
123    if matched_paths.is_empty() {
124        return Ok(Vec::new());
125    }
126    let pool = get_threadpool(matched_paths.len())?;
127    let cache = try_open_cache(paths);
128
129    let mut results: Vec<GetResult> = pool.install(|| {
130        matched_paths
131            .into_par_iter()
132            .map(|(relative_path, validation)| {
133                match validation {
134                    GetPathStatus::NotFound => {
135                        return GetResult {
136                            path: relative_path,
137                            detail: GetDetail::Error {
138                                error: "file not found".to_string(),
139                            },
140                        };
141                    }
142                    GetPathStatus::NotTracked => {
143                        return GetResult {
144                            path: relative_path,
145                            detail: GetDetail::Error {
146                                error: "not tracked by DVS".to_string(),
147                            },
148                        };
149                    }
150                    GetPathStatus::Tracked => {}
151                }
152                let file_size = {
153                    let meta_path = paths.metadata_path(&relative_path);
154                    std::fs::File::open(&meta_path)
155                        .ok()
156                        .and_then(|f| serde_json::from_reader::<_, FileMetadata>(f).ok())
157                        .map(|m| m.size)
158                        .unwrap_or(0)
159                };
160                let file_progress = on_file_start.map(|f| f(&relative_path, file_size));
161                let on_bytes = file_progress.as_ref().map(|fp| &*fp.on_bytes);
162
163                match get_file(
164                    backend,
165                    paths,
166                    &relative_path,
167                    cache.as_ref(),
168                    dry_run,
169                    on_bytes,
170                ) {
171                    Ok((outcome, size)) => {
172                        log::info!(
173                            "Successfully retrieved {} ({:?})",
174                            relative_path.display(),
175                            outcome
176                        );
177                        GetResult {
178                            path: relative_path,
179                            detail: GetDetail::Success { outcome, size },
180                        }
181                    }
182                    Err(e) => {
183                        log::warn!("Failed to get {}: {e}", relative_path.display());
184                        GetResult {
185                            path: relative_path,
186                            detail: GetDetail::Error {
187                                error: e.to_string(),
188                            },
189                        }
190                    }
191                }
192            })
193            .collect()
194    });
195    results.sort_by(|a, b| a.path.cmp(&b.path));
196
197    Ok(results)
198}
199
// Tests for `get_file` and `get_files`, run against temporary repositories
// created by the `testutil` helpers.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::add_files;
    use crate::files::add::AddDetail;
    use crate::files::status::get_status;
    use crate::testutil::{create_file, create_temp_git_repo, init_dvs_repo};
    use uuid::Uuid;

    /// Builds `DvsPaths` for a test repo, passing `root` for both leading
    /// arguments of `DvsPaths::new`.
    // NOTE(review): presumably these are the repo root and the current working
    // directory; confirm against `DvsPaths::new`.
    fn make_paths(root: &Path, config: &crate::config::Config) -> DvsPaths {
        DvsPaths::new(
            root.to_path_buf(),
            root.to_path_buf(),
            config.metadata_folder_name(),
        )
        .unwrap()
    }

    /// Opens a hash cache at `dvs.db` inside the repo's cache folder and wraps
    /// it in a `Mutex`, the form `get_file` accepts. Panics if opening fails.
    fn make_cache(paths: &DvsPaths) -> Mutex<cache::HashCache> {
        Mutex::new(cache::HashCache::open(&paths.cache_folder().join("dvs.db")).unwrap())
    }

    /// A stored file whose local copy was deleted is restored with its
    /// original content and reported as `Outcome::Copied`.
    #[test]
    fn get_file_retrieves_from_storage() {
        let (_tmp, root) = create_temp_git_repo();
        let (config, _dvs_dir) = init_dvs_repo(&root);
        let backend = config.backend();
        let paths = make_paths(&root, &config);
        let file_path = create_file(&root, "retrieve.txt", b"stored content");

        // Track the file by saving its metadata directly.
        let metadata = FileMetadata::from_file(&file_path, Compression::Zstd, None).unwrap();
        metadata
            .save(
                Uuid::new_v4(),
                &file_path,
                backend,
                &paths,
                "retrieve.txt",
                None,
            )
            .unwrap();

        // Delete the original file
        fs::remove_file(&file_path).unwrap();
        assert!(!file_path.exists());

        // Retrieve it
        let cache = make_cache(&paths);
        let (outcome, _size) =
            get_file(backend, &paths, "retrieve.txt", Some(&cache), false, None).unwrap();
        assert_eq!(outcome, Outcome::Copied);
        assert!(file_path.exists());
        assert_eq!(fs::read(&file_path).unwrap(), b"stored content");
    }

    /// A tracked file that is still on disk and unchanged is reported as
    /// `Outcome::Present`.
    #[test]
    fn get_file_returns_present_when_already_current() {
        let (_tmp, root) = create_temp_git_repo();
        let (config, _dvs_dir) = init_dvs_repo(&root);
        let backend = config.backend();
        let paths = make_paths(&root, &config);
        let file_path = create_file(&root, "present.txt", b"content");

        let metadata = FileMetadata::from_file(&file_path, Compression::Zstd, None).unwrap();
        metadata
            .save(
                Uuid::new_v4(),
                &file_path,
                backend,
                &paths,
                "present.txt",
                None,
            )
            .unwrap();

        // File still exists and matches - should return Present
        let cache = make_cache(&paths);
        let (outcome, _size) =
            get_file(backend, &paths, "present.txt", Some(&cache), false, None).unwrap();
        assert_eq!(outcome, Outcome::Present);
    }

    /// A path without metadata makes `get_file` fail with a "not tracked"
    /// message.
    #[test]
    fn get_file_fails_for_untracked_file() {
        let (_tmp, root) = create_temp_git_repo();
        let (config, _dvs_dir) = init_dvs_repo(&root);
        let backend = config.backend();
        let paths = make_paths(&root, &config);

        let cache = make_cache(&paths);
        let result = get_file(backend, &paths, "untracked.txt", Some(&cache), false, None);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not tracked"));
    }

    /// Requesting a path that does not exist yields a single per-file error
    /// result containing "not found" instead of failing the whole call.
    #[test]
    fn get_files_reports_not_found_per_file() {
        let (_tmp, root) = create_temp_git_repo();
        let (config, _dvs_dir) = init_dvs_repo(&root);
        let backend = config.backend();
        let paths = make_paths(&root, &config);

        // Track an unrelated file so the repo is not empty.
        create_file(&root, "a.txt", b"a");
        add_files(
            vec!["a.txt".into()],
            &paths,
            backend,
            None,
            Compression::Zstd,
            false,
            None,
        )
        .unwrap();

        let results =
            get_files(vec!["nonexistent.csv".into()], &paths, backend, false, None).unwrap();
        assert_eq!(results.len(), 1);
        assert!(
            matches!(&results[0].detail, GetDetail::Error { error } if error.contains("not found"))
        );
    }

    /// Requesting a file that exists on disk but was never added yields a
    /// single per-file error result containing "not tracked".
    #[test]
    fn get_files_reports_not_tracked_for_untracked_file() {
        let (_tmp, root) = create_temp_git_repo();
        let (config, _dvs_dir) = init_dvs_repo(&root);
        let backend = config.backend();
        let paths = make_paths(&root, &config);

        // Create a file on disk but don't dvs add it
        create_file(&root, "untracked.txt", b"hello");

        let results =
            get_files(vec!["untracked.txt".into()], &paths, backend, false, None).unwrap();
        assert_eq!(results.len(), 1);
        assert!(
            matches!(&results[0].detail, GetDetail::Error { error } if error.contains("not tracked"))
        );
    }

    /// Shared add -> delete -> get scenario.
    ///
    /// Creates `a.txt`, `b.txt` and `c.csv`, adds `file_paths`, checks that
    /// exactly `expected_files` are tracked, deletes them, gets `file_paths`
    /// back and checks that every expected file exists again.
    fn run_add_get_roundtrip(file_paths: Vec<PathBuf>, expected_files: &[&str]) {
        let (_tmp, root) = create_temp_git_repo();
        let (config, _dvs_dir) = init_dvs_repo(&root);
        let backend = config.backend();
        let paths = make_paths(&root, &config);

        create_file(&root, "a.txt", b"a");
        create_file(&root, "b.txt", b"b");
        create_file(&root, "c.csv", b"c");

        // Add files
        let results = add_files(
            file_paths.clone(),
            &paths,
            backend,
            None,
            Compression::Zstd,
            false,
            None,
        )
        .unwrap();
        assert_eq!(results.len(), expected_files.len());
        for result in &results {
            assert!(matches!(
                result.detail,
                AddDetail::Success {
                    outcome: Outcome::Copied,
                    ..
                }
            ));
        }

        // Verify correct files are tracked
        let statuses = get_status(&paths, None).unwrap();
        assert_eq!(statuses.len(), expected_files.len());
        let tracked_names: Vec<_> = statuses.iter().map(|s| s.path.to_str().unwrap()).collect();
        for expected in expected_files {
            assert!(
                tracked_names.contains(expected),
                "Expected {expected} to be tracked"
            );
        }

        // Delete tracked files
        for name in expected_files {
            fs::remove_file(root.join(name)).unwrap();
        }

        // Get files back
        let results = get_files(file_paths, &paths, backend, false, None).unwrap();
        assert_eq!(results.len(), expected_files.len());
        for result in &results {
            assert!(matches!(
                result.detail,
                GetDetail::Success {
                    outcome: Outcome::Copied,
                    size: _,
                }
            ));
        }

        // Verify files restored
        for name in expected_files {
            assert!(root.join(name).exists(), "Expected {name} to be restored");
        }
    }

    /// Round trip with two explicitly named files; `b.txt` is created by the
    /// helper but not requested, so it must not be tracked.
    #[test]
    fn add_get_roundtrip_with_explicit_paths() {
        let paths: Vec<PathBuf> = vec!["a.txt".into(), "c.csv".into()];
        run_add_get_roundtrip(paths, &["a.txt", "c.csv"]);
    }

    /// Overwriting the stored object with different bytes makes `get_file`
    /// return an error instead of restoring the file.
    #[test]
    fn get_file_errors_on_corrupted_storage() {
        let (_tmp, root) = create_temp_git_repo();
        let (config, _dvs_dir) = init_dvs_repo(&root);
        let backend = config.backend();
        let paths = make_paths(&root, &config);

        // Add a file
        let file_path = create_file(&root, "data.txt", b"original content");
        let metadata = FileMetadata::from_file(&file_path, Compression::Zstd, None).unwrap();
        metadata
            .save(
                Uuid::new_v4(),
                &file_path,
                backend,
                &paths,
                "data.txt",
                None,
            )
            .unwrap();

        // Delete the local file
        fs::remove_file(&file_path).unwrap();

        // Corrupt the storage file (must remove read-only first)
        // NOTE(review): assumes the test backend keeps objects under
        // `<root>/.storage/<first two blake3 chars>/<remaining chars>`;
        // confirm against `init_dvs_repo` and the backend implementation.
        let storage_path = root
            .join(".storage")
            .join(&metadata.hashes.blake3[..2])
            .join(&metadata.hashes.blake3[2..]);
        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            let perms = std::fs::Permissions::from_mode(0o644);
            fs::set_permissions(&storage_path, perms).unwrap();
        }
        #[cfg(not(unix))]
        {
            let mut perms = fs::metadata(&storage_path).unwrap().permissions();
            perms.set_readonly(false);
            fs::set_permissions(&storage_path, perms).unwrap();
        }
        fs::write(&storage_path, b"corrupted content").unwrap();

        // get_file should error on decompression or hash mismatch
        let cache = make_cache(&paths);
        let result = get_file(backend, &paths, "data.txt", Some(&cache), false, None);
        assert!(result.is_err());
    }
}