Source code for punchpipe.control.cache_nanny

import os
import sys
import time

from prefect import flow, get_run_logger
from prefect.variables import Variable

from punchpipe.control.cache_layer import manager
from punchpipe.control.util import load_pipeline_configuration


def _remove_cache_file(path) -> bool:
    """Delete *path*; return False if it had already disappeared."""
    try:
        os.remove(path)
    except FileNotFoundError:
        # Something else deleted the file first. The goal (file gone) is
        # met, so this must not abort the rest of the cleanup.
        return False
    return True


@flow
def cache_nanny(pipeline_config_path: str) -> None:
    """Sync the ``use_shm_cache`` variable and prune the on-disk cache.

    The flow does three things, in order:

    1. Computes the desired cache state from the ``cache_layer.cache_enabled``
       configuration entry (forced falsy on Python older than 3.13) and
       stores it in the Prefect variable ``use_shm_cache`` when it differs
       from the stored value.
    2. Removes every cache file whose access time is older than
       ``cache_layer.max_age_hours``.
    3. If the remaining files together exceed ``cache_layer.max_size_MB``
       (1 MB = 1e6 bytes), removes the least recently accessed files until
       the total fits.

    Files that vanish while the flow is running are skipped rather than
    treated as errors.

    Parameters
    ----------
    pipeline_config_path : str
        Path handed to ``load_pipeline_configuration``.
    """
    logger = get_run_logger()
    pipeline_config = load_pipeline_configuration(pipeline_config_path)
    cache_config = pipeline_config['cache_layer']

    # Compare the whole version tuple rather than only the minor number, so
    # the check stays correct across major versions.
    new_state = cache_config['cache_enabled'] and (sys.version_info >= (3, 13))
    old_state = Variable.get("use_shm_cache", "unset")
    logger.info(f"'cache_enabled' is {old_state}")
    if new_state != old_state:
        Variable.set("use_shm_cache", new_state, overwrite=True)
        logger.info(f"Changed 'cache_enabled' from {old_state} to {new_state}")

    # Collect (access time, size, path) for every regular cache file.
    cache_files = []
    for file in manager.get_existing_cache_files():
        if not os.path.isfile(file):
            continue
        try:
            info = os.stat(file)
        except FileNotFoundError:
            # Removed between the isfile() check and the stat() call.
            continue
        cache_files.append((info.st_atime, info.st_size, file))
    # Oldest access time first, so both passes below evict from the front.
    cache_files.sort(key=lambda entry: entry[0])

    # Pass 1: age-based eviction. Walk forward with an index instead of
    # repeatedly popping the head of the list, which is O(n) per pop.
    n_removed = 0
    size_removed = 0
    cutoff_time = time.time() - cache_config['max_age_hours'] * 3600
    position = 0
    while position < len(cache_files) and cache_files[position][0] < cutoff_time:
        _, size, path = cache_files[position]
        position += 1
        if _remove_cache_file(path):
            n_removed += 1
            size_removed += size
    logger.info(f"Removed {n_removed} cache entries ({size_removed/1e6:.1f} MB) for age")

    # Pass 2: size-based eviction over whatever survived the age pass.
    survivors = cache_files[position:]
    total_size = sum(size for _, size, _ in survivors)
    excess = total_size - cache_config['max_size_MB'] * 1e6
    n_removed = 0
    size_removed = 0
    for _, size, path in survivors:
        if excess <= 0:
            break
        removed = _remove_cache_file(path)
        # The file no longer occupies space whether we deleted it or it was
        # already gone, so the excess shrinks in both cases.
        excess -= size
        if removed:
            n_removed += 1
            size_removed += size
    logger.info(f"Removed {n_removed} cache entries ({size_removed/1e6:.1f} MB) for size")