1use std::path::{Path, PathBuf};
2use std::sync::Mutex;
3
4use anyhow::Result;
5use rayon::prelude::*;
6use serde::{Deserialize, Serialize};
7use uuid::Uuid;
8
9use crate::backends::Backend;
10use crate::cache::{HashCache, try_open_cache};
11use crate::config::Compression;
12use crate::files::metadata::FileMetadata;
13use crate::gitignore::add_to_gitignore;
14use crate::paths::{AddPathStatus, DvsPaths};
15use crate::progress::OnFileStart;
16use crate::utils::get_threadpool;
17use crate::{Outcome, cache};
18
/// Per-path result of an add operation, as returned by [`add_files`].
///
/// Pairs the path that was processed with either the details of a successful
/// add or the reason it failed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AddResult {
    /// Path this result refers to; `add_files` fills it with the path yielded
    /// by validation for the corresponding entry.
    pub path: PathBuf,
    /// Success or error details. `#[serde(flatten)]` inlines these fields next
    /// to `path` instead of nesting them under a `detail` key.
    #[serde(flatten)]
    pub detail: AddDetail,
}
26
/// What happened to a single path: it was processed, or it failed.
///
/// `#[serde(untagged)]` means no variant name is written when serializing; the
/// two variants are told apart purely by which fields are present.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum AddDetail {
    /// The path was processed without error.
    Success {
        /// Outcome reported for the file (for a dry run, the predicted one).
        outcome: Outcome,
        /// Value of the computed metadata's `blake3` hash field.
        hash: String,
        /// Size recorded in the computed metadata (presumably bytes).
        size: u64,
        /// Stored size reported by the save step; `add_files` falls back to
        /// `size` when no stored size is reported (e.g. on a dry run).
        stored_size: u64,
    },
    /// The path could not be added.
    Error {
        /// Human-readable description of the failure.
        error: String,
    },
}
40
41#[allow(clippy::too_many_arguments)]
42fn add_file(
43 relative_path: &Path,
44 paths: &DvsPaths,
45 backend: &dyn Backend,
46 cache: Option<&Mutex<HashCache>>,
47 operation_id: Uuid,
48 message: Option<String>,
49 compression: Compression,
50 dry_run: bool,
51 on_bytes: Option<&(dyn Fn(u64) + Send + Sync)>,
52) -> Result<(Outcome, FileMetadata, Option<u64>)> {
53 let full_path = paths.file_path(relative_path);
54 let rel_str = relative_path.to_string_lossy();
55 let (hashes, size) = cache::hashes_for_file(&full_path, &rel_str, cache)?;
56 let metadata = FileMetadata::from_hashes(hashes, size, compression, message);
57 if dry_run {
58 let dvs_file_path = paths.metadata_path(relative_path);
59 let dvs_file_exists = dvs_file_path.is_file();
60 let storage_exists = backend.exists(&metadata.hashes)?;
61 let outcome = if dvs_file_exists && storage_exists {
62 let existing: FileMetadata =
63 serde_json::from_reader(fs_err::File::open(&dvs_file_path)?)?;
64 if existing == metadata {
65 Outcome::Present
66 } else {
67 Outcome::Copied
68 }
69 } else {
70 Outcome::Copied
71 };
72 Ok((outcome, metadata, None))
73 } else {
74 let (outcome, stored_size) = metadata.save(
75 operation_id,
76 &full_path,
77 backend,
78 paths,
79 relative_path,
80 on_bytes,
81 )?;
82 Ok((outcome, metadata, stored_size))
83 }
84}
85
86pub fn add_files(
91 files: Vec<PathBuf>,
92 paths: &DvsPaths,
93 backend: &dyn Backend,
94 message: Option<String>,
95 compression: Compression,
96 dry_run: bool,
97 on_file_start: Option<&OnFileStart>,
98) -> Result<Vec<AddResult>> {
99 let matched_paths = paths.validate_for_add(&files);
100 if matched_paths.is_empty() {
101 return Ok(Vec::new());
102 }
103 let pool = get_threadpool(matched_paths.len())?;
104 let cache = try_open_cache(paths);
105 let operation_id = Uuid::new_v4();
106
107 let mut results: Vec<AddResult> = pool.install(|| {
108 matched_paths
109 .into_par_iter()
110 .map(|(relative_path, status)| {
111 match status {
112 AddPathStatus::NotFound => {
113 return AddResult {
114 path: relative_path,
115 detail: AddDetail::Error {
116 error: "file not found".to_string(),
117 },
118 };
119 }
120 AddPathStatus::OutsideProject => {
121 return AddResult {
122 path: relative_path,
123 detail: AddDetail::Error {
124 error: "path is outside project".to_string(),
125 },
126 };
127 }
128 AddPathStatus::IsDirectory => {
129 return AddResult {
130 path: relative_path,
131 detail: AddDetail::Error {
132 error: "path is a directory".to_string(),
133 },
134 };
135 }
136 AddPathStatus::Valid => {}
137 }
138
139 let full_path = paths.file_path(&relative_path);
140 match full_path.canonicalize() {
141 Ok(canonical) if !canonical.starts_with(paths.repo_root()) => {
142 return AddResult {
143 path: relative_path,
144 detail: AddDetail::Error {
145 error: "path is outside the dvs repository".to_string(),
146 },
147 };
148 }
149 Err(e) => {
150 return AddResult {
151 path: relative_path,
152 detail: AddDetail::Error {
153 error: format!("failed to resolve path: {e}"),
154 },
155 };
156 }
157 _ => {} }
159 let file_size = std::fs::metadata(&full_path).map(|m| m.len()).unwrap_or(0);
160 let file_progress = on_file_start.map(|f| f(&relative_path, file_size));
161 let on_bytes = file_progress.as_ref().map(|fp| &*fp.on_bytes);
162 match add_file(
163 &relative_path,
164 paths,
165 backend,
166 cache.as_ref(),
167 operation_id,
168 message.clone(),
169 compression,
170 dry_run,
171 on_bytes,
172 ) {
173 Ok((outcome, metadata, stored_size)) => {
174 log::info!(
175 "Successfully added {} ({:?})",
176 relative_path.display(),
177 outcome
178 );
179 let stored_size = stored_size.unwrap_or(metadata.size);
180 AddResult {
181 path: relative_path,
182 detail: AddDetail::Success {
183 outcome,
184 hash: metadata.hashes.blake3,
185 size: metadata.size,
186 stored_size,
187 },
188 }
189 }
190 Err(e) => {
191 log::warn!("Failed to add {}: {e}", relative_path.display());
192 AddResult {
193 path: relative_path,
194 detail: AddDetail::Error {
195 error: e.to_string(),
196 },
197 }
198 }
199 }
200 })
201 .collect()
202 });
203 results.sort_by(|a, b| a.path.cmp(&b.path));
204
205 let successful_paths: Vec<_> = results
206 .iter()
207 .filter(|r| matches!(r.detail, AddDetail::Success { .. }))
208 .map(|r| r.path.clone())
209 .collect();
210 if !dry_run && !successful_paths.is_empty() {
211 if let Err(e) = add_to_gitignore(paths.repo_root(), &successful_paths) {
212 log::warn!("Failed to update .gitignore: {e}");
213 }
214 }
215
216 Ok(results)
217}
218
#[cfg(test)]
mod tests {
    use super::*;
    use crate::testutil::{create_file, create_temp_git_repo, init_dvs_repo};
    use std::fs;
    use std::path::Path;

    /// Builds `DvsPaths` for a test repository, passing `root` for both path
    /// arguments and taking the metadata folder name from `config`.
    fn make_paths(root: &Path, config: &crate::config::Config) -> DvsPaths {
        let metadata_folder = config.metadata_folder_name();
        DvsPaths::new(root.to_path_buf(), root.to_path_buf(), metadata_folder).unwrap()
    }

    /// True when `result` is an error whose message contains `needle`.
    fn has_error(result: &AddResult, needle: &str) -> bool {
        match &result.detail {
            AddDetail::Error { error } => error.contains(needle),
            AddDetail::Success { .. } => false,
        }
    }

    /// Returns the result recorded for `name`, panicking if there is none.
    fn result_for<'a>(results: &'a [AddResult], name: &str) -> &'a AddResult {
        results
            .iter()
            .find(|r| r.path.as_path() == Path::new(name))
            .unwrap()
    }

    #[test]
    fn add_files_reports_not_found_per_file() {
        let (_tmp, root) = create_temp_git_repo();
        let (config, _dvs_dir) = init_dvs_repo(&root);
        let backend = config.backend();
        let paths = make_paths(&root, &config);

        create_file(&root, "a.txt", b"a");

        let requested = vec![PathBuf::from("nonexistent.csv")];
        let results = add_files(
            requested,
            &paths,
            backend,
            None,
            Compression::Zstd,
            false,
            None,
        )
        .unwrap();

        assert_eq!(results.len(), 1);
        assert!(has_error(&results[0], "not found"));
    }

    #[test]
    fn add_files_mixed_statuses() {
        let (_tmp, root) = create_temp_git_repo();
        let (config, _dvs_dir) = init_dvs_repo(&root);
        let backend = config.backend();
        let paths = make_paths(&root, &config);

        // A regular file that should be added successfully.
        create_file(&root, "a.txt", b"a");

        // A file in a separate temp dir, addressed relative to the repo root.
        let outside_tmp = tempfile::tempdir().unwrap();
        let outside_dir = fs::canonicalize(outside_tmp.path()).unwrap();
        let outside_file = outside_dir.join("outside.txt");
        fs::write(&outside_file, b"outside").unwrap();
        let shared_parent = root.parent().unwrap();
        let outside_relative =
            PathBuf::from("..").join(outside_file.strip_prefix(shared_parent).unwrap());

        // A directory, which cannot be added as a file.
        fs::create_dir(root.join("subdir")).unwrap();

        let requested: Vec<PathBuf> = vec![
            "a.txt".into(),
            "missing.csv".into(),
            outside_relative,
            "subdir".into(),
        ];
        let results = add_files(
            requested,
            &paths,
            backend,
            None,
            Compression::Zstd,
            false,
            None,
        )
        .unwrap();
        assert_eq!(results.len(), 4);

        match &result_for(&results, "a.txt").detail {
            AddDetail::Success {
                outcome: Outcome::Copied,
                hash,
                size,
                ..
            } => {
                assert!(!hash.is_empty());
                assert!(*size > 0);
            }
            other => panic!("unexpected detail for a.txt: {other:?}"),
        }

        assert!(has_error(result_for(&results, "missing.csv"), "not found"));

        assert!(results.iter().any(|r| has_error(r, "outside project")));

        assert!(has_error(result_for(&results, "subdir"), "directory"));
    }
}