1 //! Background worker that watches over the cache.
2 //!
//! It cleans up old cache entries, updates statistics and optimizes the cache.
//! We allow losing some messages (it doesn't hurt) and some races,
//! but we guarantee eventual consistency and fault tolerance.
6 //! Background tasks can be CPU intensive, but the worker thread has low priority.
7
// `SystemTime` is a test stub under cfg(test) (see the imports), so the mtime
// `.into()` calls in this module are only real conversions in test builds.
#![cfg_attr(
    not(test),
    expect(
        clippy::useless_conversion,
        reason = "cfg(test) and cfg(not(test)) have a different definition \
                  of `SystemTime`, so conversions below are needed in \
                  one mode but not the other, just ignore the lint in this \
                  module in not(test) mode where the conversion isn't required",
    )
)]
18
use super::{CacheConfig, fs_write_atomic};
use log::{debug, info, trace, warn};
use serde_derive::{Deserialize, Serialize};
use std::cmp;
use std::collections::HashMap;
use std::ffi::OsStr;
use std::fmt;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::mpsc::{Receiver, SyncSender, TrySendError, sync_channel};
#[cfg(test)]
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;
#[cfg(not(test))]
use std::time::SystemTime;
#[cfg(test)]
use tests::system_time_stub::SystemTimeStub as SystemTime;
37
/// Handle used to send events to the background cache worker thread.
///
/// Clones share the same channel; once every clone (and so every sender) has
/// been dropped, the channel disconnects and the worker thread's loop ends.
#[derive(Clone)]
pub(super) struct Worker {
    /// Sending half of the bounded event queue created in `start_new`.
    sender: SyncSender<CacheEvent>,
    /// Test-only event counters shared with the worker thread; the condvar is
    /// notified each time the worker finishes handling an event.
    #[cfg(test)]
    stats: Arc<(Mutex<WorkerStats>, Condvar)>,
}

/// State moved into the spawned worker thread.
struct WorkerThread {
    /// Receiving half of the event queue.
    receiver: Receiver<CacheEvent>,
    /// Copy of the cache configuration the worker was started with.
    cache_config: CacheConfig,
    /// Test-only event counters, shared with `Worker`.
    #[cfg(test)]
    stats: Arc<(Mutex<WorkerStats>, Condvar)>,
}

/// Test-only counters used to wait until the worker has processed every queued event.
#[cfg(test)]
#[derive(Default)]
struct WorkerStats {
    /// Events that `try_send` failed to queue.
    dropped: u32,
    /// Events successfully queued.
    sent: u32,
    /// Events the worker thread has finished handling.
    handled: u32,
}

/// Event sent to the worker thread; carries the path of the module cache file concerned.
#[derive(Debug, Clone)]
enum CacheEvent {
    /// A cached module was used; the handler bumps its usage counter and may
    /// recompress the file.
    OnCacheGet(PathBuf),
    /// A cache file was written; the handler creates a fresh `.stats` file and
    /// may run the cleanup task.
    OnCacheUpdate(PathBuf),
}
65
66 impl Worker {
start_new(cache_config: &CacheConfig) -> Self67 pub(super) fn start_new(cache_config: &CacheConfig) -> Self {
68 let queue_size = match cache_config.worker_event_queue_size() {
69 num if num <= usize::max_value() as u64 => num as usize,
70 _ => usize::max_value(),
71 };
72 let (tx, rx) = sync_channel(queue_size);
73
74 #[cfg(test)]
75 let stats = Arc::new((Mutex::new(WorkerStats::default()), Condvar::new()));
76
77 let worker_thread = WorkerThread {
78 receiver: rx,
79 cache_config: cache_config.clone(),
80 #[cfg(test)]
81 stats: stats.clone(),
82 };
83
84 // when self is dropped, sender will be dropped, what will cause the channel
85 // to hang, and the worker thread to exit -- it happens in the tests
86 // non-tests binary has only a static worker, so Rust doesn't drop it
87 thread::spawn(move || worker_thread.run());
88
89 Self {
90 sender: tx,
91 #[cfg(test)]
92 stats,
93 }
94 }
95
on_cache_get_async(&self, path: impl AsRef<Path>)96 pub(super) fn on_cache_get_async(&self, path: impl AsRef<Path>) {
97 let event = CacheEvent::OnCacheGet(path.as_ref().to_path_buf());
98 self.send_cache_event(event);
99 }
100
on_cache_update_async(&self, path: impl AsRef<Path>)101 pub(super) fn on_cache_update_async(&self, path: impl AsRef<Path>) {
102 let event = CacheEvent::OnCacheUpdate(path.as_ref().to_path_buf());
103 self.send_cache_event(event);
104 }
105
106 #[inline]
send_cache_event(&self, event: CacheEvent)107 fn send_cache_event(&self, event: CacheEvent) {
108 let sent_event = self.sender.try_send(event.clone());
109
110 if let Err(ref err) = sent_event {
111 info!(
112 "Failed to send asynchronously message to worker thread, \
113 event: {event:?}, error: {err}"
114 );
115 }
116
117 #[cfg(test)]
118 {
119 let mut stats = self
120 .stats
121 .0
122 .lock()
123 .expect("Failed to acquire worker stats lock");
124
125 if sent_event.is_ok() {
126 stats.sent += 1;
127 } else {
128 stats.dropped += 1;
129 }
130 }
131 }
132
133 #[cfg(test)]
events_dropped(&self) -> u32134 pub(super) fn events_dropped(&self) -> u32 {
135 let stats = self
136 .stats
137 .0
138 .lock()
139 .expect("Failed to acquire worker stats lock");
140 stats.dropped
141 }
142
143 #[cfg(test)]
wait_for_all_events_handled(&self)144 pub(super) fn wait_for_all_events_handled(&self) {
145 let (stats, condvar) = &*self.stats;
146 let mut stats = stats.lock().expect("Failed to acquire worker stats lock");
147 while stats.handled != stats.sent {
148 stats = condvar
149 .wait(stats)
150 .expect("Failed to reacquire worker stats lock");
151 }
152 }
153 }
154
155 impl fmt::Debug for Worker {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result156 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
157 f.debug_struct("Worker").finish()
158 }
159 }
160
161 #[derive(Serialize, Deserialize)]
162 struct ModuleCacheStatistics {
163 pub usages: u64,
164 #[serde(rename = "optimized-compression")]
165 pub compression_level: i32,
166 }
167
168 impl ModuleCacheStatistics {
default(cache_config: &CacheConfig) -> Self169 fn default(cache_config: &CacheConfig) -> Self {
170 Self {
171 usages: 0,
172 compression_level: cache_config.baseline_compression_level(),
173 }
174 }
175 }
176
/// One item found by `WorkerThread::list_cache_contents` during cleanup.
enum CacheEntry {
    /// A module cache file (possibly with a companion `.stats` file).
    Recognized {
        /// Path of the module cache file.
        path: PathBuf,
        /// mtime of the `.stats` file when it could be read, otherwise of the module file.
        mtime: SystemTime,
        /// Size of the module cache file in bytes.
        size: u64,
    },
    /// Anything the cleanup task does not recognize or could not inspect;
    /// sorted after all recognized entries and deleted by the cleanup task.
    Unrecognized {
        path: PathBuf,
        /// Whether it is removed with `remove_dir_all` instead of `remove_file`.
        is_dir: bool,
    },
}
188
/// Unwraps a `Result`; on `Err`, logs a warning with `$err_msg`, the displayed
/// `$path` and the error, then evaluates `$cont`.
///
/// `$cont` either diverges (`return`, `continue`, or a block ending in one) or
/// yields a fallback value of the same type as the `Ok` payload.
macro_rules! unwrap_or_warn {
    ($result:expr, $cont:stmt, $err_msg:expr, $path:expr) => {
        match $result {
            Err(error) => {
                warn!("{}, path: {}, msg: {}", $err_msg, $path.display(), error);
                $cont
            }
            Ok(value) => value,
        }
    };
}
200
201 impl WorkerThread {
run(self)202 fn run(self) {
203 debug!("Cache worker thread started.");
204
205 Self::lower_thread_priority();
206
207 #[cfg(test)]
208 let (stats, condvar) = &*self.stats;
209
210 for event in self.receiver.iter() {
211 match event {
212 CacheEvent::OnCacheGet(path) => self.handle_on_cache_get(path),
213 CacheEvent::OnCacheUpdate(path) => self.handle_on_cache_update(path),
214 }
215
216 #[cfg(test)]
217 {
218 let mut stats = stats.lock().expect("Failed to acquire worker stats lock");
219 stats.handled += 1;
220 condvar.notify_all();
221 }
222 }
223 }
224
225 #[cfg(target_os = "fuchsia")]
lower_thread_priority()226 fn lower_thread_priority() {
227 // TODO This needs to use Fuchsia thread profiles
228 // https://fuchsia.dev/fuchsia-src/reference/kernel_objects/profile
229 warn!(
230 "Lowering thread priority on Fuchsia is currently a noop. It might affect application performance."
231 );
232 }
233
234 #[cfg(target_os = "windows")]
lower_thread_priority()235 fn lower_thread_priority() {
236 use windows_sys::Win32::System::Threading::*;
237
238 // https://docs.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-setthreadpriority
239 // https://docs.microsoft.com/en-us/windows/win32/procthread/scheduling-priorities
240
241 if unsafe { SetThreadPriority(GetCurrentThread(), THREAD_MODE_BACKGROUND_BEGIN) } == 0 {
242 warn!(
243 "Failed to lower worker thread priority. It might affect application performance."
244 );
245 }
246 }
247
248 #[cfg(not(any(target_os = "windows", target_os = "fuchsia")))]
lower_thread_priority()249 fn lower_thread_priority() {
250 // http://man7.org/linux/man-pages/man7/sched.7.html
251
252 const NICE_DELTA_FOR_BACKGROUND_TASKS: i32 = 3;
253
254 match rustix::process::nice(NICE_DELTA_FOR_BACKGROUND_TASKS) {
255 Ok(current_nice) => {
256 debug!("New nice value of worker thread: {current_nice}");
257 }
258 Err(err) => {
259 warn!(
260 "Failed to lower worker thread priority ({err:?}). It might affect application performance."
261 );
262 }
263 };
264 }
265
266 /// Increases the usage counter and recompresses the file
267 /// if the usage counter reached configurable threshold.
handle_on_cache_get(&self, path: PathBuf)268 fn handle_on_cache_get(&self, path: PathBuf) {
269 trace!("handle_on_cache_get() for path: {}", path.display());
270
271 // construct .stats file path
272 let filename = path.file_name().unwrap().to_str().unwrap();
273 let stats_path = path.with_file_name(format!("{filename}.stats"));
274
275 // load .stats file (default if none or error)
276 let mut stats = read_stats_file(stats_path.as_ref())
277 .unwrap_or_else(|| ModuleCacheStatistics::default(&self.cache_config));
278
279 // step 1: update the usage counter & write to the disk
280 // it's racy, but it's fine (the counter will be just smaller,
281 // sometimes will retrigger recompression)
282 stats.usages += 1;
283 if !write_stats_file(stats_path.as_ref(), &stats) {
284 return;
285 }
286
287 // step 2: recompress if there's a need
288 let opt_compr_lvl = self.cache_config.optimized_compression_level();
289 if stats.compression_level >= opt_compr_lvl
290 || stats.usages
291 < self
292 .cache_config
293 .optimized_compression_usage_counter_threshold()
294 {
295 return;
296 }
297
298 let lock_path = if let Some(p) = acquire_task_fs_lock(
299 path.as_ref(),
300 self.cache_config.optimizing_compression_task_timeout(),
301 self.cache_config
302 .allowed_clock_drift_for_files_from_future(),
303 ) {
304 p
305 } else {
306 return;
307 };
308
309 trace!("Trying to recompress file: {}", path.display());
310
311 // recompress, write to other file, rename (it's atomic file content exchange)
312 // and update the stats file
313 let compressed_cache_bytes = unwrap_or_warn!(
314 fs::read(&path),
315 return,
316 "Failed to read old cache file",
317 path
318 );
319
320 let cache_bytes = unwrap_or_warn!(
321 zstd::decode_all(&compressed_cache_bytes[..]),
322 return,
323 "Failed to decompress cached code",
324 path
325 );
326
327 let recompressed_cache_bytes = unwrap_or_warn!(
328 zstd::encode_all(&cache_bytes[..], opt_compr_lvl),
329 return,
330 "Failed to compress cached code",
331 path
332 );
333
334 unwrap_or_warn!(
335 fs::write(&lock_path, &recompressed_cache_bytes),
336 return,
337 "Failed to write recompressed cache",
338 lock_path
339 );
340
341 unwrap_or_warn!(
342 fs::rename(&lock_path, &path),
343 {
344 if let Err(error) = fs::remove_file(&lock_path) {
345 warn!(
346 "Failed to clean up (remove) recompressed cache, path {}, err: {}",
347 lock_path.display(),
348 error
349 );
350 }
351
352 return;
353 },
354 "Failed to rename recompressed cache",
355 lock_path
356 );
357
358 // update stats file (reload it! recompression can take some time)
359 if let Some(mut new_stats) = read_stats_file(stats_path.as_ref()) {
360 if new_stats.compression_level >= opt_compr_lvl {
361 // Rare race:
362 // two instances with different opt_compr_lvl: we don't know in which order they updated
363 // the cache file and the stats file (they are not updated together atomically)
364 // Possible solution is to use directories per cache entry, but it complicates the system
365 // and is not worth it.
366 debug!(
367 "DETECTED task did more than once (or race with new file): \
368 recompression of {}. Note: if optimized compression level setting \
369 has changed in the meantine, the stats file might contain \
370 inconsistent compression level due to race.",
371 path.display()
372 );
373 } else {
374 new_stats.compression_level = opt_compr_lvl;
375 let _ = write_stats_file(stats_path.as_ref(), &new_stats);
376 }
377
378 if new_stats.usages < stats.usages {
379 debug!(
380 "DETECTED lower usage count (new file or race with counter \
381 increasing): file {}",
382 path.display()
383 );
384 }
385 } else {
386 debug!(
387 "Can't read stats file again to update compression level (it might got \
388 cleaned up): file {}",
389 stats_path.display()
390 );
391 }
392
393 trace!("Task finished: recompress file: {}", path.display());
394 }
395
directory(&self) -> &PathBuf396 fn directory(&self) -> &PathBuf {
397 self.cache_config
398 .directory()
399 .expect("CacheConfig should be validated before being passed to a WorkerThread")
400 }
401
handle_on_cache_update(&self, path: PathBuf)402 fn handle_on_cache_update(&self, path: PathBuf) {
403 trace!("handle_on_cache_update() for path: {}", path.display());
404
405 // ---------------------- step 1: create .stats file
406
407 // construct .stats file path
408 let filename = path
409 .file_name()
410 .expect("Expected valid cache file name")
411 .to_str()
412 .expect("Expected valid cache file name");
413 let stats_path = path.with_file_name(format!("{filename}.stats"));
414
415 // create and write stats file
416 let mut stats = ModuleCacheStatistics::default(&self.cache_config);
417 stats.usages += 1;
418 write_stats_file(&stats_path, &stats);
419
420 // ---------------------- step 2: perform cleanup task if needed
421
422 // acquire lock for cleanup task
423 // Lock is a proof of recent cleanup task, so we don't want to delete them.
424 // Expired locks will be deleted by the cleanup task.
425 let cleanup_file = self.directory().join(".cleanup"); // some non existing marker file
426 if acquire_task_fs_lock(
427 &cleanup_file,
428 self.cache_config.cleanup_interval(),
429 self.cache_config
430 .allowed_clock_drift_for_files_from_future(),
431 )
432 .is_none()
433 {
434 return;
435 }
436
437 trace!("Trying to clean up cache");
438
439 let mut cache_index = self.list_cache_contents();
440 let future_tolerance = SystemTime::now()
441 .checked_add(
442 self.cache_config
443 .allowed_clock_drift_for_files_from_future(),
444 )
445 .expect("Brace your cache, the next Big Bang is coming (time overflow)");
446 cache_index.sort_unstable_by(|lhs, rhs| {
447 // sort by age
448 use CacheEntry::*;
449 match (lhs, rhs) {
450 (Recognized { mtime: lhs_mt, .. }, Recognized { mtime: rhs_mt, .. }) => {
451 match (*lhs_mt > future_tolerance, *rhs_mt > future_tolerance) {
452 // later == younger
453 (false, false) => rhs_mt.cmp(lhs_mt),
454 // files from far future are treated as oldest recognized files
455 // we want to delete them, so the cache keeps track of recent files
456 // however, we don't delete them uncodintionally,
457 // because .stats file can be overwritten with a meaningful mtime
458 (true, false) => cmp::Ordering::Greater,
459 (false, true) => cmp::Ordering::Less,
460 (true, true) => cmp::Ordering::Equal,
461 }
462 }
463 // unrecognized is kind of infinity
464 (Recognized { .. }, Unrecognized { .. }) => cmp::Ordering::Less,
465 (Unrecognized { .. }, Recognized { .. }) => cmp::Ordering::Greater,
466 (Unrecognized { .. }, Unrecognized { .. }) => cmp::Ordering::Equal,
467 }
468 });
469
470 // find "cut" boundary:
471 // - remove unrecognized files anyway,
472 // - remove some cache files if some quota has been exceeded
473 let mut total_size = 0u64;
474 let mut start_delete_idx = None;
475 let mut start_delete_idx_if_deleting_recognized_items: Option<usize> = None;
476
477 let total_size_limit = self.cache_config.files_total_size_soft_limit();
478 let file_count_limit = self.cache_config.file_count_soft_limit();
479 let tsl_if_deleting = total_size_limit
480 .checked_mul(
481 self.cache_config
482 .files_total_size_limit_percent_if_deleting() as u64,
483 )
484 .unwrap()
485 / 100;
486 let fcl_if_deleting = file_count_limit
487 .checked_mul(self.cache_config.file_count_limit_percent_if_deleting() as u64)
488 .unwrap()
489 / 100;
490
491 for (idx, item) in cache_index.iter().enumerate() {
492 let size = if let CacheEntry::Recognized { size, .. } = item {
493 size
494 } else {
495 start_delete_idx = Some(idx);
496 break;
497 };
498
499 total_size += size;
500 if start_delete_idx_if_deleting_recognized_items.is_none()
501 && (total_size > tsl_if_deleting || (idx + 1) as u64 > fcl_if_deleting)
502 {
503 start_delete_idx_if_deleting_recognized_items = Some(idx);
504 }
505
506 if total_size > total_size_limit || (idx + 1) as u64 > file_count_limit {
507 start_delete_idx = start_delete_idx_if_deleting_recognized_items;
508 break;
509 }
510 }
511
512 if let Some(idx) = start_delete_idx {
513 for item in &cache_index[idx..] {
514 let (result, path, entity) = match item {
515 CacheEntry::Recognized { path, .. }
516 | CacheEntry::Unrecognized {
517 path,
518 is_dir: false,
519 } => (fs::remove_file(path), path, "file"),
520 CacheEntry::Unrecognized { path, is_dir: true } => {
521 (fs::remove_dir_all(path), path, "directory")
522 }
523 };
524 if let Err(err) = result {
525 warn!(
526 "Failed to remove {} during cleanup, path: {}, err: {}",
527 entity,
528 path.display(),
529 err
530 );
531 }
532 }
533 }
534
535 trace!("Task finished: clean up cache");
536 }
537
538 // Be fault tolerant: list as much as you can, and ignore the rest
list_cache_contents(&self) -> Vec<CacheEntry>539 fn list_cache_contents(&self) -> Vec<CacheEntry> {
540 fn enter_dir(
541 vec: &mut Vec<CacheEntry>,
542 dir_path: &Path,
543 level: u8,
544 cache_config: &CacheConfig,
545 ) {
546 macro_rules! add_unrecognized {
547 (file: $path:expr) => {
548 add_unrecognized!(false, $path)
549 };
550 (dir: $path:expr) => {
551 add_unrecognized!(true, $path)
552 };
553 ($is_dir:expr, $path:expr) => {
554 vec.push(CacheEntry::Unrecognized {
555 path: $path.to_path_buf(),
556 is_dir: $is_dir,
557 })
558 };
559 }
560 macro_rules! add_unrecognized_and {
561 ([ $( $ty:ident: $path:expr ),* ], $cont:stmt) => {{
562 $( add_unrecognized!($ty: $path); )*
563 $cont
564 }};
565 }
566
567 macro_rules! unwrap_or {
568 ($result:expr, $cont:stmt, $err_msg:expr) => {
569 unwrap_or!($result, $cont, $err_msg, dir_path)
570 };
571 ($result:expr, $cont:stmt, $err_msg:expr, $path:expr) => {
572 unwrap_or_warn!(
573 $result,
574 $cont,
575 format!("{}, level: {}", $err_msg, level),
576 $path
577 )
578 };
579 }
580
581 // If we fail to list a directory, something bad is happening anyway
582 // (something touches our cache or we have disk failure)
583 // Try to delete it, so we can stay within soft limits of the cache size.
584 // This comment applies later in this function, too.
585 let it = unwrap_or!(
586 fs::read_dir(dir_path),
587 add_unrecognized_and!([dir: dir_path], return),
588 "Failed to list cache directory, deleting it"
589 );
590
591 let mut cache_files = HashMap::new();
592 for entry in it {
593 // read_dir() returns an iterator over results - in case some of them are errors
594 // we don't know their names, so we can't delete them. We don't want to delete
595 // the whole directory with good entries too, so we just ignore the erroneous entries.
596 let entry = unwrap_or!(
597 entry,
598 continue,
599 "Failed to read a cache dir entry (NOT deleting it, it still occupies space)"
600 );
601 let path = entry.path();
602 match (level, path.is_dir()) {
603 (0..=1, true) => enter_dir(vec, &path, level + 1, cache_config),
604 (0..=1, false) => {
605 if level == 0
606 && path.file_stem() == Some(OsStr::new(".cleanup"))
607 && path.extension().is_some()
608 // assume it's cleanup lock
609 && !is_fs_lock_expired(
610 Some(&entry),
611 &path,
612 cache_config.cleanup_interval(),
613 cache_config.allowed_clock_drift_for_files_from_future(),
614 )
615 {
616 continue; // skip active lock
617 }
618 add_unrecognized!(file: path);
619 }
620 (2, false) => {
621 match path.extension().and_then(OsStr::to_str) {
622 // mod or stats file
623 None | Some("stats") => {
624 cache_files.insert(path, entry);
625 }
626
627 Some(ext) => {
628 // check if valid lock
629 let recognized = ext.starts_with("wip-")
630 && !is_fs_lock_expired(
631 Some(&entry),
632 &path,
633 cache_config.optimizing_compression_task_timeout(),
634 cache_config.allowed_clock_drift_for_files_from_future(),
635 );
636
637 if !recognized {
638 add_unrecognized!(file: path);
639 }
640 }
641 }
642 }
643 (_, is_dir) => add_unrecognized!(is_dir, path),
644 }
645 }
646
647 // associate module with its stats & handle them
648 // assumption: just mods and stats
649 for (path, entry) in cache_files.iter() {
650 let path_buf: PathBuf;
651 let (mod_, stats_, is_mod) = match path.extension() {
652 Some(_) => {
653 path_buf = path.with_extension("");
654 (
655 cache_files.get(&path_buf).map(|v| (&path_buf, v)),
656 Some((path, entry)),
657 false,
658 )
659 }
660 None => {
661 path_buf = path.with_extension("stats");
662 (
663 Some((path, entry)),
664 cache_files.get(&path_buf).map(|v| (&path_buf, v)),
665 true,
666 )
667 }
668 };
669
670 // construct a cache entry
671 match (mod_, stats_, is_mod) {
672 (Some((mod_path, mod_entry)), Some((stats_path, stats_entry)), true) => {
673 let mod_metadata = unwrap_or!(
674 mod_entry.metadata(),
675 add_unrecognized_and!([file: stats_path, file: mod_path], continue),
676 "Failed to get metadata, deleting BOTH module cache and stats files",
677 mod_path
678 );
679 let stats_mtime = unwrap_or!(
680 stats_entry.metadata().and_then(|m| m.modified()),
681 add_unrecognized_and!(
682 [file: stats_path],
683 unwrap_or!(
684 mod_metadata.modified(),
685 add_unrecognized_and!(
686 [file: stats_path, file: mod_path],
687 continue
688 ),
689 "Failed to get mtime, deleting BOTH module cache and stats \
690 files",
691 mod_path
692 )
693 ),
694 "Failed to get metadata/mtime, deleting the file",
695 stats_path
696 );
697 // .into() called for the SystemTimeStub if cfg(test)
698 vec.push(CacheEntry::Recognized {
699 path: mod_path.to_path_buf(),
700 mtime: stats_mtime.into(),
701 size: mod_metadata.len(),
702 })
703 }
704 (Some(_), Some(_), false) => (), // was or will be handled by previous branch
705 (Some((mod_path, mod_entry)), None, _) => {
706 let (mod_metadata, mod_mtime) = unwrap_or!(
707 mod_entry
708 .metadata()
709 .and_then(|md| md.modified().map(|mt| (md, mt))),
710 add_unrecognized_and!([file: mod_path], continue),
711 "Failed to get metadata/mtime, deleting the file",
712 mod_path
713 );
714 // .into() called for the SystemTimeStub if cfg(test)
715 vec.push(CacheEntry::Recognized {
716 path: mod_path.to_path_buf(),
717 mtime: mod_mtime.into(),
718 size: mod_metadata.len(),
719 })
720 }
721 (None, Some((stats_path, _stats_entry)), _) => {
722 debug!("Found orphaned stats file: {}", stats_path.display());
723 add_unrecognized!(file: stats_path);
724 }
725 _ => unreachable!(),
726 }
727 }
728 }
729
730 let mut vec = Vec::new();
731 enter_dir(&mut vec, self.directory(), 0, &self.cache_config);
732 vec
733 }
734 }
735
read_stats_file(path: &Path) -> Option<ModuleCacheStatistics>736 fn read_stats_file(path: &Path) -> Option<ModuleCacheStatistics> {
737 fs::read_to_string(path)
738 .map_err(|err| {
739 trace!(
740 "Failed to read stats file, path: {}, err: {}",
741 path.display(),
742 err
743 )
744 })
745 .and_then(|contents| {
746 toml::from_str::<ModuleCacheStatistics>(&contents).map_err(|err| {
747 trace!(
748 "Failed to parse stats file, path: {}, err: {}",
749 path.display(),
750 err,
751 )
752 })
753 })
754 .ok()
755 }
756
write_stats_file(path: &Path, stats: &ModuleCacheStatistics) -> bool757 fn write_stats_file(path: &Path, stats: &ModuleCacheStatistics) -> bool {
758 toml::to_string_pretty(&stats)
759 .map_err(|err| {
760 warn!(
761 "Failed to serialize stats file, path: {}, err: {}",
762 path.display(),
763 err
764 )
765 })
766 .and_then(|serialized| {
767 fs_write_atomic(path, "stats", serialized.as_bytes()).map_err(|_| ())
768 })
769 .is_ok()
770 }
771
772 /// Tries to acquire a lock for specific task.
773 ///
774 /// Returns Some(path) to the lock if succeeds. The task path must not
775 /// contain any extension and have file stem.
776 ///
777 /// To release a lock you need either manually rename or remove it,
778 /// or wait until it expires and cleanup task removes it.
779 ///
780 /// Note: this function is racy. Main idea is: be fault tolerant and
781 /// never block some task. The price is that we rarely do some task
782 /// more than once.
acquire_task_fs_lock( task_path: &Path, timeout: Duration, allowed_future_drift: Duration, ) -> Option<PathBuf>783 fn acquire_task_fs_lock(
784 task_path: &Path,
785 timeout: Duration,
786 allowed_future_drift: Duration,
787 ) -> Option<PathBuf> {
788 assert!(task_path.extension().is_none());
789 assert!(task_path.file_stem().is_some());
790
791 // list directory
792 let dir_path = task_path.parent()?;
793 let it = fs::read_dir(dir_path)
794 .map_err(|err| {
795 warn!(
796 "Failed to list cache directory, path: {}, err: {}",
797 dir_path.display(),
798 err
799 )
800 })
801 .ok()?;
802
803 // look for existing locks
804 for entry in it {
805 let entry = entry
806 .map_err(|err| {
807 warn!(
808 "Failed to list cache directory, path: {}, err: {}",
809 dir_path.display(),
810 err
811 )
812 })
813 .ok()?;
814
815 let path = entry.path();
816 if path.is_dir() || path.file_stem() != task_path.file_stem() {
817 continue;
818 }
819
820 // check extension and mtime
821 match path.extension() {
822 None => continue,
823 Some(ext) => {
824 if let Some(ext_str) = ext.to_str() {
825 // if it's None, i.e. not valid UTF-8 string, then that's not our lock for sure
826 if ext_str.starts_with("wip-")
827 && !is_fs_lock_expired(Some(&entry), &path, timeout, allowed_future_drift)
828 {
829 return None;
830 }
831 }
832 }
833 }
834 }
835
836 // create the lock
837 let lock_path = task_path.with_extension(format!("wip-{}", std::process::id()));
838 let _file = fs::OpenOptions::new()
839 .create_new(true)
840 .write(true)
841 .open(&lock_path)
842 .map_err(|err| {
843 warn!(
844 "Failed to create lock file (note: it shouldn't exists): path: {}, err: {}",
845 lock_path.display(),
846 err
847 )
848 })
849 .ok()?;
850
851 Some(lock_path)
852 }
853
854 // we have either both, or just path; dir entry is desirable since on some platforms we can get
855 // metadata without extra syscalls
856 // furthermore: it's better to get a path if we have it instead of allocating a new one from the dir entry
is_fs_lock_expired( entry: Option<&fs::DirEntry>, path: &PathBuf, threshold: Duration, allowed_future_drift: Duration, ) -> bool857 fn is_fs_lock_expired(
858 entry: Option<&fs::DirEntry>,
859 path: &PathBuf,
860 threshold: Duration,
861 allowed_future_drift: Duration,
862 ) -> bool {
863 let mtime = match entry
864 .map_or_else(|| path.metadata(), |e| e.metadata())
865 .and_then(|metadata| metadata.modified())
866 {
867 Ok(mt) => mt,
868 Err(err) => {
869 warn!(
870 "Failed to get metadata/mtime, treating as an expired lock, path: {}, err: {}",
871 path.display(),
872 err
873 );
874 return true; // can't read mtime, treat as expired, so this task will not be starved
875 }
876 };
877
878 // DON'T use: mtime.elapsed() -- we must call SystemTime directly for the tests to be deterministic
879 match SystemTime::now().duration_since(mtime) {
880 Ok(elapsed) => elapsed >= threshold,
881 Err(err) => {
882 trace!(
883 "Found mtime in the future, treating as a not expired lock, path: {}, err: {}",
884 path.display(),
885 err
886 );
887 // the lock is expired if the time is too far in the future
888 // it is fine to have network share and not synchronized clocks,
889 // but it's not good when user changes time in their system clock
890 err.duration() > allowed_future_drift
891 }
892 }
893 }
894
895 #[cfg(test)]
896 mod tests;
897