Source code for dismantle.package._handlers

"""Provides the PackageHandler interface and two example handlers.

Each package can be located in one of multiple locations. Packages can
be located on a remote server, on the local filesystem, or any  other
custom location.  In order to add a new custom location, the
PackageHandler needs to be extended to handle the new custom location.

Currently, two handlers are provided, one to handle packages stored on
the local filesystem, and one to support packages located online and
retrieved using a web request.
"""
import abc
import atexit
import json
import shutil
import tempfile
from hashlib import md5
from json.decoder import JSONDecodeError
from pathlib import Path
from typing import List, Optional, Union
from urllib.parse import urlparse

import requests

from dismantle.package._formats import (
    DirectoryPackageFormat,
    PackageFormat,
    ZipPackageFormat
)

Formats = Optional[List[PackageFormat]]


[docs]class PackageHandler(metaclass=abc.ABCMeta): """Base PackageHandler interface. The PackageHandler interface defines the structure expected for all package handlers and should be extended to provide additional custom package handler. """ @abc.abstractmethod def __getattr__(self, name) -> object: """Allow the __getattr__ method to be extended.""" ... @property @abc.abstractmethod def name(self) -> str: """Return a string containing the name of the package.""" ... @property @abc.abstractmethod def installed(self) -> bool: """Return a boolean if the package has been installed.""" ...
[docs] @staticmethod @abc.abstractmethod def grasps(path: Union[str, Path]) -> bool: """Check if package handler understand a package format.""" ...
[docs] @abc.abstractmethod def install( self, path: Union[str, Path], version: Optional[str] = None ) -> bool: """Install a specific package version.""" ...
[docs] @abc.abstractmethod def uninstall(self) -> bool: """Uninstall a package.""" ...
[docs] @abc.abstractmethod def verify(self, signature: str) -> bool: """Verify a packages hash with the provided signature.""" ...
[docs]class LocalPackageHandler(PackageHandler): """Directory package structure.""" def __init__( self, name: str, src: Union[str, Path], formats: Optional[Formats] = None ) -> None: """Initialise the package.""" self._meta = {} self._meta['name'] = name self._path = None self._installed = False self._src = str(src)[7:] if str(src)[:7] == 'file://' else src if formats is None: formats = [DirectoryPackageFormat] for current_format in formats: if current_format.grasps(self._src): self._format = current_format break else: message = 'unable to process source format' raise FileNotFoundError(message) @property def name(self) -> str: """Return the name of the package from the meta data.""" return self.meta['name'] @property def path(self) -> str: """Return the path the package is installed into.""" return str(self._path) def __getattr__(self, name): """Return metadata. Return an attribute from the meta data if the data doesnt exist. """ if name not in self._meta: message = f'{name} is an invalid attribute' raise AttributeError(message) return self._meta[name] @property def installed(self) -> bool: """Return the current installation state.""" return self._installed
[docs] @staticmethod def grasps(path: Union[str, Path]) -> bool: """Check if the package format can process. Check if a directory on the local filesystem has been provided. """ path = str(path)[7:] if str(path)[:7] == 'file://' else path try: return Path(str(path)).exists() except OSError: return False
[docs] def install( self, path: Optional[str] = None, version: Optional[str] = None ) -> bool: """Install a package. The local package handler does not install the package. No version control exists for the directory package type. """ path = str(path)[7:] if str(path)[:7] == 'file://' else path self._path = path if path else self._src self._format.extract(self._src, self._path) self._meta = {**self._meta, **self._load_metadata(self._path)} self._installed = True return True
[docs] def uninstall(self) -> bool: """Uninstall the package.""" if self._path != self._src: self._remove_files(Path(self._path or '')) self._path = None self._installed = False return True
[docs] def verify(self, digest: Optional[str] = None) -> bool: """Verify the package hasn't been tampered with.""" if digest is None: return True message = 'the local package handler does not support verification' raise ValueError(message)
def _load_metadata(self, path: Union[str, Path]): """Load the package.json file into memory.""" path = Path(str(path)[7:] if str(path)[:7] == 'file://' else path) try: with open(path / 'package.json') as package: meta = json.load(package) if 'name' not in meta: message = 'meta file missing name value' raise ValueError(message) if self._meta['name'] != meta['name']: message = 'meta name does not match provided package name' raise ValueError(message) if 'version' not in meta: message = 'meta file missing version value' raise ValueError(message) return meta except JSONDecodeError: message = 'invalud package file format' raise ValueError(message) @staticmethod def _remove_files(path: Union[str, Path]) -> None: """Recursively remove the path and all its sub items.""" path = str(path)[7:] if str(path)[:7] == 'file://' else path try: shutil.rmtree(path) except OSError: FileNotFoundError('unable to remove files')
[docs]class HttpPackageHandler(PackageHandler): """Url package structure.""" def __init__( self, name: str, src: Union[str, Path], formats: Formats = None, cache_dir: Optional[Union[str, Path]] = None ): """Initialise the package.""" self._meta = {} self._meta['name'] = name self._path = None self._installed = False self._updated = False self._src = src cache_dir = Path(cache_dir or '') if not cache_dir: tmp_cache = tempfile.TemporaryDirectory() cache_dir = Path(tmp_cache.name) atexit.register(tmp_cache.cleanup) else: cache_dir.mkdir(0x777, True, True) parts = urlparse(str(src)) ext = ''.join(Path(parts.path).suffixes) self._cache = Path(cache_dir / Path(name + ext)) if not HttpPackageHandler.grasps(src): message = 'invalid handler format' raise ValueError(message) if formats is None: formats = [ZipPackageFormat] for current_format in formats: if current_format.grasps(self._src): self._format = current_format break else: message = 'a valid source is required' raise FileNotFoundError(message) @property def name(self) -> str: """Return the name of the package from the meta data.""" return self.meta['name'] def __getattr__(self, name): """Return attrib from meta data if the data doesnt exist.""" if name not in self._meta: message = f'{name} is an invalid attribute' raise AttributeError(message) return self._meta[name] @property def installed(self) -> bool: """Return the current installation state.""" return self._installed
[docs] @staticmethod def grasps(path: Union[str, Path]) -> bool: """Check if dir on the local filesystem has been provided.""" parts = urlparse(str(path)) if parts.scheme not in ['http', 'https']: return False return True
def _fetch_and_extract(self): headers = {'If-None-Match': self._digest} req = requests.get( str(self._src), headers=headers, allow_redirects=True ) if req.status_code not in [200, 304]: raise FileNotFoundError(req.status_code) elif req.status_code == 200: self._cache.parents[0].mkdir(parents=True, exist_ok=True) with open(self._cache, 'wb') as cached_package: cached_package.write(req.content) self._updated = True self._format.extract(self._cache, self._path or '')
[docs] def install(self, path: str, version: Optional[str] = None) -> bool: """Install the current package to the given path. If there's already a package in path we'll only fetch if the version is different. """ fetch_required = True try: existing_pkg_metadata = self._load_metadata(Path(path)) if existing_pkg_metadata['version'] == self._meta['version']: fetch_required = False except ValueError: # Ignore _load_metadata errors pass except OSError: # Ignore Not Found pass except KeyError: # ignore if `_meta` is empty pass self._path = path self._updated = False if fetch_required: self._fetch_and_extract() self._meta = {**self._meta, **self._load_metadata(Path(self._path))} self._installed = True return True
[docs] def uninstall(self) -> bool: """Uninstall the package.""" if self._path != self._src: self._remove_files(Path(self._path or '')) self._path = None self._installed = False return True
[docs] def verify(self, digest: Optional[str] = None) -> bool: """Verify the package hasn't been tampered with.""" if digest is None: return True message = 'the http package handler does not support verification' raise ValueError(message)
def _load_metadata(self, path: Path): """Load the package.json file into memory.""" try: with open(path / 'package.json') as package: meta = json.load(package) if 'name' not in meta: message = 'meta file missing name value' raise ValueError(message) if self._meta['name'] != meta['name']: message = 'meta name does not match provided package name' raise ValueError(message) if 'version' not in meta: message = 'meta file missing version value' raise ValueError(message) return meta except JSONDecodeError: message = 'invalid package file format' raise ValueError(message) @staticmethod def _remove_files(path: Path) -> None: """Recursively remove the path and all its sub items.""" try: shutil.rmtree(path) except OSError: FileNotFoundError('unable to remove files') @property def outdated(self) -> bool: """Execute a head request using the requests library. To check that the ETag matches. """ headers = {'If-None-Match': self._digest} req = requests.head( str(self._src or ''), headers=headers, allow_redirects=True ) if req.status_code not in [200, 304]: raise FileNotFoundError(req.status_code) elif req.status_code == 200: return True else: return False @property def _digest(self) -> str: """Return the md5 digest of the currently cached index file.""" digest = md5() if not self._cache.exists(): return digest.hexdigest() with open(self._cache, 'rb') as cached_package: for block in iter(lambda: cached_package.read(65536), b''): digest.update(block) return digest.hexdigest()