Source code for warg.packages.reloading

__author__ = "Christian Heider Lindbjerg"
__doc__ = r"""

           Created on 28/06/2020
           """

__all__ = [
    "is_module_available",
    "import_warning",
    "reimported_warning",
    "ensure_in_sys_path",
    "clean_sys_path",
    "remove_from_sys_path",
    "import_file",
    "find_ancestral_relatives",
    "find_nearest_ancestral_relative",
    "walk_up",
    "reload_all_modules",
    "reload_module",
]

from importlib import reload
from pathlib import Path

import importlib
import logging
import sys
from importlib.util import find_spec
from typing import Any, Callable, Iterable, List, Optional, Union
from warnings import warn

from warg.decorators import passes_kws_to

_logger = logging.getLogger(__name__)

IGNORE = """
PRELOADED_MODULES = set()
def init():
  # local imports to keep things neat
  from sys import modules

  global PRELOADED_MODULES

  # sys and importlib are ignored here too
  PRELOADED_MODULES = set(modules.values())


def reload_all():
  from sys import modules
  import importlib

  for module in set(modules.values()) - PRELOADED_MODULES:
    try:
      importlib.reload(module)
    except:
      # there are some problems that are swept under the rug here
      pass
init()
"""
VERBOSE = False


def contain(q: Any, s: Iterable) -> bool:
    """

    :param q:
    :param s:
    :return:
    """
    return q in s


[docs] def reload_module(module_name: str, containment_test: Callable = contain) -> None: """ :param module_name: :param containment_test: :return: """ if module_name in sys.modules: reload_set = {x for x in sys.modules if containment_test(module_name, x)} for a in reload_set: del sys.modules[a] # importlib.reload(sys.modules[mod_str]) #DOES NOT WORK ON FROM IMPORTS... sys.modules[a] = importlib.import_module(a) else: sys.modules[module_name] = importlib.import_module(module_name)
def reload_requirements(requirements_path: Path, containment_test: Callable = contain) -> None: """ :param requirements_path: :param containment_test: :return: """ from warg.packages.pip_parsing import get_requirements_from_file for r in get_requirements_from_file(requirements_path): reload_module(r.name, containment_test=containment_test)
[docs] def reload_all_modules(catch_exceptions: bool = True, verbose: bool = VERBOSE) -> None: """ :param catch_exceptions: :param verbose: :return: """ try: for mod in sys.modules.values(): reload(mod) except Exception as e: if verbose: _logger.error(mod) if catch_exceptions: if verbose: _logger.error(e) else: raise e
[docs] def import_file(path: Path, from_list=None) -> Any: """ Import a module given its filename, works both on absolute and relative paths :param path: :param from_list: :return: """ if from_list is None: from_list = {} globals_ = {} # globals() # determines package context locals_ = {} # locals() # Should not be used in import anyway sys_path = sys.path # Save original sys.path try: sys.path.insert(0, str(path.parent.absolute())) # Temporarily add parent dir of path to parent return __import__( path.stem, globals=globals_, locals=locals_, fromlist=from_list, level=0 ) # Get the module name (no extension) finally: sys.path = sys_path # Restore original sys.path
[docs] def walk_up(path: Path, top: Path, max_ascent: int = None): """ :param path: :param top: :param max_ascent: :return: """ i = 0 while True: yield path i += 1 if max_ascent and max_ascent < i: break if path == top: break else: path = path.parent
def walk_down(path: Path, max_descent: int = None): """ :param path: :param max_descent: :return: """ if max_descent == 0: return queue = [] for c in path.iterdir(): if c.is_dir(): yield c queue.append(c) for q in queue: try: yield from walk_down(q, max_descent=max_descent - 1 if max_descent else None) except: yield
[docs] def find_ancestral_relatives( target: Union[str, Path], context: Path = Path.cwd(), *, from_parent_of_context: bool = True, ancestral_levels: int = 2, descendant_levels: int = 2, top_level: Path = None, return_parent_of_target: bool = True, no_duplicates: bool = True, terminate_first: bool = False, ) -> List[Path]: """ :param target: :param context: :param from_parent_of_context: :param ancestral_levels: :param descendant_levels: :param top_level: :param return_parent_of_target: :param no_duplicates: :param terminate_first: :return: """ relatives = [] if top_level is None: top_level = context.root if from_parent_of_context: context = context.parent for p in walk_up(context, top_level, max_ascent=ancestral_levels): for wd in walk_down(p, max_descent=descendant_levels): if target in wd.parts[-ancestral_levels:]: if return_parent_of_target: wd = wd.parent relatives.append(wd) if terminate_first: break p /= target if p.exists(): if return_parent_of_target: p = p.parent relatives.append(p) if terminate_first: break if no_duplicates: return list(set(relatives)) return relatives
[docs] @passes_kws_to(find_ancestral_relatives) def find_nearest_ancestral_relative(*args, **kwargs) -> Optional[Path]: """ :param args: :param kwargs: :return: """ kwargs.update(terminate_first=True) result = find_ancestral_relatives(*args, **kwargs) if result: return result[0]
[docs] def clean_sys_path() -> None: """ Clean the sys.path for dead paths or duplicates :return: """ out = [] for path in sys.path: p = Path(path).resolve() if p.exists(): if p not in out: out.append(p) sys.path[:] = [str(o.absolute()) for o in out]
[docs] def remove_from_sys_path(target: Path, missing_ok: bool = True): """ Clean the sys.path for target path :param target: :param missing_ok: :return: """ out = [] target = target.resolve() for path in sys.path: p = Path(path).resolve() if p.exists() and target != p: if p not in out: out.append(p) sys.path[:] = [str(o.absolute()) for o in out]
[docs] def ensure_in_sys_path( path: Optional[Union[str, Path]], position: Optional[int] = None, resolve: bool = False, absolute: bool = True, verbose: bool = VERBOSE, ) -> None: """ Ensures that a path is in sys.path, but avoids duplicates. Can also resolve and absolute paths for duplication. Does not clean the existing paths in sys.path :param verbose: Whether to print verbose info :type verbose: bool :param path: The path to be inserted :type path: Optional[Union[str, Path]] :param position: If not supplied, the path will be appended at the end of the existing sys.path :type position: Optional[int] :param resolve: Whether to resolve the absolute path :type resolve: bool :param absolute: Insert the absolute path :type absolute: bool :return: None :rtype: None """ if path is None: # may be the case if the supplied path is being solved programmatically warn("No path was supplied") return path = Path(path) if absolute: path = path.absolute() str_path = str(path) sys_path_snapshot = sys.path if resolve: sys_path_snapshot = [Path(p).resolve() for p in sys_path_snapshot] inclusion_test = path.resolve() in sys_path_snapshot else: inclusion_test = str_path in sys_path_snapshot if not inclusion_test: if position: sys.path.insert(position, str_path) else: sys.path.append(str_path) else: if verbose: _logger.warning(f"{path} is already in sys path")
[docs] def is_module_available(module: str) -> bool: """Returns True if module is available. :param module: Name of the module to be checked. :type module: str :return: True if installed. :rtype: bool """ return find_spec(module) is not None
[docs] def import_warning(module_name: str) -> None: """ Inform the user that a module has been imported, useful when repeated imports are heavy in the contexts of multiprocessing. Lets the user identify which file is reporting heavy loading and restructure code to avoid repeated importing :param module_name: :return: """ from sys import modules if module_name in modules: warn( f"You already {module_name} had imported, consider restructuring your code to avoid repeated imports" )
[docs] def reimported_warning(module_name: str) -> None: """ Just an idea :return: """ raise NotImplemented
# TODO: touch .lock file to system for module_name for a multiprocess warning if already exists, # delete it again once process is done? # context_wrapper maybe useful if __name__ == "__main__": def _main() -> None: """ :rtype: None """ mod = "matplotlib" import_warning(mod) from matplotlib import pyplot import_warning(mod) pyplot.figure() def aisjdi(): from copy import deepcopy s = deepcopy(sys.path) ensure_in_sys_path(Path(__file__).parent) s2 = sys.path _logger.info(s == s2, set(s2) - set(s), set(s) - set(s2), s2) def iajsd(): from copy import deepcopy s = deepcopy(sys.path) clean_sys_path() s2 = sys.path _logger.info(s == s2, set(s2) - set(s), set(s) - set(s2), s2) def asuhdsaud(): _logger.info(find_ancestral_relatives("queues", context=__file__)) # _main() # aisjdi() # iajsd() asuhdsaud()