xref: /wasmtime-44.0.1/crates/cache/src/worker.rs (revision 61a371ac)
1 //! Background worker that watches over the cache.
2 //!
3 //! It cleans up old cache, updates statistics and optimizes the cache.
4 //! We allow losing some messages (it doesn't hurt) and some races,
5 //! but we guarantee eventual consistency and fault tolerance.
6 //! Background tasks can be CPU intensive, but the worker thread has low priority.
7 
8 #![cfg_attr(
9     not(test),
10     expect(
11         clippy::useless_conversion,
12         reason = "cfg(test) and cfg(not(test)) have a different definition \
13                   of `SystemTime`, so conversions below are needed in \
14                   one mode but not the other, just ignore the lint in this \
15                   module in not(test) mode where the conversion isn't required",
16     )
17 )]
18 
19 use super::{CacheConfig, fs_write_atomic};
20 use log::{debug, info, trace, warn};
21 use serde_derive::{Deserialize, Serialize};
22 use std::cmp;
23 use std::collections::HashMap;
24 use std::ffi::OsStr;
25 use std::fmt;
26 use std::fs;
27 use std::path::{Path, PathBuf};
28 use std::sync::mpsc::{Receiver, SyncSender, sync_channel};
29 #[cfg(test)]
30 use std::sync::{Arc, Condvar, Mutex};
31 use std::thread;
32 use std::time::Duration;
33 #[cfg(not(test))]
34 use std::time::SystemTime;
35 #[cfg(test)]
36 use tests::system_time_stub::SystemTimeStub as SystemTime;
37 
/// Handle used to send cache events to the background worker thread.
///
/// Cloning the handle clones the channel sender; in test builds the clone
/// shares the same statistics through the `Arc`.
38 #[derive(Clone)]
39 pub(super) struct Worker {
    // Sending half of the bounded channel created in `Worker::start_new`.
40     sender: SyncSender<CacheEvent>,
    // Test-only event counters plus the condvar used to wait on them.
41     #[cfg(test)]
42     stats: Arc<(Mutex<WorkerStats>, Condvar)>,
43 }
44 
/// State moved into the background worker thread by `Worker::start_new`.
45 struct WorkerThread {
    // Receiving half of the event channel; `run` iterates over it.
46     receiver: Receiver<CacheEvent>,
    // Clone of the cache configuration taken when the worker was started.
47     cache_config: CacheConfig,
    // Test-only statistics shared with the `Worker` handle.
48     #[cfg(test)]
49     stats: Arc<(Mutex<WorkerStats>, Condvar)>,
50 }
51 
/// Test-only event counters shared between `Worker` and `WorkerThread`.
52 #[cfg(test)]
53 #[derive(Default)]
54 struct WorkerStats {
    // Events that `try_send` failed to enqueue.
55     dropped: u32,
    // Events that were enqueued successfully.
56     sent: u32,
    // Events the worker thread has finished handling.
57     handled: u32,
58 }
59 
/// Message sent from a `Worker` handle to the worker thread.
///
/// Each variant carries the path given to the matching `on_cache_*_async` call.
60 #[derive(Debug, Clone)]
61 enum CacheEvent {
    // Dispatched to `WorkerThread::handle_on_cache_get`.
62     OnCacheGet(PathBuf),
    // Dispatched to `WorkerThread::handle_on_cache_update`.
63     OnCacheUpdate(PathBuf),
64 }
65 
66 impl Worker {
start_new(cache_config: &CacheConfig) -> Self67     pub(super) fn start_new(cache_config: &CacheConfig) -> Self {
68         let queue_size = match cache_config.worker_event_queue_size() {
69             num if num <= usize::max_value() as u64 => num as usize,
70             _ => usize::max_value(),
71         };
72         let (tx, rx) = sync_channel(queue_size);
73 
74         #[cfg(test)]
75         let stats = Arc::new((Mutex::new(WorkerStats::default()), Condvar::new()));
76 
77         let worker_thread = WorkerThread {
78             receiver: rx,
79             cache_config: cache_config.clone(),
80             #[cfg(test)]
81             stats: stats.clone(),
82         };
83 
84         // when self is dropped, sender will be dropped, what will cause the channel
85         // to hang, and the worker thread to exit -- it happens in the tests
86         // non-tests binary has only a static worker, so Rust doesn't drop it
87         thread::spawn(move || worker_thread.run());
88 
89         Self {
90             sender: tx,
91             #[cfg(test)]
92             stats,
93         }
94     }
95 
on_cache_get_async(&self, path: impl AsRef<Path>)96     pub(super) fn on_cache_get_async(&self, path: impl AsRef<Path>) {
97         let event = CacheEvent::OnCacheGet(path.as_ref().to_path_buf());
98         self.send_cache_event(event);
99     }
100 
on_cache_update_async(&self, path: impl AsRef<Path>)101     pub(super) fn on_cache_update_async(&self, path: impl AsRef<Path>) {
102         let event = CacheEvent::OnCacheUpdate(path.as_ref().to_path_buf());
103         self.send_cache_event(event);
104     }
105 
106     #[inline]
send_cache_event(&self, event: CacheEvent)107     fn send_cache_event(&self, event: CacheEvent) {
108         let sent_event = self.sender.try_send(event.clone());
109 
110         if let Err(ref err) = sent_event {
111             info!(
112                 "Failed to send asynchronously message to worker thread, \
113                  event: {event:?}, error: {err}"
114             );
115         }
116 
117         #[cfg(test)]
118         {
119             let mut stats = self
120                 .stats
121                 .0
122                 .lock()
123                 .expect("Failed to acquire worker stats lock");
124 
125             if sent_event.is_ok() {
126                 stats.sent += 1;
127             } else {
128                 stats.dropped += 1;
129             }
130         }
131     }
132 
133     #[cfg(test)]
events_dropped(&self) -> u32134     pub(super) fn events_dropped(&self) -> u32 {
135         let stats = self
136             .stats
137             .0
138             .lock()
139             .expect("Failed to acquire worker stats lock");
140         stats.dropped
141     }
142 
143     #[cfg(test)]
wait_for_all_events_handled(&self)144     pub(super) fn wait_for_all_events_handled(&self) {
145         let (stats, condvar) = &*self.stats;
146         let mut stats = stats.lock().expect("Failed to acquire worker stats lock");
147         while stats.handled != stats.sent {
148             stats = condvar
149                 .wait(stats)
150                 .expect("Failed to reacquire worker stats lock");
151         }
152     }
153 }
154 
155 impl fmt::Debug for Worker {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result156     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
157         f.debug_struct("Worker").finish()
158     }
159 }
160 
/// Contents of a cache entry's `.stats` file, (de)serialized as TOML by
/// `read_stats_file` / `write_stats_file`.
161 #[derive(Serialize, Deserialize)]
162 struct ModuleCacheStatistics {
    // Usage counter; incremented by one for every handled cache-get event.
163     pub usages: u64,
    // Compression level recorded for the cache file; serialized under the
    // key `optimized-compression`.
164     #[serde(rename = "optimized-compression")]
165     pub compression_level: i32,
166 }
167 
168 impl ModuleCacheStatistics {
default(cache_config: &CacheConfig) -> Self169     fn default(cache_config: &CacheConfig) -> Self {
170         Self {
171             usages: 0,
172             compression_level: cache_config.baseline_compression_level(),
173         }
174     }
175 }
176 
/// One item found by `list_cache_contents` while walking the cache directory.
177 enum CacheEntry {
    // A module cache file. `mtime` comes from the entry's stats file when one
    // exists and is readable (otherwise from the module file itself); `size`
    // is the module file's length.
178     Recognized {
179         path: PathBuf,
180         mtime: SystemTime,
181         size: u64,
182     },
    // Anything else found in the cache directory; the cleanup task removes
    // these (`is_dir` selects between file and directory removal).
183     Unrecognized {
184         path: PathBuf,
185         is_dir: bool,
186     },
187 }
188 
// Unwraps `$result`. On `Err` it logs a warning built from `$err_msg`, the
// displayed `$path` and the error, then executes `$cont` -- typically
// `return`, `continue`, or a statement that yields a fallback value.
189 macro_rules! unwrap_or_warn {
190     ($result:expr, $cont:stmt, $err_msg:expr, $path:expr) => {
191         match $result {
192             Ok(val) => val,
193             Err(err) => {
194                 warn!("{}, path: {}, msg: {}", $err_msg, $path.display(), err);
195                 $cont
196             }
197         }
198     };
199 }
200 
201 impl WorkerThread {
run(self)202     fn run(self) {
203         debug!("Cache worker thread started.");
204 
205         Self::lower_thread_priority();
206 
207         #[cfg(test)]
208         let (stats, condvar) = &*self.stats;
209 
210         for event in self.receiver.iter() {
211             match event {
212                 CacheEvent::OnCacheGet(path) => self.handle_on_cache_get(path),
213                 CacheEvent::OnCacheUpdate(path) => self.handle_on_cache_update(path),
214             }
215 
216             #[cfg(test)]
217             {
218                 let mut stats = stats.lock().expect("Failed to acquire worker stats lock");
219                 stats.handled += 1;
220                 condvar.notify_all();
221             }
222         }
223     }
224 
    /// Fuchsia variant: leaves the thread priority untouched and only logs a
    /// warning saying so.
225     #[cfg(target_os = "fuchsia")]
lower_thread_priority()226     fn lower_thread_priority() {
227         // TODO This needs to use Fuchsia thread profiles
228         // https://fuchsia.dev/fuchsia-src/reference/kernel_objects/profile
229         warn!(
230             "Lowering thread priority on Fuchsia is currently a noop. It might affect application performance."
231         );
232     }
233 
234     #[cfg(target_os = "windows")]
lower_thread_priority()235     fn lower_thread_priority() {
236         use windows_sys::Win32::System::Threading::*;
237 
238         // https://docs.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-setthreadpriority
239         // https://docs.microsoft.com/en-us/windows/win32/procthread/scheduling-priorities
240 
241         if unsafe { SetThreadPriority(GetCurrentThread(), THREAD_MODE_BACKGROUND_BEGIN) } == 0 {
242             warn!(
243                 "Failed to lower worker thread priority. It might affect application performance."
244             );
245         }
246     }
247 
248     #[cfg(not(any(target_os = "windows", target_os = "fuchsia")))]
lower_thread_priority()249     fn lower_thread_priority() {
250         // http://man7.org/linux/man-pages/man7/sched.7.html
251 
252         const NICE_DELTA_FOR_BACKGROUND_TASKS: i32 = 3;
253 
254         match rustix::process::nice(NICE_DELTA_FOR_BACKGROUND_TASKS) {
255             Ok(current_nice) => {
256                 debug!("New nice value of worker thread: {current_nice}");
257             }
258             Err(err) => {
259                 warn!(
260                     "Failed to lower worker thread priority ({err:?}). It might affect application performance."
261                 );
262             }
263         };
264     }
265 
266     /// Increases the usage counter and recompresses the file
267     /// if the usage counter reached configurable threshold.
handle_on_cache_get(&self, path: PathBuf)268     fn handle_on_cache_get(&self, path: PathBuf) {
269         trace!("handle_on_cache_get() for path: {}", path.display());
270 
271         // construct .stats file path
272         let filename = path.file_name().unwrap().to_str().unwrap();
273         let stats_path = path.with_file_name(format!("{filename}.stats"));
274 
275         // load .stats file (default if none or error)
276         let mut stats = read_stats_file(stats_path.as_ref())
277             .unwrap_or_else(|| ModuleCacheStatistics::default(&self.cache_config));
278 
279         // step 1: update the usage counter & write to the disk
280         //         it's racy, but it's fine (the counter will be just smaller,
281         //         sometimes will retrigger recompression)
282         stats.usages += 1;
283         if !write_stats_file(stats_path.as_ref(), &stats) {
284             return;
285         }
286 
287         // step 2: recompress if there's a need
288         let opt_compr_lvl = self.cache_config.optimized_compression_level();
289         if stats.compression_level >= opt_compr_lvl
290             || stats.usages
291                 < self
292                     .cache_config
293                     .optimized_compression_usage_counter_threshold()
294         {
295             return;
296         }
297 
298         let lock_path = if let Some(p) = acquire_task_fs_lock(
299             path.as_ref(),
300             self.cache_config.optimizing_compression_task_timeout(),
301             self.cache_config
302                 .allowed_clock_drift_for_files_from_future(),
303         ) {
304             p
305         } else {
306             return;
307         };
308 
309         trace!("Trying to recompress file: {}", path.display());
310 
311         // recompress, write to other file, rename (it's atomic file content exchange)
312         // and update the stats file
313         let compressed_cache_bytes = unwrap_or_warn!(
314             fs::read(&path),
315             return,
316             "Failed to read old cache file",
317             path
318         );
319 
320         let cache_bytes = unwrap_or_warn!(
321             zstd::decode_all(&compressed_cache_bytes[..]),
322             return,
323             "Failed to decompress cached code",
324             path
325         );
326 
327         let recompressed_cache_bytes = unwrap_or_warn!(
328             zstd::encode_all(&cache_bytes[..], opt_compr_lvl),
329             return,
330             "Failed to compress cached code",
331             path
332         );
333 
334         unwrap_or_warn!(
335             fs::write(&lock_path, &recompressed_cache_bytes),
336             return,
337             "Failed to write recompressed cache",
338             lock_path
339         );
340 
341         unwrap_or_warn!(
342             fs::rename(&lock_path, &path),
343             {
344                 if let Err(error) = fs::remove_file(&lock_path) {
345                     warn!(
346                         "Failed to clean up (remove) recompressed cache, path {}, err: {}",
347                         lock_path.display(),
348                         error
349                     );
350                 }
351 
352                 return;
353             },
354             "Failed to rename recompressed cache",
355             lock_path
356         );
357 
358         // update stats file (reload it! recompression can take some time)
359         if let Some(mut new_stats) = read_stats_file(stats_path.as_ref()) {
360             if new_stats.compression_level >= opt_compr_lvl {
361                 // Rare race:
362                 //    two instances with different opt_compr_lvl: we don't know in which order they updated
363                 //    the cache file and the stats file (they are not updated together atomically)
364                 // Possible solution is to use directories per cache entry, but it complicates the system
365                 // and is not worth it.
366                 debug!(
367                     "DETECTED task did more than once (or race with new file): \
368                      recompression of {}. Note: if optimized compression level setting \
369                      has changed in the meantine, the stats file might contain \
370                      inconsistent compression level due to race.",
371                     path.display()
372                 );
373             } else {
374                 new_stats.compression_level = opt_compr_lvl;
375                 let _ = write_stats_file(stats_path.as_ref(), &new_stats);
376             }
377 
378             if new_stats.usages < stats.usages {
379                 debug!(
380                     "DETECTED lower usage count (new file or race with counter \
381                      increasing): file {}",
382                     path.display()
383                 );
384             }
385         } else {
386             debug!(
387                 "Can't read stats file again to update compression level (it might got \
388                  cleaned up): file {}",
389                 stats_path.display()
390             );
391         }
392 
393         trace!("Task finished: recompress file: {}", path.display());
394     }
395 
directory(&self) -> &PathBuf396     fn directory(&self) -> &PathBuf {
397         self.cache_config
398             .directory()
399             .expect("CacheConfig should be validated before being passed to a WorkerThread")
400     }
401 
handle_on_cache_update(&self, path: PathBuf)402     fn handle_on_cache_update(&self, path: PathBuf) {
403         trace!("handle_on_cache_update() for path: {}", path.display());
404 
405         // ---------------------- step 1: create .stats file
406 
407         // construct .stats file path
408         let filename = path
409             .file_name()
410             .expect("Expected valid cache file name")
411             .to_str()
412             .expect("Expected valid cache file name");
413         let stats_path = path.with_file_name(format!("{filename}.stats"));
414 
415         // create and write stats file
416         let mut stats = ModuleCacheStatistics::default(&self.cache_config);
417         stats.usages += 1;
418         write_stats_file(&stats_path, &stats);
419 
420         // ---------------------- step 2: perform cleanup task if needed
421 
422         // acquire lock for cleanup task
423         // Lock is a proof of recent cleanup task, so we don't want to delete them.
424         // Expired locks will be deleted by the cleanup task.
425         let cleanup_file = self.directory().join(".cleanup"); // some non existing marker file
426         if acquire_task_fs_lock(
427             &cleanup_file,
428             self.cache_config.cleanup_interval(),
429             self.cache_config
430                 .allowed_clock_drift_for_files_from_future(),
431         )
432         .is_none()
433         {
434             return;
435         }
436 
437         trace!("Trying to clean up cache");
438 
439         let mut cache_index = self.list_cache_contents();
440         let future_tolerance = SystemTime::now()
441             .checked_add(
442                 self.cache_config
443                     .allowed_clock_drift_for_files_from_future(),
444             )
445             .expect("Brace your cache, the next Big Bang is coming (time overflow)");
446         cache_index.sort_unstable_by(|lhs, rhs| {
447             // sort by age
448             use CacheEntry::*;
449             match (lhs, rhs) {
450                 (Recognized { mtime: lhs_mt, .. }, Recognized { mtime: rhs_mt, .. }) => {
451                     match (*lhs_mt > future_tolerance, *rhs_mt > future_tolerance) {
452                         // later == younger
453                         (false, false) => rhs_mt.cmp(lhs_mt),
454                         // files from far future are treated as oldest recognized files
455                         // we want to delete them, so the cache keeps track of recent files
456                         // however, we don't delete them uncodintionally,
457                         // because .stats file can be overwritten with a meaningful mtime
458                         (true, false) => cmp::Ordering::Greater,
459                         (false, true) => cmp::Ordering::Less,
460                         (true, true) => cmp::Ordering::Equal,
461                     }
462                 }
463                 // unrecognized is kind of infinity
464                 (Recognized { .. }, Unrecognized { .. }) => cmp::Ordering::Less,
465                 (Unrecognized { .. }, Recognized { .. }) => cmp::Ordering::Greater,
466                 (Unrecognized { .. }, Unrecognized { .. }) => cmp::Ordering::Equal,
467             }
468         });
469 
470         // find "cut" boundary:
471         // - remove unrecognized files anyway,
472         // - remove some cache files if some quota has been exceeded
473         let mut total_size = 0u64;
474         let mut start_delete_idx = None;
475         let mut start_delete_idx_if_deleting_recognized_items: Option<usize> = None;
476 
477         let total_size_limit = self.cache_config.files_total_size_soft_limit();
478         let file_count_limit = self.cache_config.file_count_soft_limit();
479         let tsl_if_deleting = total_size_limit
480             .checked_mul(
481                 self.cache_config
482                     .files_total_size_limit_percent_if_deleting() as u64,
483             )
484             .unwrap()
485             / 100;
486         let fcl_if_deleting = file_count_limit
487             .checked_mul(self.cache_config.file_count_limit_percent_if_deleting() as u64)
488             .unwrap()
489             / 100;
490 
491         for (idx, item) in cache_index.iter().enumerate() {
492             let size = if let CacheEntry::Recognized { size, .. } = item {
493                 size
494             } else {
495                 start_delete_idx = Some(idx);
496                 break;
497             };
498 
499             total_size += size;
500             if start_delete_idx_if_deleting_recognized_items.is_none()
501                 && (total_size > tsl_if_deleting || (idx + 1) as u64 > fcl_if_deleting)
502             {
503                 start_delete_idx_if_deleting_recognized_items = Some(idx);
504             }
505 
506             if total_size > total_size_limit || (idx + 1) as u64 > file_count_limit {
507                 start_delete_idx = start_delete_idx_if_deleting_recognized_items;
508                 break;
509             }
510         }
511 
512         if let Some(idx) = start_delete_idx {
513             for item in &cache_index[idx..] {
514                 let (result, path, entity) = match item {
515                     CacheEntry::Recognized { path, .. }
516                     | CacheEntry::Unrecognized {
517                         path,
518                         is_dir: false,
519                     } => (fs::remove_file(path), path, "file"),
520                     CacheEntry::Unrecognized { path, is_dir: true } => {
521                         (fs::remove_dir_all(path), path, "directory")
522                     }
523                 };
524                 if let Err(err) = result {
525                     warn!(
526                         "Failed to remove {} during cleanup, path: {}, err: {}",
527                         entity,
528                         path.display(),
529                         err
530                     );
531                 }
532             }
533         }
534 
535         trace!("Task finished: clean up cache");
536     }
537 
    /// Walks the cache directory and returns every entry found, classified as
    /// a recognized module cache file or as an unrecognized leftover.
    ///
    /// Directories are descended at levels 0 and 1 (level 0 is the cache
    /// directory itself). Plain files at those levels are unrecognized, except
    /// for an unexpired `.cleanup` lock at level 0, which is skipped. At level
    /// 2, files without an extension and `.stats` files are paired up into
    /// recognized entries, unexpired `wip-*` locks are skipped, and everything
    /// else (including directories) is unrecognized.
538     // Be fault tolerant: list as much as you can, and ignore the rest
list_cache_contents(&self) -> Vec<CacheEntry>539     fn list_cache_contents(&self) -> Vec<CacheEntry> {
        // Recursive helper: appends the entries found under `dir_path` to
        // `vec`; `level` is the depth below the cache directory (0 = the
        // cache directory itself).
540         fn enter_dir(
541             vec: &mut Vec<CacheEntry>,
542             dir_path: &Path,
543             level: u8,
544             cache_config: &CacheConfig,
545         ) {
            // Pushes an `Unrecognized` entry for a file or a directory.
546             macro_rules! add_unrecognized {
547                 (file: $path:expr) => {
548                     add_unrecognized!(false, $path)
549                 };
550                 (dir: $path:expr) => {
551                     add_unrecognized!(true, $path)
552                 };
553                 ($is_dir:expr, $path:expr) => {
554                     vec.push(CacheEntry::Unrecognized {
555                         path: $path.to_path_buf(),
556                         is_dir: $is_dir,
557                     })
558                 };
559             }
            // Pushes several `Unrecognized` entries, then runs `$cont`.
560             macro_rules! add_unrecognized_and {
561                 ([ $( $ty:ident: $path:expr ),* ], $cont:stmt) => {{
562                     $( add_unrecognized!($ty: $path); )*
563                         $cont
564                 }};
565             }
566 
            // `unwrap_or_warn!` with the current `level` appended to the
            // message; the path defaults to `dir_path`.
567             macro_rules! unwrap_or {
568                 ($result:expr, $cont:stmt, $err_msg:expr) => {
569                     unwrap_or!($result, $cont, $err_msg, dir_path)
570                 };
571                 ($result:expr, $cont:stmt, $err_msg:expr, $path:expr) => {
572                     unwrap_or_warn!(
573                         $result,
574                         $cont,
575                         format!("{}, level: {}", $err_msg, level),
576                         $path
577                     )
578                 };
579             }
580 
581             // If we fail to list a directory, something bad is happening anyway
582             // (something touches our cache or we have disk failure)
583             // Try to delete it, so we can stay within soft limits of the cache size.
584             // This comment applies later in this function, too.
585             let it = unwrap_or!(
586                 fs::read_dir(dir_path),
587                 add_unrecognized_and!([dir: dir_path], return),
588                 "Failed to list cache directory, deleting it"
589             );
590 
            // Level-2 module and stats files, keyed by path, collected here so
            // they can be paired up after the directory has been read.
591             let mut cache_files = HashMap::new();
592             for entry in it {
593                 // read_dir() returns an iterator over results - in case some of them are errors
594                 // we don't know their names, so we can't delete them. We don't want to delete
595                 // the whole directory with good entries too, so we just ignore the erroneous entries.
596                 let entry = unwrap_or!(
597                     entry,
598                     continue,
599                     "Failed to read a cache dir entry (NOT deleting it, it still occupies space)"
600                 );
601                 let path = entry.path();
602                 match (level, path.is_dir()) {
603                     (0..=1, true) => enter_dir(vec, &path, level + 1, cache_config),
604                     (0..=1, false) => {
605                         if level == 0
606                             && path.file_stem() == Some(OsStr::new(".cleanup"))
607                                 && path.extension().is_some()
608                                 // assume it's cleanup lock
609                                 && !is_fs_lock_expired(
610                                     Some(&entry),
611                                     &path,
612                                     cache_config.cleanup_interval(),
613                                     cache_config.allowed_clock_drift_for_files_from_future(),
614                                 )
615                         {
616                             continue; // skip active lock
617                         }
618                         add_unrecognized!(file: path);
619                     }
620                     (2, false) => {
621                         match path.extension().and_then(OsStr::to_str) {
622                             // mod or stats file
623                             None | Some("stats") => {
624                                 cache_files.insert(path, entry);
625                             }
626 
627                             Some(ext) => {
628                                 // check if valid lock
629                                 let recognized = ext.starts_with("wip-")
630                                     && !is_fs_lock_expired(
631                                         Some(&entry),
632                                         &path,
633                                         cache_config.optimizing_compression_task_timeout(),
634                                         cache_config.allowed_clock_drift_for_files_from_future(),
635                                     );
636 
637                                 if !recognized {
638                                     add_unrecognized!(file: path);
639                                 }
640                             }
641                         }
642                     }
                    // Everything else (e.g. a directory at level 2) is
                    // unrecognized.
643                     (_, is_dir) => add_unrecognized!(is_dir, path),
644                 }
645             }
646 
647             // associate module with its stats & handle them
648             // assumption: just mods and stats
649             for (path, entry) in cache_files.iter() {
                // Holds the counterpart path (stats for a module, module for
                // stats) so it can be borrowed by the tuple built below.
650                 let path_buf: PathBuf;
651                 let (mod_, stats_, is_mod) = match path.extension() {
                    // A path with an extension is a stats file: look up its module.
652                     Some(_) => {
653                         path_buf = path.with_extension("");
654                         (
655                             cache_files.get(&path_buf).map(|v| (&path_buf, v)),
656                             Some((path, entry)),
657                             false,
658                         )
659                     }
                    // A path without an extension is a module file: look up its stats.
660                     None => {
661                         path_buf = path.with_extension("stats");
662                         (
663                             Some((path, entry)),
664                             cache_files.get(&path_buf).map(|v| (&path_buf, v)),
665                             true,
666                         )
667                     }
668                 };
669 
670                 // construct a cache entry
671                 match (mod_, stats_, is_mod) {
                    // Module with stats, visited through the module path:
                    // size comes from the module, mtime from the stats file
                    // (falling back to the module's mtime if that is unreadable).
672                     (Some((mod_path, mod_entry)), Some((stats_path, stats_entry)), true) => {
673                         let mod_metadata = unwrap_or!(
674                             mod_entry.metadata(),
675                             add_unrecognized_and!([file: stats_path, file: mod_path], continue),
676                             "Failed to get metadata, deleting BOTH module cache and stats files",
677                             mod_path
678                         );
679                         let stats_mtime = unwrap_or!(
680                             stats_entry.metadata().and_then(|m| m.modified()),
681                             add_unrecognized_and!(
682                                 [file: stats_path],
683                                 unwrap_or!(
684                                     mod_metadata.modified(),
685                                     add_unrecognized_and!(
686                                         [file: stats_path, file: mod_path],
687                                         continue
688                                     ),
689                                     "Failed to get mtime, deleting BOTH module cache and stats \
690                                      files",
691                                     mod_path
692                                 )
693                             ),
694                             "Failed to get metadata/mtime, deleting the file",
695                             stats_path
696                         );
697                         // .into() called for the SystemTimeStub if cfg(test)
698                         vec.push(CacheEntry::Recognized {
699                             path: mod_path.to_path_buf(),
700                             mtime: stats_mtime.into(),
701                             size: mod_metadata.len(),
702                         })
703                     }
704                     (Some(_), Some(_), false) => (), // was or will be handled by previous branch
                    // Module without a stats file: use the module's own mtime.
705                     (Some((mod_path, mod_entry)), None, _) => {
706                         let (mod_metadata, mod_mtime) = unwrap_or!(
707                             mod_entry
708                                 .metadata()
709                                 .and_then(|md| md.modified().map(|mt| (md, mt))),
710                             add_unrecognized_and!([file: mod_path], continue),
711                             "Failed to get metadata/mtime, deleting the file",
712                             mod_path
713                         );
714                         // .into() called for the SystemTimeStub if cfg(test)
715                         vec.push(CacheEntry::Recognized {
716                             path: mod_path.to_path_buf(),
717                             mtime: mod_mtime.into(),
718                             size: mod_metadata.len(),
719                         })
720                     }
                    // Stats file whose module is missing.
721                     (None, Some((stats_path, _stats_entry)), _) => {
722                         debug!("Found orphaned stats file: {}", stats_path.display());
723                         add_unrecognized!(file: stats_path);
724                     }
725                     _ => unreachable!(),
726                 }
727             }
728         }
729 
730         let mut vec = Vec::new();
731         enter_dir(&mut vec, self.directory(), 0, &self.cache_config);
732         vec
733     }
734 }
735 
read_stats_file(path: &Path) -> Option<ModuleCacheStatistics>736 fn read_stats_file(path: &Path) -> Option<ModuleCacheStatistics> {
737     fs::read_to_string(path)
738         .map_err(|err| {
739             trace!(
740                 "Failed to read stats file, path: {}, err: {}",
741                 path.display(),
742                 err
743             )
744         })
745         .and_then(|contents| {
746             toml::from_str::<ModuleCacheStatistics>(&contents).map_err(|err| {
747                 trace!(
748                     "Failed to parse stats file, path: {}, err: {}",
749                     path.display(),
750                     err,
751                 )
752             })
753         })
754         .ok()
755 }
756 
write_stats_file(path: &Path, stats: &ModuleCacheStatistics) -> bool757 fn write_stats_file(path: &Path, stats: &ModuleCacheStatistics) -> bool {
758     toml::to_string_pretty(&stats)
759         .map_err(|err| {
760             warn!(
761                 "Failed to serialize stats file, path: {}, err: {}",
762                 path.display(),
763                 err
764             )
765         })
766         .and_then(|serialized| {
767             fs_write_atomic(path, "stats", serialized.as_bytes()).map_err(|_| ())
768         })
769         .is_ok()
770 }
771 
772 /// Tries to acquire a lock for specific task.
773 ///
774 /// Returns Some(path) to the lock if succeeds. The task path must not
775 /// contain any extension and have file stem.
776 ///
777 /// To release a lock you need either manually rename or remove it,
778 /// or wait until it expires and cleanup task removes it.
779 ///
780 /// Note: this function is racy. Main idea is: be fault tolerant and
781 ///       never block some task. The price is that we rarely do some task
782 ///       more than once.
acquire_task_fs_lock( task_path: &Path, timeout: Duration, allowed_future_drift: Duration, ) -> Option<PathBuf>783 fn acquire_task_fs_lock(
784     task_path: &Path,
785     timeout: Duration,
786     allowed_future_drift: Duration,
787 ) -> Option<PathBuf> {
788     assert!(task_path.extension().is_none());
789     assert!(task_path.file_stem().is_some());
790 
791     // list directory
792     let dir_path = task_path.parent()?;
793     let it = fs::read_dir(dir_path)
794         .map_err(|err| {
795             warn!(
796                 "Failed to list cache directory, path: {}, err: {}",
797                 dir_path.display(),
798                 err
799             )
800         })
801         .ok()?;
802 
803     // look for existing locks
804     for entry in it {
805         let entry = entry
806             .map_err(|err| {
807                 warn!(
808                     "Failed to list cache directory, path: {}, err: {}",
809                     dir_path.display(),
810                     err
811                 )
812             })
813             .ok()?;
814 
815         let path = entry.path();
816         if path.is_dir() || path.file_stem() != task_path.file_stem() {
817             continue;
818         }
819 
820         // check extension and mtime
821         match path.extension() {
822             None => continue,
823             Some(ext) => {
824                 if let Some(ext_str) = ext.to_str() {
825                     // if it's None, i.e. not valid UTF-8 string, then that's not our lock for sure
826                     if ext_str.starts_with("wip-")
827                         && !is_fs_lock_expired(Some(&entry), &path, timeout, allowed_future_drift)
828                     {
829                         return None;
830                     }
831                 }
832             }
833         }
834     }
835 
836     // create the lock
837     let lock_path = task_path.with_extension(format!("wip-{}", std::process::id()));
838     let _file = fs::OpenOptions::new()
839         .create_new(true)
840         .write(true)
841         .open(&lock_path)
842         .map_err(|err| {
843             warn!(
844                 "Failed to create lock file (note: it shouldn't exists): path: {}, err: {}",
845                 lock_path.display(),
846                 err
847             )
848         })
849         .ok()?;
850 
851     Some(lock_path)
852 }
853 
854 // we have either both, or just path; dir entry is desirable since on some platforms we can get
855 // metadata without extra syscalls
856 // furthermore: it's better to get a path if we have it instead of allocating a new one from the dir entry
is_fs_lock_expired( entry: Option<&fs::DirEntry>, path: &PathBuf, threshold: Duration, allowed_future_drift: Duration, ) -> bool857 fn is_fs_lock_expired(
858     entry: Option<&fs::DirEntry>,
859     path: &PathBuf,
860     threshold: Duration,
861     allowed_future_drift: Duration,
862 ) -> bool {
863     let mtime = match entry
864         .map_or_else(|| path.metadata(), |e| e.metadata())
865         .and_then(|metadata| metadata.modified())
866     {
867         Ok(mt) => mt,
868         Err(err) => {
869             warn!(
870                 "Failed to get metadata/mtime, treating as an expired lock, path: {}, err: {}",
871                 path.display(),
872                 err
873             );
874             return true; // can't read mtime, treat as expired, so this task will not be starved
875         }
876     };
877 
878     // DON'T use: mtime.elapsed() -- we must call SystemTime directly for the tests to be deterministic
879     match SystemTime::now().duration_since(mtime) {
880         Ok(elapsed) => elapsed >= threshold,
881         Err(err) => {
882             trace!(
883                 "Found mtime in the future, treating as a not expired lock, path: {}, err: {}",
884                 path.display(),
885                 err
886             );
887             // the lock is expired if the time is too far in the future
888             // it is fine to have network share and not synchronized clocks,
889             // but it's not good when user changes time in their system clock
890             err.duration() > allowed_future_drift
891         }
892     }
893 }
894 
895 #[cfg(test)]
896 mod tests;
897