diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 721b820..c088851 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.3.4 +current_version = 0.4.0 commit = False tag = False allow_dirty = False diff --git a/.github/workflows/tox.yaml b/.github/workflows/tox.yaml index 37bf447..d03cf04 100644 --- a/.github/workflows/tox.yaml +++ b/.github/workflows/tox.yaml @@ -22,16 +22,19 @@ jobs: services: remote-storage: image: bitnami/minio:latest - env: - MINIO_ACCESS_KEY: minio - MINIO_SECRET_KEY: minio123 ports: - - 9000:9000 + - "9000:9000" + - "9001:9001" + env: + MINIO_ROOT_USER: minio-root-user + MINIO_ROOT_PASSWORD: minio-root-password steps: # pandoc needed for docu, see https://nbsphinx.readthedocs.io/en/0.7.1/installation.html?highlight=pandoc#pandoc - name: Install Non-Python Packages run: apt-get update -yq && apt-get -yq install pandoc git git-lfs rsync + - name: Safe directory workaround + run: git config --global --add safe.directory "$GITHUB_WORKSPACE" - uses: actions/checkout@v2.3.1 with: fetch-depth: 0 @@ -43,6 +46,8 @@ jobs: - name: Merge develop into current branch if: github.ref != 'refs/heads/develop' run: | + git config --global user.email "robot@github.com" + git config --global user.name "github_robot_user" git fetch origin develop:develop --update-head-ok git merge develop - name: Setup Python 3.8 diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index ed47f79..433e638 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -42,8 +42,8 @@ tox_recreate: - name: bitnami/minio:latest alias: remote-storage variables: - MINIO_ACCESS_KEY: minio - MINIO_SECRET_KEY: minio123 + MINIO_ACCESS_KEY: minio-root-user + MINIO_SECRET_KEY: minio-root-password script: - pip install tox - tox -r @@ -67,8 +67,8 @@ tox_use_cache: - name: bitnami/minio:latest alias: remote-storage variables: - MINIO_ACCESS_KEY: minio - MINIO_SECRET_KEY: minio123 + MINIO_ACCESS_KEY: minio-root-user + MINIO_SECRET_KEY: minio-root-password script: - pip install tox - tox diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 5c20c8a..c446549 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,11 +1,11 @@ repos: - repo: https://github.com/psf/black - rev: 20.8b1 + rev: 23.1.0 hooks: - id: black language_version: python3 - repo: https://github.com/PyCQA/isort - rev: 5.6.4 + rev: 5.12.0 hooks: - id: isort - repo: https://github.com/kynan/nbstripout diff --git a/README_dev.md b/CONTRIBUTING.md similarity index 76% rename from README_dev.md rename to CONTRIBUTING.md index c574aab..57d1e4e 100644 --- a/README_dev.md +++ b/CONTRIBUTING.md @@ -1,12 +1,19 @@ accsr library ============= -This repository contains the accsr python library together with utilities for building, testing, -documentation and configuration management. The library can be installed locally by running +## Installing -```python setup.py install``` +You should install the library together with all dependencies as an editable package. We strongly suggest to use some form of virtual environment for working with the library. E.g. with conda: -from the root directory. +```shell +conda create -n accsr python=3.8 +conda activate accsr +pip install -e . +pip install -r requirements-dev.txt -r requirements-docs.txt -r requirements-test.txt -r requirements-linting.txt -r requirements-coverage.txt +``` + +from the root directory. 
Strictly speaking, you wouldn't +need to install the dev dependencies, as they are installed by `tox` on the file, but they are useful for development without using tox. This project uses the [black](https://github.com/psf/black) source code formatter and [pre-commit](https://pre-commit.com/) to invoke it as a Git pre-commit hook. @@ -28,14 +35,15 @@ and documentation. Before pushing your changes to the remote it is often useful to execute `tox` locally in order to detect mistakes early on. -We strongly suggest to use some form of virtual environment for working with the library. E.g. with conda: -```shell script -conda create -n accsr python=3.8 -conda activate accsr -pip install -r requirements.txt -r requirements-dev.txt -``` ### Testing and packaging + +Local testing is done with pytest and tox. Note that you can +perform each part of the test pipeline individually as well, either +by using `tox -e ` or by executing the scripts in the +`build_scripts` directory directly. See the `tox.ini` file for the +list of available environments and scripts. + The library is built with tox which will build and install the package and run the test suite. Running tox will also generate coverage and pylint reports in html and badges. You can configure pytest, coverage and pylint by adjusting [pytest.ini](pytest.ini), [.coveragerc](.coveragerc) and @@ -65,40 +73,11 @@ tests. Have a look at the example notebook for an explanation of how this works. You might wonder why the requirements.txt already contains numpy. The reason is that tox seems to have a problem with empty requirements files. Feel free to remove numpy once you have non-trivial requirements -## Configuration Management -The repository also includes configuration utilities that are often helpful when using data-related libraries. -They do not form part of the resulting package, you can (and probably should) adjust them to your needs. - -## CI/CD and Release Process -This repository contains a gitlab ci/cd pipeline that will run the test suite and -publish docu, badges and reports. Badges can accessed from the pipeline's artifacts, e.g. for the coverage badge -the url will be: -``` -https://gitlab.aai.lab/%{project_path}/-/jobs/artifacts/develop/raw/badges/coverage.svg?job=tox -``` - -### Development and Release Process - -In order to be able to automatically release new versions of the package from develop and master, the - CI pipeline should have access to the following variables (they should already be set on global level): - -``` -PYPI_REPO_URL -PYPI_REPO_USER -PYPI_REPO_PASS -``` - -They will be used in the release steps in the gitlab pipeline. - -You will also need to set up Gitlab CI deploy keys for -automatically committing from the develop pipeline during version bumping - #### Automatic release process In order to create an automatic release, a few prerequisites need to be satisfied: -- The project's virtualenv needs to be active - The repository needs to be on the `develop` branch - The repository must be clean (including no untracked files) diff --git a/README.md b/README.md index d2e1ee3..b76446a 100644 --- a/README.md +++ b/README.md @@ -1,14 +1,16 @@ -# accsr +# accsr: Simple tools for loading data and configurations This lightweight library contains utilities for managing, loading, uploading, opening and generally wrangling data and configurations. It was battle tested in multiple projects at appliedAI. -Please open new issues for bugs, feature requests and extensions. 
See more details about the structure and -workflow in the [developer's readme](README_dev.md). +The main useful abstractions provided by this library are: +1. The `RemoteStorage` +class for a git-like, programmatic access to data stored in any cloud storage. +2. The configuration module for a simple, yet powerful configuration management. ## Overview -Source code documentation and usage examples are [here](TODO). We also provide notebooks with examples in *TODO*. +Source code documentation and usage examples are [here](https://appliedai-initiative.github.io/accsr/docs/). ## Installation @@ -22,3 +24,9 @@ To live on the edge, install the latest develop version with pip install --pre accsr ``` +## Contributing + +Please open new issues for bugs, feature requests and extensions. See more details about the structure and +workflow in the [developer's readme](README_dev.md). The coverage and pylint report can be found on the project's +[github pages](https://appliedai-initiative.github.io/accsr/). + diff --git a/docs/getting-started.rst b/docs/getting-started.rst deleted file mode 100644 index 41c37b9..0000000 --- a/docs/getting-started.rst +++ /dev/null @@ -1,7 +0,0 @@ -Getting started -=============== - -Welcome to the accsr library. -See the project's repository_ for more information. - -.. _repository: https://gitlab.aai.lab//git@gitlab.aai.lab:resources/data-access.git diff --git a/notebooks/Configuration with accrs.ipynb b/notebooks/Configuration with accrs.ipynb deleted file mode 100644 index f7756b6..0000000 --- a/notebooks/Configuration with accrs.ipynb +++ /dev/null @@ -1,120 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "metadata": { - "collapsed": true, - "pycharm": { - "name": "#%% md\n" - } - }, - "source": [ - "# The configuration module\n", - "\n", - "accsr provides utilities for reading configuration from a hierarchy of files and customizing access to them. In\n", - "this notebook we show some use case examples for this." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "outputs": [], - "source": [ - "from accsr.config import ConfigProviderBase, DefaultDataConfiguration, ConfigurationBase\n", - "from accsr.remote_storage import RemoteStorage, RemoteStorageConfig" - ], - "metadata": { - "collapsed": false, - "pycharm": { - "name": "#%%\n" - } - } - }, - { - "cell_type": "markdown", - "source": [ - "## Setting up configuration\n", - "\n", - "The recommended way of using accsr's configuration utils is to create a module called `config.py` within your project\n", - "and setup classes and methods for managing and providing configuration. In the cell below we show a minimal example\n", - "of such a file.\n", - "\n", - "Under the hood the config provider is reading out the `__Configuration` class from generics at runtime and makes sure\n", - "that only one global instance of your custom `__Configuration` exists in memory. Don't worry if you are unfamiliar\n", - "with the coding patterns used here - you don't need to understand them to use the config utils.\n", - "You will probably never need to adjust the `ConfigProvider` related code." 
- ], - "metadata": { - "collapsed": false, - "pycharm": { - "name": "#%% md\n" - } - } - }, - { - "cell_type": "code", - "execution_count": null, - "outputs": [], - "source": [ - "class __Configuration(ConfigurationBase):\n", - " pass\n", - "\n", - "\n", - "class ConfigProvider(ConfigProviderBase[__Configuration]):\n", - " pass\n", - "\n", - "\n", - "_config_provider = ConfigProvider()\n", - "\n", - "\n", - "def get_config(reload=False) -> __Configuration:\n", - " \"\"\"\n", - " :param reload: if True, the configuration will be reloaded from the json files\n", - " :return: the configuration instance\n", - " \"\"\"\n", - " return _config_provider.get_config(reload=reload)" - ], - "metadata": { - "collapsed": false, - "pycharm": { - "name": "#%%\n" - } - } - }, - { - "cell_type": "code", - "execution_count": null, - "outputs": [], - "source": [ - "# TODO: extend notebook\n" - ], - "metadata": { - "collapsed": false, - "pycharm": { - "name": "#%%\n" - } - } - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 2 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython2", - "version": "2.7.6" - } - }, - "nbformat": 4, - "nbformat_minor": 0 -} \ No newline at end of file diff --git a/notebooks/Quick Intro.ipynb b/notebooks/Quick Intro.ipynb new file mode 100644 index 0000000..7f9d246 --- /dev/null +++ b/notebooks/Quick Intro.ipynb @@ -0,0 +1,445 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%load_ext autoreload\n", + "%autoreload 2" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Intro to accsr\n", + "\n", + "The goal of `accsr` is to simplify programmatic access to data on disc and in\n", + "remote storage in python. We often found ourselves repeating the same lines of\n", + "code for pulling something from a bucket, loading a file from a tar archive\n", + "or creating a configuration module for storing paths to existing\n", + "or to-be-loaded files. `accsr` allows doing all this directly from python,\n", + "without relying on a cli or external tools.\n", + "\n", + "\n", + "One of the design goals of accsr is to allow the users to use the same code\n", + "for loading data and configuration, independently of the state of the local\n", + "file system.\n", + "\n", + "For example, a developer with all data already loaded who wants to\n", + "perform an experiment with some extended data set, would load\n", + "the configuration with `get_config()`, instantiate a `RemoteStorage`\n", + "object and call `pull()` to download any missing data from the remote storage.\n", + "If no data is missing, nothing will be downloaded, thus creating no overhead.\n", + "\n", + "A user who does not have the data locally, would also call `get_config()`,\n", + "(possibly using a different `config_local.json` file, with different access keys or namespaces),\n", + "and then als call `pull()` with the same code. 
The data will be downloaded\n", + "from the remote storage and stored locally.\n", + "\n", + "Thus, the code will never need to change between development, testing and deployment,\n", + "and unnecessary overhead for loading data is reduced as much as possible.\n", + "\n", + "This approach also makes it easy to collaborate on data sets with the same code-base,\n", + "and avoid stepping on each other's toes by accident." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## The configuration module\n", + "\n", + "The configuration module provides utilities for reading configuration\n", + "from a hierarchy of files and customizing access to them. Let us look at\n", + "some use case examples for this." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from accsr.config import ConfigProviderBase, DefaultDataConfiguration, ConfigurationBase\n", + "from accsr.remote_storage import RemoteStorage, RemoteStorageConfig\n", + "import os\n", + "from pathlib import Path\n", + "import requests\n", + "from time import sleep" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Setting up configuration providers\n", + "\n", + "The recommended way of using `accsr`'s configuration utils is to create a module called `config.py` within your project\n", + "and setup classes and methods for managing and providing configuration. In the cell below we show a minimal example\n", + "of such a file.\n", + "\n", + "Under the hood the config provider is reading out the `__Configuration` class from generics at runtime and makes sure\n", + "that only one global instance of your custom `__Configuration` exists in memory. Don't worry if you are unfamiliar\n", + "with the coding patterns used here - you don't need to understand them to use the config utils.\n", + "You will probably never need to adjust the `ConfigProvider` related code." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "class __Configuration(ConfigurationBase):\n", + " pass\n", + "\n", + "\n", + "class ConfigProvider(ConfigProviderBase[__Configuration]):\n", + " pass\n", + "\n", + "\n", + "_config_provider = ConfigProvider()\n", + "\n", + "\n", + "def get_config(reload=False) -> __Configuration:\n", + " \"\"\"\n", + " :param reload: if True, the configuration will be reloaded from the json files\n", + " :return: the configuration instance\n", + " \"\"\"\n", + " return _config_provider.get_config(reload=reload)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Loading configuration from files\n", + "\n", + "We found the following workflow useful for managing configuration files:\n", + "\n", + "1. Create a `config.json` file in the root of your project.\n", + "This file should contain all the default configuration and be committed to version control.\n", + "2. Create a `config_local.json` file in the root of your project with the user-specific configuration.\n", + "This file should not be committed to version control. 
It does not need to contain all the configuration,\n", + "only the parts that are different from the default configuration.\n", + "\n", + "A typical use case is to have default configuration for the `RemoteStorage` in `config.json` and\n", + "to have secrets (like the access key and secret), as well as a user-specific base path in `config_local.json`.\n", + "In this way, multiple users can use the same code for loading data while still being able to experiment\n", + "on their own data sets - for example storing these data sets in the same bucket but in different namespaces.\n", + "\n", + "Another use case is to include a read-only access key in `config.json`, which is then\n", + "distributed to users in version-control, and a read-write access key in `config_local.json` for\n", + "the developers who need to update data." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## The RemoteStorage facilities\n", + "\n", + "`accsr` makes it easy to interact with data stored in a remote blob storage, like S3, Google Storage,\n", + "Azure Storage or similar. The `RemoteStorage` implements a git-like logic and uses `apache-libcloud`\n", + "underneath." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "In order to demonstrate the RemoteStorage functionality, we will start [minIO](https://min.io/),\n", + "an object store with S3 interface, using docker compose.\n", + "We also switch to the tests directory where the docker-compose file and some resource files for\n", + "testing have been prepared." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "notebooks_dir = Path(os.getcwd()).absolute()\n", + "tests_dir = notebooks_dir.parent / \"tests\" / \"accsr\"\n", + "\n", + "os.chdir(tests_dir)" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [ + { + "ename": "NameError", + "evalue": "name 'os' is not defined", + "output_type": "error", + "traceback": [ + "\u001B[1;31m---------------------------------------------------------------------------\u001B[0m", + "\u001B[1;31mNameError\u001B[0m Traceback (most recent call last)", + "Input \u001B[1;32mIn [1]\u001B[0m, in \u001B[0;36m\u001B[1;34m\u001B[0m\n\u001B[1;32m----> 1\u001B[0m \u001B[38;5;28;01mif\u001B[39;00m \u001B[38;5;129;01mnot\u001B[39;00m \u001B[43mos\u001B[49m\u001B[38;5;241m.\u001B[39mgetenv(\u001B[38;5;124m\"\u001B[39m\u001B[38;5;124mCI\u001B[39m\u001B[38;5;124m\"\u001B[39m):\n\u001B[0;32m 2\u001B[0m \u001B[38;5;66;03m# In CI, we start the minIO container separately\u001B[39;00m\n\u001B[0;32m 3\u001B[0m get_ipython()\u001B[38;5;241m.\u001B[39msystem(\u001B[38;5;124m'\u001B[39m\u001B[38;5;124mdocker-compose up -d\u001B[39m\u001B[38;5;124m'\u001B[39m)\n\u001B[0;32m 4\u001B[0m host \u001B[38;5;241m=\u001B[39m \u001B[38;5;124m\"\u001B[39m\u001B[38;5;124mlocalhost\u001B[39m\u001B[38;5;124m\"\u001B[39m\n", + "\u001B[1;31mNameError\u001B[0m: name 'os' is not defined" + ] + } + ], + "source": [ + "if not os.getenv(\"CI\"):\n", + " # In CI, we start the minIO container separately\n", + " !docker-compose up -d\n", + " host = \"localhost\"\n", + "else:\n", + " host = \"remote-storage\"\n", + "\n", + "port = 9001\n", + "api_port = 9000" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We now should have minio up and running." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now we can instantiate a RemoteStorage object and interact with minIO." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "remote_storage_config = RemoteStorageConfig(\n", + " provider=\"s3\",\n", + " key=\"minio-root-user\",\n", + " secret=\"minio-root-password\",\n", + " bucket=\"accsr-demo\",\n", + " base_path=\"my_remote_dir\",\n", + " host=host,\n", + " port=api_port,\n", + " secure=False,\n", + ")\n", + "\n", + "storage = RemoteStorage(remote_storage_config)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The `base_path` is a \"directory\" (or rather a namespace) within the bucket.\n", + "All calls to the storage object will only affect files in the `base_path`.\n", + "\n", + "The bucket itself does not exist yet, so let us create it.\n", + "This has to be done by the user explicitly, to prevent accidental costs. Of course,\n", + "if the configuration is pointing to an existing bucket, this step is not necessary." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "storage.create_bucket()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now we can push, pull, list and generally interact with objects inside `base_path` within the bucket.\n", + "Let us first push the resources directory to have something to start with." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The `pull` and `push` commands will return a summary of the transaction with the bucket.\n", + "If the flag `dryrun=True` is specified, then the transaction is only computed but not\n", + "executed - a good way to make sure that you are doing what is desired before actually\n", + "interacting with data." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "dry_run_summary = storage.push(\"resources\", dryrun=True)\n", + "\n", + "print(f\"Here the dryrun summary: \")\n", + "dry_run_summary.print_short_summary()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The summary shows that we would push multiple files with this call if we remove the dryrun flag.\n", + "Every detail of the transaction can be retrieved from the summary object." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "local_files_checked = dry_run_summary.matched_source_files\n", + "would_be_pushed = dry_run_summary.not_on_target\n", + "pushed_files = dry_run_summary.synced_files\n", + "\n", + "print(\n", + " f\"Out of {len(local_files_checked)} files that we found inside the 'resources' dir, \"\n", + " f\"we would push {len(would_be_pushed)}. 
In the last transaction {len(pushed_files)} files were synced.\"\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now let us actually perform the push" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def push_and_print():\n", + " push_summary = storage.push(\"resources\")\n", + " local_files_checked = push_summary.matched_source_files\n", + " pushed_files = push_summary.synced_files\n", + "\n", + " print(\n", + " f\"Out of {len(local_files_checked)} files that we found inside the \"\n", + " f\"'resources' dir, we pushed {len(pushed_files)}.\"\n", + " )" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "push_and_print()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "If we now push again, no new files will be synced. This holds even if `force=True` is specified, because the hashes are equal.\n", + "The flag `force=True` is useful if there are collisions in file names for files with different hashes.\n", + "In that case, a transaction will fail and nothing will be executed, much like with git. This is useful to\n", + "avoid uncertain state, where a transaction breaks in the middle of execution.\n", + "\n", + "In `accsr`, this behaviour is achieved by always inspecting the transaction summary before performing any\n", + "changes on filesystems and thus rejecting a transaction entirely if collisions happen with `force=False`\n", + "(the default).\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "push_and_print()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "If we delete one file on the remote and push again, a single file will be pushed." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "deleted_files = storage.delete(\"resources/sample.txt\")\n", + "print(f\"Deleted {len(deleted_files)} files.\")\n", + "\n", + "push_and_print()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The same logic applies to pulling. Generally, `RemoteStorage` only downloads and uploads data\n", + "if it is strictly necessary, so it is, e.g., safe to always call `pull` from some\n", + "script or notebook, as nothing will be pulled if the necessary files are already present.\n", + "Even pulling with `force=True` is \"safe\", in the sense that it is fast. Using\n", + "`force=True` is a good option for making sure that the data that one uses is\n", + "the latest version from the remote." 
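
As a reference outside the notebook, here is a minimal sketch of this pull-before-use pattern in a plain script. The connection values and the `data` path are placeholders mirroring the demo values above, not part of the library; only `RemoteStorageConfig`, `RemoteStorage.pull` and the `TransactionSummary` fields are the API introduced in this changeset.

```python
from accsr.remote_storage import RemoteStorage, RemoteStorageConfig

# Placeholder connection values -- in a real project these would come from get_config().
config = RemoteStorageConfig(
    provider="s3",
    key="minio-root-user",
    secret="minio-root-password",
    bucket="accsr-demo",
    base_path="my_remote_dir",
    host="localhost",
    port=9000,
    secure=False,
)
storage = RemoteStorage(config)

# Inspect what a pull would do before touching the local file system.
summary = storage.pull("data", local_base_dir=".", dryrun=True)
summary.print_short_summary()

# files_to_sync only lists objects that are missing locally or differ in md5 hash.
if summary.files_to_sync:
    # force=True overwrites local files whose hashes deviate from the remote.
    storage.pull("data", local_base_dir=".", force=True)
```

Because `pull` compares md5 hashes before transferring anything, running a snippet like this at the top of a script or notebook adds essentially no overhead once the data is already in place.
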
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Shutting down minio and going back to notebooks dir\n", + "\n", + "if not os.getenv(\"CI\"):\n", + " # In CI we start the minIO container separately\n", + " !docker-compose down\n", + "os.chdir(notebooks_dir)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 2 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython2", + "version": "2.7.6" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} diff --git a/pytest.ini b/pytest.ini index 0cd3e9e..322bd17 100644 --- a/pytest.ini +++ b/pytest.ini @@ -4,4 +4,4 @@ testpaths = log_format = %(asctime)s %(levelname)s %(message)s log_date_format = %Y-%m-%d %H:%M:%S log_cli=true -log_level=INFO +log_level=WARNING diff --git a/requirements-linting.txt b/requirements-linting.txt index 11223aa..2cb6533 100644 --- a/requirements-linting.txt +++ b/requirements-linting.txt @@ -1,5 +1,6 @@ -pylint -anybadge -pylint-json2html -black -isort==5.6.4 +pylint==2.11.1 +astroid==2.8.6 +anybadge==1.7.0 +pylint-json2html==0.3.0 +black~=23.1.0 +isort~=5.12.0 diff --git a/requirements-test.txt b/requirements-test.txt index 5fcfd0a..5f895c0 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -3,6 +3,6 @@ pytest-cov pytest-xdist pytest-lazy-fixture jupyter==1.0.0 -nbconvert==6.1.0 -pytest-docker==0.10.3 +nbconvert==6.3.0 +pytest-docker~=1.0.1 coverage==5.5 diff --git a/requirements.txt b/requirements.txt index ef7e5f0..513b26c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ numpy>=1.18.4 -apache-libcloud~=3.1.0 +apache-libcloud~=3.7.0 tqdm>=4.48.2 \ No newline at end of file diff --git a/setup.py b/setup.py index 6820ced..f33bc39 100644 --- a/setup.py +++ b/setup.py @@ -15,7 +15,7 @@ package_dir={"": "src"}, packages=find_packages(where="src"), include_package_data=True, - version="0.3.4", + version="0.4.0", description="Utils for accessing data from anywhere", install_requires=open("requirements.txt").readlines(), setup_requires=["wheel"], diff --git a/src/accsr/__init__.py b/src/accsr/__init__.py index 334b899..6a9beea 100644 --- a/src/accsr/__init__.py +++ b/src/accsr/__init__.py @@ -1 +1 @@ -__version__ = "0.3.4" +__version__ = "0.4.0" diff --git a/src/accsr/remote_storage.py b/src/accsr/remote_storage.py index c96bb70..9e1f726 100644 --- a/src/accsr/remote_storage.py +++ b/src/accsr/remote_storage.py @@ -1,24 +1,66 @@ +import json import logging.handlers import os -from dataclasses import dataclass, field +from copy import copy +from dataclasses import asdict, dataclass, field, is_dataclass from enum import Enum +from functools import cached_property from pathlib import Path -from typing import List, Optional, Pattern, Protocol +from typing import ( + Dict, + List, + Optional, + Pattern, + Protocol, + Sequence, + Union, + runtime_checkable, +) import libcloud from libcloud.storage.base import Container, StorageDriver +from libcloud.storage.types import ( + ContainerAlreadyExistsError, + InvalidContainerNameError, +) +from tqdm import tqdm from accsr.files import md5sum log = logging.getLogger(__name__) +class _SummariesJSONEncoder(json.JSONEncoder): + def default(self, o): + 
if is_dataclass(o): + return asdict(o) + if isinstance(o, RemoteObjectProtocol): + return o.__dict__ + if isinstance(o, SyncObject): + return o.to_dict() + return str(o) + + +class _JsonReprMixin: + def to_json(self) -> str: + return json.dumps(self, indent=2, sort_keys=True, cls=_SummariesJSONEncoder) + + def __repr__(self): + return f"\n{self.__class__.__name__}: \n{self.to_json()}\n" + + class Provider(str, Enum): GOOGLE_STORAGE = "google_storage" S3 = "s3" +@runtime_checkable class RemoteObjectProtocol(Protocol): + """ + Protocol of classes that describe remote objects. Describes information about the remote object and functionality + to download the object. + """ + name: str size: int hash: int @@ -30,8 +72,265 @@ def download( pass +class SyncObject(_JsonReprMixin): + """ + Class representing the sync-status between a local path and a remote object. Is mainly used for creating + summaries and syncing within RemoteStorage and for introspection before and after push/pull transactions. + + It is not recommended creating or manipulate instances of this class outside RemoteStorage, in particular + in user code. This class forms part of the public interface because instances of it are given to users for + introspection. + """ + + def __init__( + self, + local_path: str = None, + remote_obj: RemoteObjectProtocol = None, + remote_path: str = None, + ): + self.exists_locally = False + self.local_path = None + self.set_local_path(local_path) + + if self.local_path is None and remote_obj is None: + raise ValueError( + f"Either a local path or a remote object has to be passed." + ) + + self.remote_obj = remote_obj + + if remote_path is not None: + if remote_obj is not None and remote_obj.name != remote_path: + raise ValueError( + f"Passed both remote_path and remote_obj but the paths don't agree: " + f"{remote_path} != {remote_obj.name}" + ) + self.remote_path = remote_path + else: + if remote_obj is None: + raise ValueError(f"Either remote_path or remote_obj should be not None") + self.remote_path = remote_obj.name + + self.local_size = os.path.getsize(local_path) if self.exists_locally else 0 + self.local_hash = md5sum(local_path) if self.exists_locally else None + + @property + def name(self): + return self.remote_path + + @property + def exists_on_target(self) -> bool: + """ + Getter of the exists_on_target property. Since the file exists at least once (locally or remotely), the property + is True iff the file exists on both locations + :return: True iff the file exists on both locations + """ + return self.exists_on_remote and self.exists_locally + + def set_local_path(self, path: Optional[str]): + """ + Changes the local path of the SyncObject + :param path: + :return: None + """ + if path is not None: + local_path = os.path.abspath(path) + if os.path.isdir(local_path): + raise FileExistsError( + f"local_path needs to point to file but pointed to a directory: {local_path}" + ) + self.local_path = local_path + self.exists_locally = os.path.isfile(local_path) + + @property + def exists_on_remote(self): + """ + Getter of the exists_on_remote property. Is true if the file was found on the remote. + :return: whether the file exists on the remote + """ + return self.remote_obj is not None + + @property + def equal_md5_hash_sum(self): + """ + Getter of the equal_md5_hash_sum property. 
+ :return: True if the local and the remote file have the same md5sum + """ + if self.exists_on_target: + return self.local_hash == self.remote_obj.hash + return False + + def to_dict(self): + result = copy(self.__dict__) + result["exists_on_remote"] = self.exists_on_remote + result["exists_on_target"] = self.exists_on_target + result["equal_md5_hash_sum"] = self.equal_md5_hash_sum + return result + + +def _get_total_size(objects: Sequence[SyncObject], mode="local"): + """ + Computes the total size of the objects either on the local or on the remote side. + :param objects: The SyncObjects for which the size should be computed + :param mode: either 'local' or 'remote' + :return: the total size of the objects on the specified side + """ + permitted_modes = ["local", "remote"] + if mode not in permitted_modes: + raise ValueError(f"Unknown mode: {mode}. Has to be in {permitted_modes}.") + if len(objects) == 0: + return 0 + + def get_size(obj: SyncObject): + if mode == "local": + if not obj.exists_locally: + raise FileNotFoundError( + f"Cannot retrieve size of non-existing file: {obj.local_path}" + ) + return obj.local_size + if obj.remote_obj is None: + raise FileNotFoundError( + f"Cannot retrieve size of non-existing remote object corresponding to: {obj.local_path}" + ) + return obj.remote_obj.size + + return sum([get_size(obj) for obj in objects]) + + +@dataclass(repr=False) +class TransactionSummary(_JsonReprMixin): + """ + Class representing the summary of a push or pull operation. Is mainly used for introspection before and after + push/pull transactions. + + It is not recommended creating or manipulate instances of this class outside RemoteStorage, in particular + in user code. This class forms part of the public interface because instances of it are given to users for + introspection. + """ + + matched_source_files: List[SyncObject] = field(default_factory=list) + not_on_target: List[SyncObject] = field(default_factory=list) + on_target_eq_md5: List[SyncObject] = field(default_factory=list) + on_target_neq_md5: List[SyncObject] = field(default_factory=list) + unresolvable_collisions: Dict[str, Union[List[RemoteObjectProtocol], str]] = field( + default_factory=dict + ) + skipped_source_files: List[SyncObject] = field(default_factory=list) + + synced_files: List[SyncObject] = field(default_factory=list) + sync_direction: Optional[str] = None + + def __post_init__(self): + if self.sync_direction not in ["pull", "push", None]: + raise ValueError( + f"sync_direction can only be set to pull, push or None, instead got: {self.sync_direction}" + ) + + @property + def files_to_sync(self) -> List[SyncObject]: + """ + Returns of files that need synchronization. + + :return: list of all files that are not on the target or have different md5sums on target and remote + """ + return self.not_on_target + self.on_target_neq_md5 + + def size_files_to_sync(self) -> int: + """ + Computes the total size of all objects that need synchronization. Raises a RuntimeError if the sync_direction + property is not set to 'push' or 'pull'. 
+ + :return: the total size of all local objects that need synchronization if self.sync_direction='push' and + the size of all remote files that need synchronization if self.sync_direction='pull' + """ + if self.sync_direction not in ["push", "pull"]: + raise RuntimeError( + "sync_direction has to be set to push or pull before computing sizes" + ) + mode = "local" if self.sync_direction == "push" else "remote" + return _get_total_size(self.files_to_sync, mode=mode) + + @property + def requires_force(self) -> bool: + """ + Getter of the requires_force property. + :return: True iff a failure of the transaction can only be prevented by setting force=True. + """ + return len(self.on_target_neq_md5) != 0 + + @property + def has_unresolvable_collisions(self) -> bool: + """ + Getter of the requires_force property. + :return: True iff there exists a collision that cannot be resolved. + """ + return len(self.unresolvable_collisions) != 0 + + @property + def all_files_analyzed(self) -> List[SyncObject]: + """ + Getter of the all_files_analyzed property. + :return: list of all analyzed source files + """ + return self.skipped_source_files + self.matched_source_files + + def add_entry( + self, + synced_object: Union[SyncObject, str], + collides_with: Optional[Union[List[RemoteObjectProtocol], str]] = None, + skip: bool = False, + ): + """ + Adds a SyncObject to the summary. + :param synced_object: either a SyncObject or a path to a local file. + :param collides_with: specification of unresolvable collisions for the given sync object + :param skip: if True, the object is marked to be skipped + :return: None + """ + if isinstance(synced_object, str): + synced_object = SyncObject(synced_object) + if skip: + self.skipped_source_files.append(synced_object) + return + + self.matched_source_files.append(synced_object) + if collides_with: + self.unresolvable_collisions[synced_object.name] = collides_with + elif synced_object.exists_on_target: + if synced_object.equal_md5_hash_sum: + self.on_target_eq_md5.append(synced_object) + else: + self.on_target_neq_md5.append(synced_object) + else: + self.not_on_target.append(synced_object) + + def get_short_summary_dict(self): + """ + Returns a short summary of the transaction as a dictionary. + """ + return { + "sync_direction": self.sync_direction, + "files_to_sync": len(self.files_to_sync), + "total_size": self.size_files_to_sync(), + "unresolvable_collisions": len(self.unresolvable_collisions), + "synced_files": len(self.synced_files), + } + + def print_short_summary(self): + """ + Prints a short summary of the transaction (shorter than the full repr, which contains + information about local and remote objects). + """ + print(json.dumps(self.get_short_summary_dict(), indent=2)) + + @dataclass class RemoteStorageConfig: + """ + Contains all necessary information to establish a connection + to a bucket within the remote storage, and the base path on the remote. + """ + provider: str key: str bucket: str @@ -40,12 +339,12 @@ class RemoteStorageConfig: host: str = None port: int = None base_path: str = "" + secure: bool = True class RemoteStorage: """ Wrapper around lib-cloud for accessing remote storage services. 
- :param conf: """ @@ -53,7 +352,7 @@ def __init__(self, conf: RemoteStorageConfig): self._bucket: Optional[Container] = None self._conf = conf self._provider = conf.provider - self._remote_base_path: str = None + self._remote_base_path = "" self.set_remote_base_path(conf.base_path) possible_driver_kwargs = { "key": self.conf.key, @@ -61,24 +360,45 @@ def __init__(self, conf: RemoteStorageConfig): "region": self.conf.region, "host": self.conf.host, "port": self.conf.port, + "secure": self.conf.secure, } self.driver_kwargs = { k: v for k, v in possible_driver_kwargs.items() if v is not None } + def create_bucket(self, exist_ok: bool = True): + try: + log.info( + f"Creating bucket {self.conf.bucket} from configuration {self.conf}." + ) + self.driver.create_container(container_name=self.conf.bucket) + except (ContainerAlreadyExistsError, InvalidContainerNameError): + if not exist_ok: + raise + log.info( + f"Bucket {self.conf.bucket} already exists (or the name was invalid)." + ) + @property - def conf(self): + def conf(self) -> RemoteStorageConfig: return self._conf @property - def provider(self): + def provider(self) -> str: return self._provider @property - def remote_base_path(self): + def remote_base_path(self) -> str: return self._remote_base_path def set_remote_base_path(self, path: Optional[str]): + """ + Changes the base path in the remote storage + (overriding the base path extracted from RemoteStorageConfig during instantiation). + Pull and push operations will only affect files within the remote base path. + + :param path: a path with linux-like separators + """ if path is None: path = "" else: @@ -86,29 +406,86 @@ def set_remote_base_path(self, path: Optional[str]): path = path.strip().lstrip("/") self._remote_base_path = path.strip() - @property - def bucket(self): - return self._maybe_instantiate_bucket() - - def _maybe_instantiate_bucket(self): - if self._bucket is None: - log.info(f"Establishing connection to bucket {self.conf.bucket}") - storage_driver_factory = libcloud.get_driver( - libcloud.DriverType.STORAGE, self.provider + @cached_property + def bucket(self) -> Container: + log.info(f"Establishing connection to bucket {self.conf.bucket}") + return self.driver.get_container(self.conf.bucket) + + @cached_property + def driver(self) -> StorageDriver: + storage_driver_factory = libcloud.get_driver( + libcloud.DriverType.STORAGE, self.provider + ) + return storage_driver_factory(**self.driver_kwargs) + + def execute_sync( + self, sync_object: SyncObject, direction: str, force=False + ) -> SyncObject: + """ + Synchronizes the local and the remote file in the given direction. Will raise an error if a file from the source + would overwrite an already existing file on the target and force=False. In this case, no operations will be + performed on the target. + + :param sync_object: instance of SyncObject that will be used as basis for synchronization. Usually + created from a get_*_summary method. + :param direction: either "push" or "pull" + :param force: if True, all already existing files on the target (with a different md5sum than the source files) + will be overwritten. + :return: a SyncObject that represents the status of remote and target after the synchronization + """ + if direction not in ["push", "pull"]: + raise ValueError( + f"Unknown direction {direction}, has to be either 'push' or 'pull'." 
+ ) + if sync_object.equal_md5_hash_sum: + log.debug( + f"Skipping {direction} of {sync_object.name} because of coinciding hash sums" + ) + return sync_object + + if sync_object.exists_on_target and not force: + raise ValueError( + f"Cannot perform {direction} because {sync_object.name} already exists and force is False" ) - driver: StorageDriver = storage_driver_factory(**self.driver_kwargs) - self._bucket: Container = driver.get_container(self.conf.bucket) - return self._bucket + + if direction == "push": + if not sync_object.exists_locally: + raise FileNotFoundError( + f"Cannot push non-existing file: {sync_object.local_path}" + ) + remote_obj = self.bucket.upload_object( + sync_object.local_path, + sync_object.remote_path, + verify_hash=False, + ) + return SyncObject(sync_object.local_path, remote_obj) + + elif direction == "pull": + if None in [sync_object.remote_obj, sync_object.local_path]: + raise RuntimeError( + f"Cannot pull without remote object and local path. Affects: {sync_object.name}" + ) + if os.path.isdir(sync_object.local_path): + raise FileExistsError( + f"Cannot pull file to a path which is an existing directory: {sync_object.local_path}" + ) + + log.debug(f"Fetching {sync_object.remote_obj.name} from {self.bucket.name}") + os.makedirs(os.path.dirname(sync_object.local_path), exist_ok=True) + sync_object.remote_obj.download( + sync_object.local_path, overwrite_existing=force + ) + return SyncObject(sync_object.local_path, sync_object.remote_obj) @staticmethod - def _get_remote_path(remote_obj: RemoteObjectProtocol): + def _get_remote_path(remote_obj: RemoteObjectProtocol) -> str: """ Returns the full path to the remote object. The resulting path never starts with "/" as it can cause problems with some backends (e.g. google cloud storage). """ return remote_obj.name.lstrip("/") - def _get_relative_remote_path(self, remote_obj: RemoteObjectProtocol): + def _get_relative_remote_path(self, remote_obj: RemoteObjectProtocol) -> str: """ Returns the path to the remote object relative to configured base dir (as expected by pull for a single file) """ @@ -117,41 +494,7 @@ def _get_relative_remote_path(self, remote_obj: RemoteObjectProtocol): result = result.lstrip("/") return result - def _pull_object( - self, - remote_object: RemoteObjectProtocol, - destination_path: str, - overwrite_existing=False, - ) -> bool: - """ - Download the remote object to the destination path. Returns True if file was downloaded, else False - """ - - destination_path = os.path.abspath(destination_path) - if os.path.isdir(destination_path): - raise FileExistsError( - f"Cannot pull file to a path which is an existing directory: {destination_path}" - ) - - if os.path.isfile(destination_path): - if not overwrite_existing: - log.debug( - f"Not downloading {remote_object.name} since target file already exists:" - f" {os.path.abspath(destination_path)}. 
Set overwrite_existing to True to force the download" - ) - return False - if md5sum(destination_path) == remote_object.hash: - log.debug( - f"File {destination_path} is identical to the pulled file, not downloading again" - ) - return False - - log.debug(f"Fetching {remote_object.name} from {self.bucket.name}") - os.makedirs(os.path.dirname(destination_path), exist_ok=True) - remote_object.download(destination_path, overwrite_existing=overwrite_existing) - return True - - def _full_remote_path(self, remote_path: str): + def _full_remote_path(self, remote_path: str) -> str: """ :param remote_path: remote_path on storage bucket relative to the configured remote base remote_path. e.g. 'data/some_file.json' @@ -165,7 +508,7 @@ def _full_remote_path(self, remote_path: str): @staticmethod def _listed_due_to_name_collision( full_remote_path: str, remote_object: RemoteObjectProtocol - ): + ) -> bool: """ Checks whether a remote object was falsely listed because its name starts with the same characters as full_remote_path. @@ -188,87 +531,165 @@ def _listed_due_to_name_collision( is_selected_file = object_remote_path == full_remote_path return not (is_in_selected_dir or is_selected_file) + def _execute_sync_from_summary( + self, summary: TransactionSummary, dryrun: bool = False, force: bool = False + ) -> TransactionSummary: + """ + Executes a transaction summary. + :param summary: The transaction summary + :param dryrun: if True, logs any error that would have prevented the execution and returns the summary + without actually executing the sync. + :param force: raises an error if dryrun=False and any files would be overwritten by the sync + :return: Returns the input transaction summary. Note that the function potentially alters the state of the + input summary. + """ + if dryrun: + log.info(f"Skipping {summary.sync_direction} because dryrun=True") + if summary.has_unresolvable_collisions: + log.warning( + f"This transaction has unresolvable collisions and would not succeed." + ) + if summary.requires_force and not force: + log.warning( + f"This transaction requires overwriting of files and would not succeed without force=True" + ) + return summary + + if summary.has_unresolvable_collisions: + raise FileExistsError( + f"Found name collisions files with directories, not syncing anything. " + f"Suggestion: perform a dryrun and analyze the summary. " + f"Affected names: {list(summary.unresolvable_collisions.keys())}. " + ) + + if summary.requires_force and not force: + raise FileExistsError( + f"Operation requires overwriting of objects but force=False" + f"Suggestion: perform a dryrun and analyze the summary. " + f"Affected names: {[obj.name for obj in summary.on_target_neq_md5]}. " + ) + + with tqdm(total=summary.size_files_to_sync(), desc="Progress (Bytes)") as pbar: + for sync_obj in summary.files_to_sync: + synced_obj = self.execute_sync( + sync_obj, direction=summary.sync_direction, force=force + ) + pbar.update(synced_obj.local_size) + summary.synced_files.append(synced_obj) + return summary + def pull( self, remote_path: str, local_base_dir="", - overwrite_existing=False, + force=False, path_regex: Pattern = None, convert_to_linux_path=True, - ) -> List[RemoteObjectProtocol]: + dryrun=False, + ) -> TransactionSummary: r""" - Pull either a file or a directory under the given path relative to local_base_dir. 
Files with the same name - as locally already existing ones will not be downloaded anything unless overwrite_existing is True + Pull either a file or a directory under the given path relative to local_base_dir. :param remote_path: remote path on storage bucket relative to the configured remote base path. e.g. 'data/ground_truth/some_file.json' :param local_base_dir: Local base directory for constructing local path e.g passing 'local_base_dir' will download to the path 'local_base_dir/data/ground_truth/some_file.json' in the above example - :param overwrite_existing: Overwrite file if exists locally + :param force: If False, pull will raise an error if an already existing file deviates from the remote in + its md5sum. If True, these files are overwritten. :param path_regex: If not None only files with paths matching the regex will be pulled. This is useful for filtering files within a remote directory before pulling them. :param convert_to_linux_path: if True, will convert windows path to linux path (as needed by remote storage) and thus passing a remote path like 'data\my\path' will be converted to 'data/my/path' before pulling. This should only be set to False if you want to pull a remote object with '\' in its file name (which is discouraged). - :return: list of objects referring to all downloaded files + :param dryrun: If True, simulates the pull operation and returns the remote objects that would have been pulled. + :return: An object describing the summary of the operation. + """ + summary = self.get_pull_summary( + remote_path, + local_base_dir, + path_regex, + convert_to_linux_path=convert_to_linux_path, + ) + if len(summary.all_files_analyzed) == 0: + log.warning(f"No files found in remote storage under path: {remote_path}") + return self._execute_sync_from_summary(summary, dryrun=dryrun, force=force) + + def _get_destination_path( + self, obj: RemoteObjectProtocol, local_base_dir: str + ) -> str: + """ + Return the destination path of the given object + """ + relative_obj_path = self._get_relative_remote_path(obj) + return os.path.join(local_base_dir, relative_obj_path) + + def get_pull_summary( + self, + remote_path: str, + local_base_dir="", + path_regex: Pattern = None, + convert_to_linux_path=True, + ) -> TransactionSummary: + r""" + Creates TransactionSummary of the specified pull operation. + + :param remote_path: remote path on storage bucket relative to the configured remote base path. + e.g. 'data/ground_truth/some_file.json' + :param local_base_dir: Local base directory for constructing local path. + Example: passing 'local_base_dir' will download to the path + 'local_base_dir/data/ground_truth/some_file.json' in the above example + :param path_regex: If not None only files with paths matching the regex will be pulled. This is useful for + filtering files within a remote directory before pulling them. + :param convert_to_linux_path: if True, will convert windows path to linux path (as needed by remote storage) and + thus passing a remote path like 'data\my\path' will be converted to 'data/my/path' before pulling. + This should only be set to False if you want to pull a remote object with '\' in its file name + (which is discouraged). 
+ :return: """ local_base_dir = os.path.abspath(local_base_dir) if convert_to_linux_path: remote_path = remote_path.replace("\\", "/") + + summary = TransactionSummary(sync_direction="pull") full_remote_path = self._full_remote_path(remote_path) remote_objects: List[RemoteObjectProtocol] = list( self.bucket.list_objects(full_remote_path) ) - if len(remote_objects) == 0: - log.warning( - f"No such remote file or directory: {full_remote_path}. Not pulling anything" - ) - return [] - - def maybe_get_destination_path(obj: RemoteObjectProtocol): - # Due to a possible bug in libcloud or storage providers, directories may be listed in remote objects. - # We filter them out by checking for size - if obj.size == 0: - log.info(f"Skipping download of {obj.name} with size zero.") - return - if self._listed_due_to_name_collision(full_remote_path, obj): + for obj in tqdm(remote_objects, desc="Remote paths: "): + local_path = None + collides_with = None + skip = False + if (obj.size == 0) or ( + self._listed_due_to_name_collision(full_remote_path, obj) + ): log.debug( - f"Skipping download of {obj.name}. " - f"It was listed due to name collision and should not be pulled" + f"Skipping {obj.name} since it was listed due to name collisions" ) - return - - relative_obj_path = self._get_relative_remote_path(obj) - if path_regex is not None: + skip = True + elif path_regex is not None: + relative_obj_path = self._get_relative_remote_path(obj) if not path_regex.match(relative_obj_path): - log.info(f"Skipping {relative_obj_path} due to regex {path_regex}") - return - return os.path.join(local_base_dir, relative_obj_path) + log.debug(f"Skipping {relative_obj_path} due to regex {path_regex}") + skip = True - downloaded_objects = [] - for remote_obj in remote_objects: - destination_path = maybe_get_destination_path(remote_obj) - if destination_path is None: - continue + if not skip: + local_path = self._get_destination_path(obj, local_base_dir) + if os.path.isdir(local_path): + collides_with = local_path - was_downloaded = self._pull_object( - remote_obj, - destination_path, - overwrite_existing=overwrite_existing, + summary.add_entry( + SyncObject(local_path, obj), skip=skip, collides_with=collides_with ) - if was_downloaded: - downloaded_objects.append(remote_obj) - return downloaded_objects + return summary @staticmethod def _get_push_local_path(path: str, local_path_prefix: Optional[str] = None) -> str: """ Get the full local path of a file for pushing, including an optional path prefix. - Note that ``path`` may not be absolute if ``local_path_prefix`` is specified. **Usage Examples:** @@ -286,7 +707,7 @@ def _get_push_local_path(path: str, local_path_prefix: Optional[str] = None) -> :param path: :param local_path_prefix: - :return: + :return: the full local path of the file """ # Parameter validation if local_path_prefix and Path(path).is_absolute(): @@ -299,151 +720,86 @@ def _get_push_local_path(path: str, local_path_prefix: Optional[str] = None) -> else: return os.path.join(local_path_prefix or "", path) - def _get_push_remote_path(self, local_path: str) -> str: + def get_push_remote_path(self, local_path: str) -> str: """ Get the full path within a remote storage bucket for pushing. 
-        :param local_path:
-        :return:
+        :param local_path: the local path to the file
+        :return: the remote path that corresponds to the local path
         """
         return "/".join([self.remote_base_path, local_path]).replace(os.sep, "/")
 
-    def push_directory(
+    def get_push_summary(
         self,
         path: str,
         local_path_prefix: Optional[str] = None,
-        overwrite_existing=True,
-        path_regex: Pattern = None,
-    ) -> List[RemoteObjectProtocol]:
+        path_regex: Optional[Pattern] = None,
+    ) -> TransactionSummary:
         """
-        Upload a directory from the given local path into the remote storage. Does not upload files for which the md5sum
-        matches existing remote files.
-        The remote path to which the directory is uploaded will be constructed from the remote_base_path and the
-        provided path. The local_path_prefix serves for finding the directory on the local system.
+        Retrieves the summary of the push transaction plan before it is executed.
+        Nothing will be pushed, and the synced_files entry of the summary will be an empty list.
 
-        Note: This method does not delete any remote objects within the directory where paths did not match the local
-        paths, even if overwrite_existing is true
-
-        Examples:
-            1) path=foo/bar, local_path_prefix=None -->
-                ./foo/bar uploaded to remote_base_path/foo/bar
-            2) path=/home/foo/bar, local_path_prefix=None -->
-                /home/foo/bar uploaded to remote_base_path/home/foo/bar
-            3) path=bar, local_path_prefix=/home/foo -->
-                /home/foo/bar uploaded to remote_base_path/bar
-
-        Note that ``path`` may not be absolute if ``local_path_prefix`` is specified.
-
-        :param path: Path to the local directory to be uploaded, may be absolute or relative
-        :param local_path_prefix: Optional prefix for the local path
-        :param overwrite_existing: Whether to overwrite existing remote objects (if they have the same path but differing
-            md5sums).
-        :param path_regex: If not None only files with paths matching the regex will be pushed.
-        :return: A list of :class:`Object` instances for all remote objects that were created or matched existing files
+        :param path: Path to the local object (file or directory) to be uploaded, may be absolute or relative
+        :param local_path_prefix: Prefix to be concatenated with ``path``
+        :param path_regex: If not None only files with paths matching the regex will be pushed
+        :return: the transaction summary object
         """
-        log.debug(f"push_object({path=}, {local_path_prefix=}, {overwrite_existing=}")
-        objects = []
+        summary = TransactionSummary(sync_direction="push")
+        # collect all paths to scan
         local_path = self._get_push_local_path(path, local_path_prefix)
-        if not os.path.isdir(local_path):
+        if os.path.isfile(local_path):
+            all_files_analyzed = [local_path]
+        elif os.path.isdir(local_path):
+            all_files_analyzed = []
+            for root, _, fs in os.walk(local_path):
+                all_files_analyzed.extend([os.path.join(root, f) for f in fs])
+        else:
             raise FileNotFoundError(
-                f"Local path {local_path} does not refer to a directory"
+                f"Local path {local_path} does not refer to a file or directory"
             )
-        for root, _, files in os.walk(local_path):
-            log.debug(f"Root directory: {root}")
-            log.debug(f"Files: {files}")
-            rel_root_path = os.path.relpath(local_path, root)
-
-            root_path = Path(root)
-            for file in files:
-                if path_regex is not None:
-                    remote_obj_path = os.path.join(rel_root_path, file)
-                    if not path_regex.match(remote_obj_path):
-                        log.info(
-                            f"Skipping {remote_obj_path} due to regex {path_regex}"
-                        )
-                        continue
-
-                log.debug(f"Upload: {file=}, {root_path=}")
-                obj = self.push_file(
-                    file, root_path, overwrite_existing=overwrite_existing
+        for file in tqdm(all_files_analyzed, desc="Scanning file: "):
+            skip = False
+            collides_with = None
+            remote_obj = None
+            if path_regex is not None and not path_regex.match(file):
+                log.debug(
+                    f"Skipping {file} since it does not match regular expression '{path_regex}'."
                 )
-                objects.append(obj)
-        return objects
-
-    def push_file(
-        self,
-        path: str,
-        local_path_prefix: Optional[str] = None,
-        overwrite_existing=True,
-    ) -> Optional[RemoteObjectProtocol]:
-        """
-        Upload a local file into the remote storage. If the md5sum of the file matches an existing remote file,
-        nothing will be uploaded.
-        The remote path to which the file is uploaded will be constructed from the remote_base_path and the provided
-        path. The local_path_prefix serves for finding the file on the local system.
-
-        Examples:
-            1) path=foo/bar.json, local_path_prefix=None -->
-                ./foo/bar.json uploaded to remote_base_path/foo/bar.json
-            2) path=/home/foo/bar.json, local_path_prefix=None -->
-                /home/foo/bar.json uploaded to remote_base_path/home/foo/bar.json
-            3) path=bar.json, local_path_prefix=/home/foo -->
-                /home/foo/bar.json uploaded to remote_base_path/bar.json
-
-        Note that ``path`` may not be absolute if ``local_path_prefix`` is specified.
-
-        :param path: Path to the local file to be uploaded, must not be absolute if ``local_path_prefix`` is specified
-        :param local_path_prefix: Prefix to be concatenated with ``path``
-        :param overwrite_existing: If the remote object already exists, overwrite it?
-        :return: A :class:`Object` instance referring to the remote object
-        """
-        log.debug(
-            f"push_file({path=}, {local_path_prefix=}, {self.remote_base_path=}, {overwrite_existing=}"
-        )
-
-        local_path = self._get_push_local_path(path, local_path_prefix)
-        if not os.path.isfile(local_path):
-            raise FileNotFoundError(f"Local path {local_path} does not refer to a file")
-        remote_path = self._get_push_remote_path(local_path)
-
-        remote_obj = [
-            obj
-            for obj in self.bucket.list_objects(remote_path)
-            if not self._listed_due_to_name_collision(remote_path, obj)
-        ]
-        if len(remote_obj) > 1:
-            raise RuntimeError(
-                f"Remote path {remote_path} exists and is a directory, will not overwrite it."
-                f"Consider calling push_directory or push instead."
+                skip = True
+
+            remote_path = self.get_push_remote_path(file)
+            matched_remote_obj = [
+                obj
+                for obj in self.bucket.list_objects(remote_path)
+                if not self._listed_due_to_name_collision(remote_path, obj)
+            ]
+
+            # name collision of local file with remote dir
+            if len(matched_remote_obj) > 1:
+                collides_with = matched_remote_obj
+
+            elif matched_remote_obj:
+                remote_obj = matched_remote_obj[0]
+
+            synced_obj = SyncObject(file, remote_obj, remote_path=remote_path)
+            summary.add_entry(
+                synced_obj,
+                collides_with=collides_with,
+                skip=skip,
             )
-        if remote_obj and not overwrite_existing:
-            remote_obj = remote_obj[0]
-            # Skip upload if MD5 hashes match
-            if md5sum(local_path) == remote_obj.hash:
-                log.info(f"Files are identical, skipping upload")
-                return remote_obj
-        elif not overwrite_existing:
-            raise RuntimeError(
-                f"Remote object {remote_path} already exists,\n is not identical to the local file {local_path}\n "
-                f"and overwrite_existing=False"
-            )
-
-        log.debug(f"Uploading: {local_path} --> {remote_path}")
-        remote_obj = self.bucket.upload_object(
-            local_path, remote_path, verify_hash=False
-        )
-        return remote_obj
+        return summary
 
     def push(
         self,
         path: str,
         local_path_prefix: Optional[str] = None,
-        overwrite_existing=True,
+        force: bool = False,
         path_regex: Pattern = None,
-    ) -> List[RemoteObjectProtocol]:
+        dryrun: bool = False,
+    ) -> TransactionSummary:
         """
         Upload a local file or directory into the remote storage. Does not upload files for which the md5sum
         matches existing remote files.
@@ -460,33 +816,17 @@ def push(
 
         Note that ``path`` may not be absolute if ``local_path_prefix`` is specified.
 
-        Remote objects will not be overwritten if their MD5sum matches the local file.
-
         :param path: Path to the local object (file or directory) to be uploaded, may be absolute or relative
         :param local_path_prefix: Prefix to be concatenated with ``path``
-        :param overwrite_existing: If a remote object already exists, overwrite it?
+        :param force: If False, push raises an error if an already existing remote file deviates from the local
+            file in its md5sum. If True, such files are overwritten.
         :param path_regex: If not None only files with paths matching the regex will be pushed
-        :return: A list of :class:`Object` instances for all remote objects that were created or matched existing files
+        :param dryrun: If True, simulates the push operation and returns the summary
+            (with synced_files being an empty list). Equivalent to calling get_push_summary.
+        :return: An object describing the summary of the operation.
         """
-        local_path = self._get_push_local_path(path, local_path_prefix)
-        if os.path.isfile(local_path):
-
-            if path_regex is not None and not path_regex.match(path):
-                log.warning(
-                    f"{path} does not match regular expression '{path_regex}'. Nothing is pushed."
-                )
-                return []
-
-            return [self.push_file(path, local_path_prefix, overwrite_existing)]
-
-        elif os.path.isdir(local_path):
-            return self.push_directory(
-                path, local_path_prefix, overwrite_existing, path_regex=path_regex
-            )
-        else:
-            raise FileNotFoundError(
-                f"Local path {local_path} does not refer to a file or directory"
-            )
+        summary = self.get_push_summary(path, local_path_prefix, path_regex)
+        return self._execute_sync_from_summary(summary, dryrun=dryrun, force=force)
 
     def delete(
         self,
@@ -527,7 +867,7 @@ def delete(
             deleted_objects.append(remote_obj)
         return deleted_objects
 
-    def list_objects(self, remote_path) -> List[RemoteObjectProtocol]:
+    def list_objects(self, remote_path: str) -> List[RemoteObjectProtocol]:
         """
         :param remote_path: remote path on storage bucket relative to the configured remote base path.
         :return: list of remote objects under the remote path (multiple entries if the remote path is a directory)
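The reworked `push` above consolidates `push_file` and `push_directory` into a single, summary-based entry point. The following sketch shows how a caller might use it, assuming an already configured `RemoteStorage` instance; the directory name, the regex filter, and the helper function are illustrative and not part of this change.

```python
import re

from accsr.remote_storage import RemoteStorage


def sync_local_data(storage: RemoteStorage) -> None:
    # Dry run: nothing is transferred, the returned TransactionSummary only
    # describes the planned transaction (synced_files stays empty).
    plan = storage.push("data", path_regex=re.compile(r".*\.csv$"), dryrun=True)
    print(f"dry run finished, {len(plan.synced_files)} files were synced")

    # Real push: force=True overwrites remote files whose md5sum differs from
    # the local copy instead of raising an error.
    result = storage.push("data", force=True)
    for remote_obj in result.synced_files:
        print("pushed:", remote_obj.name)
```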
diff --git a/tests/accsr/conftest.py b/tests/accsr/conftest.py
index 46631c6..844af15 100644
--- a/tests/accsr/conftest.py
+++ b/tests/accsr/conftest.py
@@ -1,15 +1,11 @@
+import logging
 import os
 from typing import Tuple
 from urllib.parse import urljoin
 
 import pytest
 import requests
-from libcloud.storage.providers import get_driver
-from libcloud.storage.types import (
-    ContainerAlreadyExistsError,
-    InvalidContainerNameError,
-)
-from pytest_docker.plugin import get_docker_services
+from pytest_docker.plugin import Services, get_docker_services
 from requests.exceptions import ConnectionError
 
 from accsr.remote_storage import RemoteStorage, RemoteStorageConfig
@@ -31,14 +27,26 @@ def running_on_ci() -> bool:
 
 
 @pytest.fixture(scope="session")
 def docker_services(
-    docker_compose_file, docker_compose_project_name, docker_cleanup, running_on_ci
+    docker_compose_command,
+    docker_compose_file,
+    docker_compose_project_name,
+    docker_setup,
+    docker_cleanup,
+    running_on_ci,
 ):
     """This overwrites pytest-docker's docker_services fixture to avoid starting containers on CI"""
     if running_on_ci:
         yield
     else:
+        logging.info(
+            "Starting minio inside a docker container for remote storage tests"
+        )
         with get_docker_services(
-            docker_compose_file, docker_compose_project_name, docker_cleanup
+            docker_compose_command,
+            docker_compose_file,
+            docker_compose_project_name,
+            docker_setup,
+            docker_cleanup,
         ) as docker_service:
             yield docker_service
 
@@ -50,16 +58,37 @@ def docker_compose_file(pytestconfig):
     )
 
 
+def port_for_windows_fix(
+    docker_services: Services, service: str, container_port: int
+) -> int:
+    """This is a workaround for the port_for function not working on Windows."""
+    output = docker_services._docker_compose.execute(
+        "port %s %d" % (service, container_port)
+    )
+    endpoint = output.strip().decode("utf-8")
+    # This handles messy output that might contain warnings or other text
+    endpoint_parts = endpoint.split("\r\n")
+    if len(endpoint_parts) > 1:
+        endpoint = endpoint_parts[0]
+    # Usually, the IP address here is 0.0.0.0, so we don't use it.
+    match = int(endpoint.split(":", 1)[1])
+    return match
+
+
 @pytest.fixture(scope="session")
 def remote_storage_server(running_on_ci, docker_ip, docker_services) -> Tuple[str, int]:
     """Starts minio container and makes sure it is reachable.
     The containers will not be created on CI."""
-    # Skips starting the container if we running on Gitlab CI or Github Actions
+    # Skips starting the container if running in CI
     if running_on_ci:
         return "remote-storage", 9000
 
     # `port_for` takes a container port and returns the corresponding host port
-    port = docker_services.port_for("remote-storage", 9000)
-    url = "http://{}:{}".format(docker_ip, port)
+    if os.name == "nt":
+        # port_for doesn't work on Windows
+        port = port_for_windows_fix(docker_services, "remote-storage", 9000)
+    else:
+        port = docker_services.port_for("remote-storage", 9000)
+    url = f"http://{docker_ip}:{port}"
 
     def is_minio_responsive(url):
         url = urljoin(url, "minio/health/live")
@@ -78,43 +107,27 @@ def is_minio_responsive(url):
 
 @pytest.fixture(scope="session")
 def remote_storage_config(running_on_ci, remote_storage_server) -> RemoteStorageConfig:
+    if running_on_ci:
+        host = "remote-storage"
+        port = "9000"
+    else:
+        host = remote_storage_server[0]
+        port = remote_storage_server[1]
     config = RemoteStorageConfig(
-        key="minio",
-        secret="minio123",
+        provider="s3",
+        key="minio-root-user",
+        secret="minio-root-password",
         bucket="accsr-integration-tests",
         base_path="",
-        provider="s3",
-    )
-    if running_on_ci:
-        config.host = "remote-storage"
-        config.port = "9000"
-    else:
-        config.host = remote_storage_server[0]
-        config.port = remote_storage_server[1]
-    return config
-
-
-@pytest.fixture(scope="module")
-def create_bucket(remote_storage_config, remote_storage_server):
-    # create bucket if it doesn't exist already
-    storage_driver_factory = get_driver(remote_storage_config.provider)
-    driver = storage_driver_factory(
-        key=remote_storage_config.key,
-        secret=remote_storage_config.secret,
-        host=remote_storage_config.host,
-        port=remote_storage_config.port,
+        host=host,
+        port=port,
         secure=False,
     )
-    try:
-        driver.create_container(container_name=remote_storage_config.bucket)
-    except (ContainerAlreadyExistsError, InvalidContainerNameError):
-        pass
+    return config
 
 
 @pytest.fixture()
-def storage(remote_storage_config, remote_storage_server, create_bucket):
+def storage(remote_storage_config, remote_storage_server):
     storage = RemoteStorage(remote_storage_config)
-    # This has to be set here unless we want to set up certificates for this
-    # TODO: determine whether we should add this to possible_driver_kwargs or not?
-    storage.driver_kwargs["secure"] = False
+    storage.create_bucket()
     return storage
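Outside of pytest, the configuration that these fixtures assemble can be built directly. A minimal sketch, assuming the MinIO service from the compose file below is reachable on `localhost:9000` (host and port are assumptions for illustration, not something this change prescribes):

```python
from accsr.remote_storage import RemoteStorage, RemoteStorageConfig

config = RemoteStorageConfig(
    provider="s3",
    key="minio-root-user",
    secret="minio-root-password",
    bucket="accsr-integration-tests",
    base_path="",
    host="localhost",  # assumed host; the test fixture resolves this dynamically
    port=9000,
    secure=False,
)

storage = RemoteStorage(config)
storage.create_bucket()  # mirrors the storage fixture above
```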
diff --git a/tests/accsr/docker-compose.yaml b/tests/accsr/docker-compose.yaml
index 9d195a1..2d21818 100644
--- a/tests/accsr/docker-compose.yaml
+++ b/tests/accsr/docker-compose.yaml
@@ -1,9 +1,12 @@
 version: '3'
+
 services:
   remote-storage:
     image: bitnami/minio:latest
     ports:
-      - "9000"
+      - "9000:9000"
+      - "9001:9001"
     environment:
-      MINIO_ACCESS_KEY: minio
-      MINIO_SECRET_KEY: minio123
+      MINIO_ROOT_USER: minio-root-user
+      MINIO_ROOT_PASSWORD: minio-root-password
+      MINIO_DEFAULT_BUCKETS: accsr-integration-tests
diff --git a/tests/accsr/test_conversion.py b/tests/accsr/test_conversion.py
index faab9e6..0e88cc7 100644
--- a/tests/accsr/test_conversion.py
+++ b/tests/accsr/test_conversion.py
@@ -11,7 +11,6 @@
         (np.array([[1, 2], [3, 4]]), [[1, 2], [3, 4]]),
         (np.array([1.0, 2.0, 3.0]), [1.0, 2.0, 3.0]),
         (np.int64(10), 10),
-        (np.float(10.5), 10.5),
         (np.float32(10.5), 10.5),
         (np.float64(10.5), 10.5),
         (np.int32(1), 1),
diff --git a/tests/accsr/test_loading.py b/tests/accsr/test_loading.py
index 8dd3757..12e5233 100644
--- a/tests/accsr/test_loading.py
+++ b/tests/accsr/test_loading.py
@@ -33,8 +33,10 @@ def test_open_file_in_tar_content_correct(
     actual_content = buffer_reader.read()
     with open(os.path.join(test_resources, expected_content), "rb") as file:
         expected_content = file.read()
+    # Windows...
+    # expected_content = expected_content.replace(br"\r\n", br"\n")
 
-    assert expected_content == actual_content
+    assert str(expected_content) == str(actual_content)
 
 
 @mark.parametrize(
diff --git a/tests/accsr/test_remote_storage.py b/tests/accsr/test_remote_storage.py
index 790457e..df52fb7 100644
--- a/tests/accsr/test_remote_storage.py
+++ b/tests/accsr/test_remote_storage.py
@@ -18,7 +18,7 @@ def test_filename(
 ) -> Generator[str, None, None]:
     """Pushes a file to remote storage, yields its filename and then deletes it from remote storage"""
     filename = request.param
-    storage.push_file(filename)
+    storage.push(filename)
     yield filename
     storage.delete(filename)
 
@@ -43,7 +43,7 @@ def test_dirname(
 ) -> Generator[str, None, None]:
     """Pushes a directory to remote storage, yields its name and then deletes it from remote storage"""
     dirname = request.param
-    storage.push_directory(dirname)
+    storage.push(dirname)
     yield dirname
     storage.delete(dirname)
 
@@ -56,7 +56,7 @@ def test_delete_no_matches(storage, caplog):
 
 
 def test_delete_file(storage):
-    storage.push_file("sample.txt", overwrite_existing=True)
+    storage.push("sample.txt", force=True)
     assert len(storage.list_objects("sample.txt")) == 1
     deleted_objects = storage.delete("sample.txt")
     assert len(deleted_objects) == 1
@@ -66,7 +66,7 @@ def test_delete_file(storage):
 def test_delete_with_base_path(storage):
     base_path = "base_path"
     storage.set_remote_base_path(base_path)
-    storage.push_file("sample.txt", overwrite_existing=True)
+    storage.push("sample.txt", force=True)
     assert len(storage.list_objects("sample.txt")) == 1
     deleted_objects = storage.delete("sample.txt")
     assert len(deleted_objects) == 1
@@ -74,7 +74,7 @@
 
 
 def test_delete_dir(storage):
-    storage.push_directory("sample_dir", overwrite_existing=True)
+    storage.push("sample_dir", force=True)
     assert len(storage.list_objects("sample_dir")) == 2
     deleted_objects = storage.delete("sample_dir")
     assert len(deleted_objects) == 2
@@ -87,10 +87,10 @@
     indirect=["test_filename"],
 )
 def test_push_file_empty_base_path(storage, test_filename):
-    remote_objects = storage.push(test_filename)
-    assert len(remote_objects) == 1
+    push_summary = storage.push(test_filename)
+    assert len(push_summary.synced_files) == 1
     # we need lstrip because s3 paths (and names) start with "/" while google storage paths start without it...
-    assert remote_objects[0].name.lstrip("/") == test_filename
+    assert push_summary.synced_files[0].name.lstrip("/") == test_filename
@@ -101,9 +101,11 @@ def test_push_file_empty_base_path(storage, test_filename):
 def test_push_file_nonempty_base_path(storage, test_filename):
     base_path = "base_path"
     storage.set_remote_base_path(base_path)
-    remote_objects = storage.push(test_filename)
-    assert len(remote_objects) == 1
-    assert remote_objects[0].name.lstrip("/") == f"{base_path}/{test_filename}"
+    push_summary = storage.push(test_filename)
+    assert len(push_summary.synced_files) == 1
+    assert (
+        push_summary.synced_files[0].name.lstrip("/") == f"{base_path}/{test_filename}"
+    )
@@ -112,8 +114,8 @@
     indirect=["test_dirname"],
 )
 def test_push_directory(storage, test_dirname):
-    remote_objects = storage.push(test_dirname)
-    assert len(remote_objects) == 2
+    push_summary = storage.push(test_dirname)
+    assert len(push_summary.synced_files) == 2
     assert len(storage.list_objects(test_dirname)) == 2
@@ -136,8 +138,8 @@ def test_pull_file(storage, test_filename, tmpdir):
     local_base_dir = tmpdir.mkdir("remote_storage")
     storage.pull(test_filename, local_base_dir=local_base_dir)
     assert os.path.isfile(os.path.join(local_base_dir, test_filename))
-    pulled_files = storage.pull(test_filename)
-    assert len(pulled_files) == 0
+    pull_summary = storage.pull(test_filename, force=False)
+    assert len(pull_summary.synced_files) == 0
@@ -150,7 +152,7 @@ def test_pull_file_to_existing_dir_path(storage, test_filename, tmpdir):
     local_base_dir.mkdir(test_filename)
     with pytest.raises(
         FileExistsError,
-        match="Cannot pull file to a path which is an existing directory:",
+        match=r".*directory:.*",
     ):
         storage.pull(test_filename, local_base_dir=local_base_dir)
@@ -165,8 +167,8 @@ def test_pull_dir(storage, test_dirname, tmpdir):
     storage.pull(test_dirname, local_base_dir=local_base_dir)
     assert os.path.isdir(os.path.join(local_base_dir, test_dirname))
     assert len(os.listdir(os.path.join(local_base_dir, test_dirname))) == 2
-    pulled_files = storage.pull(test_dirname)
-    assert len(pulled_files) == 0
+    pull_summary = storage.pull(test_dirname, force=False)
+    assert len(pull_summary.synced_files) == 0
@@ -174,9 +176,9 @@
 )
 def test_pull_non_existing(storage, file_or_dir_name, caplog):
     with caplog.at_level(logging.WARNING):
-        pulled_files = storage.pull(file_or_dir_name)
-        assert len(pulled_files) == 0
-        assert "No such remote file or directory" in caplog.text
+        pull_summary = storage.pull(file_or_dir_name)
+        assert len(pull_summary.synced_files) == 0
+        assert "No files found in remote storage under path:" in caplog.text
 
 
 def test_name_collisions_pulling_properly(setup_name_collision, storage, tmpdir):
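The pull tests above exercise the summary-based `pull` in the same way as `push`. A short sketch of the corresponding call pattern; the remote directory and target folder are made up for illustration:

```python
from accsr.remote_storage import RemoteStorage


def fetch_samples(storage: RemoteStorage) -> None:
    # Download everything below "sample_dir" into ./downloads. With force=False,
    # files that already match the local copies are simply not transferred, so
    # synced_files may be shorter than the full remote listing.
    summary = storage.pull("sample_dir", local_base_dir="downloads", force=False)
    print(f"pulled {len(summary.synced_files)} files")
```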
diff --git a/tox.ini b/tox.ini
index 7c0b07d..689f17d 100644
--- a/tox.ini
+++ b/tox.ini
@@ -26,7 +26,7 @@ commands =
 deps =
     -rrequirements-test.txt
     -rrequirements.txt
-whitelist_externals =
+allowlist_externals =
     coverage
     pytest
 
@@ -38,7 +38,7 @@ commands =
     bash build_scripts/build-linting-report.sh
 deps =
     -rrequirements-linting.txt
-whitelist_externals =
+allowlist_externals =
     bash
 
 [testenv:docs]
@@ -49,7 +49,7 @@ commands =
     bash build_scripts/build-docs.sh
 deps =
     -rrequirements-docs.txt
-whitelist_externals =
+allowlist_externals =
     bash
 
 [testenv:report]
@@ -58,6 +58,6 @@ commands =
     bash build_scripts/build-coverage-report.sh
 deps =
     -rrequirements-coverage.txt
-whitelist_externals =
+allowlist_externals =
     bash