# Source code for file_caching.dir_cache

# Copyright (c) Fraunhofer MEVIS, Germany. All rights reserved.
# **InsertLicense** code author="Jan-Martin Kuhnigk"

import logging
import os
import re
import shutil
import tempfile
import warnings
from pathlib import PurePath
from typing import Callable, List, Optional

import dirsync

from fmeTestSupport.Paths import isSubPath, isUNCRootPath, normalizedPath

# Name of the environment variable that supplies the cache root path.
# DirCache.__init__ consults it only when no explicit `cache_root_path` argument is given;
# if the variable is unset or empty as well, a folder below the system temp dir is used.
CACHE_ROOT_PATH_ENV_KEY = "DirCache_CacheRootPath"


class DirCache:
    """
    Caches all access to a user-defined server root path or its sub-folders.
    """

    # Used as root folder name for the cache directory, if no cache_root_path is given.
    DEFAULT_CACHE_ROOT_ID = "__DIRCACHE__"

    # The cache has no root access, so cache contents may be outdated.
    class NoSourceAccessWarning(UserWarning):
        pass

    # Common base class of all errors concerning an invalid server root path.
    class RootPathError(ValueError):
        pass

    # Server root paths must be absolute
    class RootPathMustBeAbsoluteError(RootPathError):
        def __init__(self, server_root_path):
            super().__init__(
                "Invalid root directory '{}', must be an absolute path".format(
                    server_root_path
                )
            )

    # Server root paths must not be a bare UNC root (e.g. just the server name)
    class RootPathMustNotBeUNCRootError(RootPathError):
        def __init__(self, server_root_path):
            super().__init__(
                "Invalid root directory '{}', valid UNC paths (e.g. Windows network paths) must have "
                "at least two components, e.g. '\\\\srv\\dir'".format(server_root_path)
            )

    # Signature of the function used to expand placeholders (e.g. environment variables) in paths.
    ExpandPathFct = Callable[[str], str]

    def __init__(
        self,
        server_root_path: str,
        target_subdir: Optional[str] = None,
        cache_root_path: Optional[str] = None,
        expand_path_fct: ExpandPathFct = os.path.expandvars,
        enable_no_root_access_warnings: bool = True,
    ) -> None:
        """
        Sets up a new cache for the *server_root_path*.

        Uses the module logger (``logging.getLogger(__name__)``, see :meth:`logger`) for logging.

        :param server_root_path: Root of all paths applicable for caching. Must be an absolute path.
        :param target_subdir: Name of the target cache dir in cache_root_path. If omitted, auto-generated
            from `server_root_path`.
        :param cache_root_path: Base path used for the cache. If omitted, the value of the environment
            variable named by `CACHE_ROOT_PATH_ENV_KEY` is used, and if that is not set either,
            '<TEMP-Dir>/<DEFAULT_CACHE_ROOT_ID>'.
        :param expand_path_fct: Function applied to all given paths before use. If omitted,
            os.path.expandvars is used.
        :param enable_no_root_access_warnings: When, on get(), there is no root access but a cached
            version of the requested resource is available, this parameter determines if a warning
            is issued. Stored as `self.enable_missing_root_warnings`.
        :raise RootPathMustNotBeUNCRootError: If the expanded root path is a bare UNC root.
        :raise RootPathMustBeAbsoluteError: If the expanded root path is not absolute.
        """
        # Precedence: explicit argument, then environment variable, then temp-dir default.
        cache_root_path = (
            cache_root_path
            or os.environ.get(CACHE_ROOT_PATH_ENV_KEY)
            or os.path.join(tempfile.gettempdir(), self.DEFAULT_CACHE_ROOT_ID)
        )
        self.__expand_path_fct: DirCache.ExpandPathFct = expand_path_fct
        self.__server_root_path: str = self.__expand_path_fct(server_root_path)
        self.__assert_server_root_path_valid()
        self.__cache_base_path: str = self.__compose_cache_base_path(
            cache_root_path, target_subdir
        )
        self.__logger: logging.Logger = self.logger()
        self.enable_missing_root_warnings = enable_no_root_access_warnings

    def __assert_server_root_path_valid(self):
        """Raises a RootPathError subclass if the (expanded) server root path is unusable."""
        # The UNC check runs first, so a bare UNC root is always reported with the more specific error.
        if isUNCRootPath(self.__server_root_path):
            raise self.RootPathMustNotBeUNCRootError(self.__server_root_path)
        if not PurePath(self.__server_root_path).is_absolute():
            raise self.RootPathMustBeAbsoluteError(self.__server_root_path)

    @staticmethod
    def logger():
        """
        :return: The logger used by all DirCache instances (named after this module).
        """
        return logging.getLogger(__name__)

    def get(self, source_path: str = ".", prune_target_folder: bool = True) -> str:
        """
        Used to access content (file or directory) at the cache's source path, cached or not.

        If *source_path* is a sub-path of the caches' *server_root_path*, it will first check if it has
        access to *server_root_path*. If so, it will sync the cache for `source_path`, and return the
        cached path, if `source_path` exists, and raise a *FileNotFoundError* otherwise.
        If there is no source access, it will check if there is a cached version of `source_path` and
        return its path, issuing a `DirCache.NoSourceAccessWarning` (unless
        `self.enable_missing_root_warnings` was set to False).

        If `source_path` is not a sub-path of the caches' *server_root_path*, it will just check if the
        path exists, (raising a `FileNotFoundError` if not) and return `source_path` again.

        :param source_path: Path to the content, can be absolute or relative, where relative is
            interpreted relative to the caches server root path.
        :param prune_target_folder: If the path is a directory AND source access exists AND the result
            path contains unexpected extra contents, removing the extra contents on cache update can
            be disabled by providing False.
        :return: Path to the (cached or original) version of `source_path`.
        :raise FileNotFoundError: If `source_path` does not exist and there is no cached version.
        """
        result_path = source_path
        if self.is_valid_source_path(source_path):
            # Validity was just checked, so the private helper can be used directly.
            result_path = self.__get_cached_path(source_path)
            if self.has_source_access():
                self.__synchronize(source_path, result_path, prune_target_folder)
        if not os.path.exists(source_path):
            # Either raises FileNotFoundError or (cached copy available) at most warns.
            self.__handle_non_accessible_source_path(source_path, result_path)
        return result_path

    def clear(self) -> None:
        """
        Clears the entire cache.
        """
        self.remove(self.__cache_base_path)

    def remove(self, result_path: str) -> None:
        """
        Ensures the given `result_path` is no longer in the cache. If the path is not a cache path,
        nothing will happen.

        :param result_path: Path previously returned by a `get` call, can be cached or not.
        """
        if self.is_cache_path(result_path):
            self.__logger.info("Removing cached path %s...\n", result_path)
            if os.path.isdir(result_path):
                shutil.rmtree(result_path)
            elif os.path.isfile(result_path):
                os.remove(result_path)
            self.__logger.info("Remove DONE.\n")

    def get_server_root_path(self):
        """
        :return: The root path managed by this cache.
        """
        return self.__server_root_path

    def __compose_cache_base_path(self, cache_root_path, target_subdir):
        """
        Builds the directory that holds the cached copy of the server root path.

        :param cache_root_path: Base directory of the cache (expanded via the expand function).
        :param target_subdir: Sub-directory name; if empty/None, derived from the server root path.
        :return: '<expanded cache_root_path>/<flattened target_subdir>'
        """
        cache_base_path = self.__expand_path_fct(cache_root_path)
        if not target_subdir:
            target_subdir = self.__server_root_path
        return (
            cache_base_path
            + "/"
            + DirCache.replace_path_seps(self.__expand_path_fct(target_subdir))
        )

    @staticmethod
    def replace_path_seps(expanded_path: str) -> str:
        """
        Flattens a path into a single directory name.

        :param expanded_path: Path to flatten.
        :return: The path with all ':' removed and each '/' or backslash replaced by '__'.
        """
        return expanded_path.replace(":", "").replace("/", "__").replace("\\", "__")

    def get_cached_path(self, source_path: str) -> Optional[str]:
        """
        Returns the cached path associated with source_path without doing any caching.
        Returns None if the given source path is not valid for this cache.
        """
        target_path: Optional[str] = None
        if self.is_valid_source_path(source_path):
            target_path = self.__get_cached_path(source_path)
        return target_path

    def __get_cached_path(self, valid_source_path: str) -> str:
        """
        Maps a source path to its location below the cache base path.

        :param valid_source_path: Source path; callers must have checked it with is_valid_source_path.
        :return: The cache base path itself for the server root, otherwise base path + '/' + the
            POSIX-style path of the source relative to the server root.
        """
        # NOTE(review): presumably normalizedPath returns a PurePath-like object - confirm.
        rel_source_path = PurePath.relative_to(
            normalizedPath(valid_source_path), normalizedPath(self.__server_root_path)
        ).as_posix()
        target_path = self.__cache_base_path
        if rel_source_path != ".":
            target_path += "/" + rel_source_path
        return target_path

    def is_valid_source_path(self, source_path: str) -> bool:
        """
        :param source_path: A directory or file path
        :return: True if the given *source_path* is semantically a sub-path of this caches' server
            root path.
        """
        return self.__is_sub_path_when_ignoring_network_path(source_path)

    def __is_sub_path_when_ignoring_network_path(self, source_path: str) -> bool:
        """
        :param source_path: A directory or file path
        :return: True if the given *source_path* is a sub-path of this caches' server root path AFTER
            normalizing away Windows network location indicators (double slash or double backslash).
        """
        non_network_source_path = source_path.replace("//", "/").replace("\\\\", "\\")
        non_network_server_root_path = self.__server_root_path.replace(
            "//", "/"
        ).replace("\\\\", "\\")
        return isSubPath(non_network_source_path, non_network_server_root_path)

    def __handle_non_accessible_source_path(
        self, source_path: str, result_path: str
    ) -> None:
        """
        Called by get() when `source_path` does not exist on disk.

        :raise FileNotFoundError: If the path is not cacheable, or is cacheable but no usable cached
            version can be offered (see __handle_valid_but_non_accessible_source).
        """
        if not self.is_valid_source_path(source_path):
            raise FileNotFoundError(
                "Non-cacheable path '{}' not found!".format(source_path)
            )
        self.__handle_valid_but_non_accessible_source(result_path, source_path)

    def __handle_valid_but_non_accessible_source(
        self, result_path: str, source_path: str
    ) -> None:
        """
        Decides what to do for a cacheable source path that does not exist on disk.

        :raise FileNotFoundError: If the source root is accessible (so the path is really missing), or
            if there is neither source access nor a cached version at `result_path`.
        """
        if self.has_source_access():
            raise FileNotFoundError("Source path '{}' not found!".format(source_path))
        if not os.path.exists(result_path):
            raise FileNotFoundError(
                "No access to source at {} and path '{}' not found in cache!".format(
                    self.__server_root_path, source_path
                )
            )
        # No source access, but a cached copy exists: use it, optionally warning the user.
        if self.enable_missing_root_warnings:
            warning_text = "No access to source at {}, using (possibly outdated) cached file '{}'.".format(
                self.__server_root_path, result_path
            )
            # NOTE(review): stacklevel=7 presumably targets a specific caller frame - confirm.
            warnings.warn(self.NoSourceAccessWarning(warning_text), stacklevel=7)

    def __synchronize(
        self, source_path: str, target_path: str, prune_target_folder: bool
    ) -> None:
        """
        Brings the cache entry at `target_path` in line with `source_path`.

        A missing source removes the cache entry; a file is synced via its parent directory with an
        exact-name filter; a directory is synced as a whole.

        :param source_path: File or directory at the source.
        :param target_path: Corresponding path inside the cache.
        :param prune_target_folder: Whether extra contents of a target directory may be removed.
        """
        self.__logger.info(
            "Synchronizing cache at %s from source at %s...\n", target_path, source_path
        )
        if not os.path.exists(source_path):
            self.remove(target_path)
            self.__logger.info("Target %s was removed.\n", target_path)
        else:
            include_filters: List[str] = []
            enable_purge = prune_target_folder
            if os.path.isfile(source_path):
                include_filters = self.__get_filters_for_exact_match(source_path)
                # Do not purge if dealing with a single file, that would not only delete sibling
                # files not available at the source but apparently also all files that don't match
                # the include_filters.
                enable_purge = False
                source_path = os.path.dirname(source_path)
                target_path = os.path.dirname(target_path)
            self.__sync_dir(source_path, target_path, include_filters, enable_purge)
        self.__logger.info("Sync DONE.\n")

    @staticmethod
    def __get_filters_for_exact_match(source_path: str) -> List[str]:
        """
        :return: A one-element list with an anchored regex matching exactly the file name
            (basename) of `source_path`.
        """
        return ["^{}$".format(re.escape(os.path.basename(source_path)))]

    def __sync_dir(
        self,
        source_dir: str,
        target_dir: str,
        include_filters: List[str],
        enable_purge: bool,
    ) -> None:
        """
        Synchronizes the given target_dir from the given source_dir.

        :param source_dir: Source directory
        :param target_dir: Target directory
        :param include_filters: List of regex patterns to include for source directory contents
        :param enable_purge: If True, all files in the target_dir that are not in the source_dir OR
            do not match the include_filters will be removed.
        """
        self.__logger.debug(
            "Running dirsync %s\n",
            str([source_dir, target_dir, "sync", True, include_filters]),
        )
        dirsync.sync(
            source_dir,
            target_dir,
            "sync",
            create=True,
            purge=enable_purge,
            only=include_filters,
            logger=self.__logger,
        )
        self.__logger.debug("Done running dirsync.\n")

    def is_cache_path(self, result_path: str) -> bool:
        """
        Allows the user to find out if a path (e.g. one returned by *get*) is in the cache or not.

        :param result_path: Some path, typically a result of get
        :return: True if *result_path* is located in the cache.
        """
        return isSubPath(result_path, self.__cache_base_path)

    def has_source_access(self) -> bool:
        """
        :return: True if there is read access to the caches' server root path.
        """
        return os.access(self.__server_root_path, os.R_OK)

    def get_cache_root_path(self) -> str:
        """
        :return: The root of the cached version of the caches' server root path.
        """
        return self.__cache_base_path