Skip to content

Commit

Permalink
Merge pull request #108 from bento-platform/feat/delete-drs-object
Browse files Browse the repository at this point in the history
feat: DRS object deletion
  • Loading branch information
davidlougheed authored Apr 17, 2024
2 parents 54c75a9 + 082cb14 commit 236307f
Show file tree
Hide file tree
Showing 12 changed files with 365 additions and 181 deletions.
2 changes: 1 addition & 1 deletion chord_drs/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
def _get_backend() -> Backend | None:
# Instantiate backend if needed
backend_class = DATA_SOURCE_BACKENDS.get(current_app.config["SERVICE_DATA_SOURCE"])
return backend_class() if backend_class else None
return backend_class(current_app.config) if backend_class else None


def get_backend() -> Backend | None:
Expand Down
18 changes: 9 additions & 9 deletions chord_drs/backends/base.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
from abc import ABC, abstractmethod


__all__ = ["Backend", "FakeBackend"]
__all__ = ["Backend"]


# noinspection PyUnusedLocal
class Backend(ABC):
@abstractmethod
def save(self, current_location: str, filename: str) -> str: # pragma: no cover
def __init__(self, config: dict): # pragma: no cover
pass

@abstractmethod
def save(self, current_location: str, filename: str) -> str: # pragma: no cover
pass

class FakeBackend(Backend):
"""
For the tests
"""

def save(self, current_location: str, filename: str) -> str:
return current_location
@abstractmethod
def delete(self, location: str) -> None: # pragma: no cover
pass
14 changes: 10 additions & 4 deletions chord_drs/backends/local.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from shutil import copy
from flask import current_app
from pathlib import Path

from .base import Backend
Expand All @@ -15,12 +14,19 @@ class LocalBackend(Backend):
specified by the DATA var env, the default being in ~/chord_drs_data
"""

def __init__(self):
self.base_location = Path(current_app.config["SERVICE_DATA"])
def __init__(self, config: dict): # config is dict or flask.Config, which is a subclass of dict.
self.base_location = Path(config["SERVICE_DATA"])
# We can use mkdir, since resolve has been called in config.py
self.base_location.mkdir(exist_ok=True)
self.base_location.mkdir(parents=True, exist_ok=True)

def save(self, current_location: str | Path, filename: str) -> str:
new_location = self.base_location / filename
copy(current_location, new_location)
return str(new_location.resolve())

def delete(self, location: str | Path) -> None:
loc = location if isinstance(location, Path) else Path(location)
if self.base_location in loc.parents:
loc.unlink()
return
raise ValueError(f"Location {loc} is not a subpath of backend base location {self.base_location}")
30 changes: 18 additions & 12 deletions chord_drs/backends/minio.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import boto3

from flask import current_app
from urllib.parse import urlparse

from .base import Backend
Expand All @@ -10,26 +9,33 @@


class MinioBackend(Backend):
def __init__(self, resource=None):
def __init__(self, config: dict, resource=None): # config is dict or flask.Config, which is a subclass of dict.
self._minio_url = config["MINIO_URL"]

self.minio = resource or boto3.resource(
"s3",
endpoint_url=current_app.config["MINIO_URL"],
aws_access_key_id=current_app.config["MINIO_USERNAME"],
aws_secret_access_key=current_app.config["MINIO_PASSWORD"],
endpoint_url=self._minio_url,
aws_access_key_id=config["MINIO_USERNAME"],
aws_secret_access_key=config["MINIO_PASSWORD"],
)

self.bucket = self.minio.Bucket(current_app.config["MINIO_BUCKET"])
self.bucket = self.minio.Bucket(config["MINIO_BUCKET"])

@staticmethod
def build_minio_location(obj):
host = urlparse(current_app.config["MINIO_URL"]).netloc
def build_minio_location(self, obj):
host = urlparse(self._minio_url).netloc
return f"s3://{host}/{obj.bucket_name}/{obj.key}"

def get_minio_object(self, location: str):
obj = self.bucket.Object(location.split("/")[-1])
return obj.get()
return self.bucket.Object(location.split("/")[-1])

def get_minio_object_dict(self, location: str) -> dict:
return self.get_minio_object(location).get()

def save(self, current_location: str, filename: str) -> str:
with open(current_location, "rb") as f:
obj = self.bucket.put_object(Key=filename, Body=f)
return MinioBackend.build_minio_location(obj)
return self.build_minio_location(obj)

def delete(self, location: str) -> None:
obj = self.get_minio_object(location)
obj.delete()
4 changes: 2 additions & 2 deletions chord_drs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def __init__(self, *args, **kwargs):

super().__init__(*args, **kwargs)

def return_minio_object(self):
def return_minio_object(self) -> dict:
parsed_url = urlparse(self.location)

if parsed_url.scheme != "s3":
Expand All @@ -139,4 +139,4 @@ def return_minio_object(self):
if not backend or not isinstance(backend, MinioBackend):
raise Exception("The backend for this instance is not properly configured.")

return backend.get_minio_object(self.location)
return backend.get_minio_object_dict(self.location)
180 changes: 37 additions & 143 deletions chord_drs/routes.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import logging
import os
import re
import tempfile
import urllib.parse

from asgiref.sync import async_to_sync
from bento_lib.auth.permissions import Permission, P_INGEST_DATA, P_QUERY_DATA, P_DOWNLOAD_DATA
from bento_lib.auth.permissions import Permission, P_INGEST_DATA, P_QUERY_DATA, P_DELETE_DATA, P_DOWNLOAD_DATA
from bento_lib.auth.resources import RESOURCE_EVERYTHING, build_resource
from bento_lib.service_info.constants import SERVICE_ORGANIZATION_C3G
from bento_lib.service_info.helpers import build_service_info
Expand All @@ -13,22 +14,20 @@
Request,
current_app,
jsonify,
url_for,
request,
send_file,
make_response,
)
from sqlalchemy import or_
from urllib.parse import urlparse
from werkzeug.exceptions import BadRequest, Forbidden, NotFound, InternalServerError, RequestedRangeNotSatisfiable

from . import __version__
from .authz import authz_middleware
from .backend import get_backend
from .constants import BENTO_SERVICE_KIND, SERVICE_NAME, SERVICE_TYPE
from .data_sources import DATA_SOURCE_LOCAL, DATA_SOURCE_MINIO
from .db import db
from .models import DrsMixin, DrsBlob, DrsBundle
from .types import DRSAccessMethodDict, DRSContentsDict, DRSObjectBentoDict, DRSObjectDict
from .models import DrsBlob, DrsBundle
from .serialization import build_bundle_json, build_blob_json
from .utils import drs_file_checksum


Expand Down Expand Up @@ -84,18 +83,18 @@ def _post_headers_getter(r: Request) -> dict[str, str]:


def fetch_and_check_object_permissions(object_id: str, permission: Permission) -> tuple[DrsBlob | DrsBundle, bool]:
view_data_everything = check_everything_permission(permission)
has_permission_on_everything = check_everything_permission(permission)

drs_object, is_bundle = get_drs_object(object_id)

if not drs_object:
authz_middleware.mark_authz_done(request)
if authz_enabled() and not view_data_everything: # Don't leak if this object exists
if authz_enabled() and not has_permission_on_everything: # Don't leak if this object exists
raise forbidden()
raise NotFound("No object found for this ID")

# Check permissions -------------------------------------------------
if view_data_everything:
if has_permission_on_everything:
# Good to go already!
authz_middleware.mark_authz_done(request)
else:
Expand All @@ -119,138 +118,6 @@ def range_not_satisfiable_log_mark(description: str, length: int) -> RequestedRa
return RequestedRangeNotSatisfiable(description=description, length=length)


def get_drs_host() -> str:
return urlparse(current_app.config["SERVICE_BASE_URL"]).netloc


def create_drs_uri(object_id: str) -> str:
return f"drs://{get_drs_host()}/{object_id}"


def build_contents(bundle: DrsBundle, expand: bool) -> list[DRSContentsDict]:
content: list[DRSContentsDict] = []
bundles = DrsBundle.query.filter_by(parent_bundle=bundle).all()

for b in bundles:
content.append(
{
**({"contents": build_contents(b, expand)} if expand else {}),
"drs_uri": create_drs_uri(b.id),
"id": b.id,
"name": b.name, # TODO: Can overwrite... see spec
}
)

for c in bundle.objects:
content.append(
{
"drs_uri": create_drs_uri(c.id),
"id": c.id,
"name": c.name, # TODO: Can overwrite... see spec
}
)

return content


def build_bento_object_json(drs_object: DrsMixin) -> DRSObjectBentoDict:
return {
"bento": {
"project_id": drs_object.project_id,
"dataset_id": drs_object.dataset_id,
"data_type": drs_object.data_type,
"public": drs_object.public,
}
}


def build_bundle_json(
drs_bundle: DrsBundle,
expand: bool = False,
with_bento_properties: bool = False,
) -> DRSObjectDict:
return {
"contents": build_contents(drs_bundle, expand),
"checksums": [
{
"checksum": drs_bundle.checksum,
"type": "sha-256",
},
],
"created_time": f"{drs_bundle.created.isoformat('T')}Z",
"size": drs_bundle.size,
"name": drs_bundle.name,
# Description should be excluded if null in the database
**({"description": drs_bundle.description} if drs_bundle.description is not None else {}),
"id": drs_bundle.id,
"self_uri": create_drs_uri(drs_bundle.id),
**(build_bento_object_json(drs_bundle) if with_bento_properties else {}),
}


def build_blob_json(
drs_blob: DrsBlob,
inside_container: bool = False,
with_bento_properties: bool = False,
) -> DRSObjectDict:
data_source = current_app.config["SERVICE_DATA_SOURCE"]

blob_url: str = urllib.parse.urljoin(
current_app.config["SERVICE_BASE_URL"] + "/",
url_for("drs_service.object_download", object_id=drs_blob.id).lstrip("/"),
)

https_access_method: DRSAccessMethodDict = {
"access_url": {
# url_for external was giving weird results - build the URL by hand instead using the internal url_for
"url": blob_url,
# No headers --> auth will have to be obtained via some
# out-of-band method, or the object's contents are public. This
# will depend on how the service is deployed.
},
"type": "https",
}

access_methods: list[DRSAccessMethodDict] = [https_access_method]

if inside_container and data_source == DATA_SOURCE_LOCAL:
access_methods.append(
{
"access_url": {
"url": f"file://{drs_blob.location}",
},
"type": "file",
}
)
elif data_source == DATA_SOURCE_MINIO:
access_methods.append(
{
"access_url": {
"url": drs_blob.location,
},
"type": "s3",
}
)

return {
"access_methods": access_methods,
"checksums": [
{
"checksum": drs_blob.checksum,
"type": "sha-256",
},
],
"created_time": f"{drs_blob.created.isoformat('T')}Z",
"size": drs_blob.size,
"name": drs_blob.name,
# Description should be excluded if null in the database
**({"description": drs_blob.description} if drs_blob.description is not None else {}),
"id": drs_blob.id,
"self_uri": create_drs_uri(drs_blob.id),
**(build_bento_object_json(drs_blob) if with_bento_properties else {}),
}


@drs_service.route("/service-info", methods=["GET"])
@drs_service.route("/ga4gh/drs/v1/service-info", methods=["GET"])
@authz_middleware.deco_public_endpoint
Expand Down Expand Up @@ -288,9 +155,36 @@ def get_drs_object(object_id: str) -> tuple[DrsBlob | DrsBundle | None, bool]:
return None, False


@drs_service.route("/objects/<string:object_id>", methods=["GET"])
@drs_service.route("/ga4gh/drs/v1/objects/<string:object_id>", methods=["GET"])
def delete_drs_object(object_id: str, logger: logging.Logger):
drs_object, is_bundle = fetch_and_check_object_permissions(object_id, P_DELETE_DATA)

logger.info(f"Deleting object {drs_object.id}")

if not is_bundle:
q = DrsBlob.query.filter_by(location=drs_object.location)
n_using_file = q.count()
if n_using_file == 1 and q.first().id == drs_object.id:
# If this object is the only one using the file, delete the file too
# TODO: this can create a race condition and leave files undeleted... should we have a cleanup on start?
logger.info(
f"Deleting file at {drs_object.location}, since {drs_object.id} is the only object referring to it."
)
backend = get_backend()
backend.delete(drs_object.location)

# Don't bother with additional bundle deleting logic, they'll be removed soon anyway. TODO

db.session.delete(drs_object)
db.session.commit()


@drs_service.route("/objects/<string:object_id>", methods=["GET", "DELETE"])
@drs_service.route("/ga4gh/drs/v1/objects/<string:object_id>", methods=["GET", "DELETE"])
def object_info(object_id: str):
if request.method == "DELETE":
delete_drs_object(object_id, current_app.logger)
return current_app.response_class(status=204)

drs_object, is_bundle = fetch_and_check_object_permissions(object_id, P_QUERY_DATA)

# The requester can ask for additional, non-spec-compliant Bento properties to be included in the response
Expand Down
Loading

0 comments on commit 236307f

Please sign in to comment.