diff --git a/docs/artifact-manager.md b/docs/artifact-manager.md
index be7d73ad..675830b5 100644
--- a/docs/artifact-manager.md
+++ b/docs/artifact-manager.md
@@ -1,16 +1,17 @@
 # Artifact Manager
 
-The `Artifact Manager` is a builtin hypha service for indexing, managing, and storing resources such as datasets, AI models, and applications. It is designed to provide a structured way to manage datasets and similar resources, enabling efficient listing, uploading, updating, and deleting of files.
+The `Artifact Manager` is a built-in Hypha service for indexing, managing, and storing resources such as datasets, AI models, and applications. It provides a structured way to manage datasets and similar resources, enabling efficient listing, uploading, updating, and deleting of files.
 
-A typical use case for the `Artifact Manager` is as a backend for a single-page web application displaying a gallery of datasets, AI models, applications or other type of resources. The default metadata of an artifact is designed to render a grid of cards on a webpage.
+A typical use case for the `Artifact Manager` is as a backend for a single-page web application that displays a gallery of datasets, AI models, applications, or other types of resources. The default metadata of an artifact is designed to render a grid of cards on a webpage.
+
+**Note:** The `Artifact Manager` is only available when your Hypha server has S3 storage enabled.
 
-**Note:** The `Artifact Manager` is only available when your hypha server enabled s3 storage.
 
 ## Getting Started
 
 ### Step 1: Connecting to the Artifact Manager Service
 
-To use the `Artifact Manager`, you first need to connect to the Hypha server. This API allows you to create, read, edit, and delete datasets in the artifact registry (stored in s3 bucket for each workspace).
+To use the `Artifact Manager`, you first need to connect to the Hypha server. This API allows you to create, read, edit, and delete datasets in the artifact registry (stored in an S3 bucket for each workspace).
 
 ```python
 from hypha_rpc.websocket_client import connect_to_server
@@ -216,6 +217,18 @@ await artifact_manager.commit(prefix="collections/schema-dataset-gallery/valid-d
 print("Valid dataset committed.")
 ```
 
+### Step 3: Accessing the Collection via the HTTP API
+
+You can access the collection via the HTTP API to retrieve its schema and datasets.
+This can be used for rendering a gallery of datasets on a webpage.
+
+```javascript
+// Fetch the schema for the collection
+fetch("https://hypha.aicell.io/my-workspace/artifact/public/collections/schema-dataset-gallery")
+  .then(response => response.json())
+  .then(data => console.log("Schema:", data.collection_schema));
+```
+
 ## API Reference
 
 This section details the core functions provided by the `Artifact Manager` for creating, managing, and validating artifacts such as datasets and collections.
@@ -441,3 +454,56 @@ await artifact_manager.commit(prefix="collections/dataset-gallery/example-datase
 datasets = await artifact_manager.list(prefix="collections/dataset-gallery")
 print("Datasets in the gallery:", datasets)
 ```
+
+
+## HTTP API for Accessing Artifacts
+
+The `Artifact Manager` provides an HTTP endpoint for retrieving artifact manifests and data. This is useful for public-facing web applications that need to access datasets, models, or applications.
+
+### Endpoint: `/{workspace}/artifact/{path:path}`
+
+- **Workspace**: The workspace in which the artifact is stored.
+- **Path**: The relative path to the artifact.
+  - For public artifacts, the path must begin with `public/`.
+  - For private artifacts, the path does not include the `public/` prefix and requires proper authentication.
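+
+For instance, a staged (not yet committed) manifest can be previewed by adding the `stage` query parameter described under "Request Format" below. A minimal sketch — the workspace name, artifact prefix, and token value are illustrative placeholders:
+
+```python
+import requests
+
+token = "..."  # hypothetical token, e.g. obtained via api.generate_token()
+
+# Fetch the staged version of a private artifact's manifest
+response = requests.get(
+    "https://hypha.aicell.io/my-workspace/artifact/collections/dataset-gallery/example-dataset",
+    params={"stage": True},  # request the staged manifest instead of the committed one
+    headers={"Authorization": f"Bearer {token}"},  # non-public paths require authentication
+)
+print(response.json())
+```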
+
+### Request Format:
+
+- **Method**: `GET`
+- **Parameters**:
+  - `workspace`: The workspace in which the artifact is stored.
+  - `path`: The path to the artifact (e.g., `public/collections/dataset-gallery/example-dataset`).
+  - `stage` (optional): A boolean flag indicating whether to fetch the staged version of the manifest. Default is `False`.
+
+### Response:
+
+- **For public artifacts**: Returns the artifact manifest if it exists under the `public/` prefix.
+- **For private artifacts**: Returns the artifact manifest if the user has the necessary permissions.
+
+### Example:
+
+#### Fetching a public artifact:
+
+```python
+import requests
+
+SERVER_URL = "https://hypha.aicell.io"
+workspace = "my-workspace"
+response = requests.get(f"{SERVER_URL}/{workspace}/artifact/public/collections/dataset-gallery/example-dataset")
+if response.ok:
+    artifact = response.json()
+    print(artifact["name"])  # Output: Example Dataset
+else:
+    print(f"Error: {response.status_code}")
+```
+
+#### Fetching a private artifact:
+
+```python
+response = requests.get(f"{SERVER_URL}/{workspace}/artifact/collections/private-dataset-gallery/private-example-dataset")
+if response.ok:
+    artifact = response.json()
+    print(artifact["name"])  # Output: Private Example Dataset
+else:
+    print(f"Error: {response.status_code}")
+```
diff --git a/helm-charts/hypha-server/templates/deployment.yaml b/helm-charts/hypha-server/templates/deployment.yaml
index 014bbb18..3b3fc001 100644
--- a/helm-charts/hypha-server/templates/deployment.yaml
+++ b/helm-charts/hypha-server/templates/deployment.yaml
@@ -44,12 +44,26 @@ spec:
           args: {{- toYaml .Values.startupCommand.args | nindent 12 }}
           env: {{- toYaml .Values.env | nindent 12 }}
+          volumeMounts:
+            - name: {{ .Values.persistence.volumeName }}
+              mountPath: {{ .Values.persistence.mountPath }}
           livenessProbe: {{- toYaml .Values.livenessProbe | nindent 12 }}
           readinessProbe: {{- toYaml .Values.readinessProbe | nindent 12 }}
           resources: {{- toYaml .Values.resources | nindent 12 }}
+      volumes:
+        - name: {{ .Values.persistence.volumeName }}
+          persistentVolumeClaim:
+            claimName: {{ .Values.persistence.existingClaim | default (include "hypha-server.fullname" .) }}
+          {{- if not .Values.persistence.existingClaim }}
+          accessModes:
+            {{- toYaml .Values.persistence.accessModes | nindent 14 }}
+          resources:
+            requests:
+              storage: {{ .Values.persistence.size }}
+          {{- end }}
      {{- with .Values.nodeSelector }}
      nodeSelector:
        {{- toYaml .
| nindent 8 }} diff --git a/helm-charts/hypha-server/values.yaml b/helm-charts/hypha-server/values.yaml index 8bd42ed1..00707e36 100644 --- a/helm-charts/hypha-server/values.yaml +++ b/helm-charts/hypha-server/values.yaml @@ -102,12 +102,6 @@ env: key: JWT_SECRET - name: PUBLIC_BASE_URL value: "https://hypha.amun.ai" - # Use the pod's UID as the server ID - # This is important to ensure Hypha Server can handle multiple replicas - - name: HYPHA_SERVER_ID - valueFrom: - fieldRef: - fieldPath: metadata.uid # Define command-line arguments here startupCommand: @@ -117,3 +111,14 @@ startupCommand: - "--port=9520" - "--public-base-url=$(PUBLIC_BASE_URL)" # - "--redis-uri=redis://redis.hypha.svc.cluster.local:6379/0" + - "--database-uri=sqlite+aiosqlite:///app/data/artifacts.db" + +# Persistence Configuration +persistence: + volumeName: hypha-app-storage + mountPath: /app/data + storageClass: "" + accessModes: + - ReadWriteOnce + size: 5Gi + existingClaim: "" # If you have an existing claim, specify it here. Otherwise, a new PVC will be created. diff --git a/hypha/VERSION b/hypha/VERSION index 6e281531..26d90df8 100644 --- a/hypha/VERSION +++ b/hypha/VERSION @@ -1,3 +1,3 @@ { - "version": "0.20.38" + "version": "0.20.37.post4" } diff --git a/hypha/apps.py b/hypha/apps.py index 5038140a..78aaa892 100644 --- a/hypha/apps.py +++ b/hypha/apps.py @@ -65,12 +65,12 @@ def close(_) -> None: self.event_bus.on_local("shutdown", close) - async def setup_workspace(self, overwrite=True, context=None): + async def setup_applications_collection(self, overwrite=True, context=None): """Set up the workspace.""" ws = context["ws"] # Create an collection in the workspace manifest = { - "id": "description", + "id": "applications", "type": "collection", "name": "Applications", "description": f"A collection of applications for workspace {ws}", @@ -205,7 +205,7 @@ async def install( try: await self.artifact_manager.read("applications", context=context) except KeyError: - await self.setup_workspace(overwrite=True, context=context) + await self.setup_applications_collection(overwrite=True, context=context) # Create artifact using the artifact controller prefix = f"applications/{app_id}" await self.artifact_manager.create( diff --git a/hypha/artifact.py b/hypha/artifact.py index c2efb0ed..1521bcdc 100644 --- a/hypha/artifact.py +++ b/hypha/artifact.py @@ -1,20 +1,34 @@ -"""Provide an s3 interface.""" import logging import sys -import yaml +from sqlalchemy import ( + event, + Column, + String, + Integer, + JSON, + UniqueConstraint, + select, + text, + and_, + or_, +) +from hypha.utils import remove_objects_async, list_objects_async, safe_join from botocore.exceptions import ClientError - +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.ext.asyncio import ( + create_async_engine, + async_sessionmaker, + AsyncSession, +) +from sqlalchemy.orm.attributes import flag_modified +from fastapi import APIRouter, Depends, HTTPException from hypha.core import ( UserInfo, UserPermission, Artifact, CollectionArtifact, ApplicationArtifact, -) -from hypha.utils import ( - list_objects_async, - remove_objects_async, - safe_join, + WorkspaceInfo, ) from hypha_rpc.utils import ObjectProxy from jsonschema import validate @@ -23,549 +37,617 @@ logger = logging.getLogger("artifact") logger.setLevel(logging.INFO) -MANIFEST_FILENAME = "manifest.yaml" -EDIT_MANIFEST_FILENAME = "_manifest.yaml" +Base = declarative_base() -class ArtifactController: - """Represent an artifact controller.""" +# SQLAlchemy model for storing 
artifacts +class ArtifactModel(Base): + __tablename__ = "artifacts" - def __init__(self, store, s3_controller=None, workspace_bucket="hypha-workspaces"): - """Set up controller.""" + id = Column(Integer, primary_key=True, autoincrement=True) + type = Column(String, nullable=False) + workspace = Column(String, nullable=False, index=True) + prefix = Column(String, nullable=False) + manifest = Column(JSON, nullable=True) # Store committed manifest + stage_manifest = Column(JSON, nullable=True) # Store staged manifest + stage_files = Column(JSON, nullable=True) # Store staged files during staging + __table_args__ = ( + UniqueConstraint("workspace", "prefix", name="_workspace_prefix_uc"), + ) + + +class ArtifactController: + """Artifact Controller using SQLAlchemy for database backend and S3 for file storage.""" + + def __init__( + self, + store, + s3_controller, + workspace_bucket="hypha-workspaces", + database_uri=None, + ): + """Set up controller with SQLAlchemy database and S3 for file storage.""" + if database_uri is None: + # create an in-memory SQLite database for testing + database_uri = "sqlite+aiosqlite:///:memory:" + logger.warning( + "Using in-memory SQLite database for artifact manager, all data will be lost on restart!!!" + ) + self.engine = create_async_engine(database_uri, echo=False) + self.SessionLocal = async_sessionmaker( + self.engine, expire_on_commit=False, class_=AsyncSession + ) self.s3_controller = s3_controller self.workspace_bucket = workspace_bucket + store.register_public_service(self.get_artifact_service()) store.set_artifact_manager(self) + router = APIRouter() + + @router.get("/{workspace}/artifact/{path:path}") + async def get_artifact( + workspace: str, + path: str, + stage: bool = False, + user_info: store.login_optional = Depends(store.login_optional), + ): + """Get artifact from the database.""" + try: + if path.startswith("public/"): + return await self._read_manifest(workspace, path, stage=stage) + else: + if not user_info.check_permission(workspace, UserPermission.read): + raise PermissionError( + "User does not have read permission to the workspace." + ) + return await self._read_manifest(workspace, path, stage=stage) + except KeyError as e: + raise HTTPException(status_code=404, detail=str(e)) + except PermissionError as e: + raise HTTPException(status_code=403, detail=str(e)) + + store.register_router(router) + + async def init_db(self): + """Initialize the database and create tables.""" + async with self.engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + logger.info("Database tables created successfully.") + + async def _get_session(self, read_only=False): + """Return an SQLAlchemy async session. 
If read_only=True, ensure no modifications are allowed.""" + session = self.SessionLocal() + + if read_only: + sync_session = session.sync_session # Access the synchronous session object + + @event.listens_for(sync_session, "before_flush") + def prevent_flush(session, flush_context, instances): + """Prevent any flush operations to keep the session read-only.""" + raise RuntimeError("This session is read-only.") + + @event.listens_for(sync_session, "after_transaction_end") + def end_transaction(session, transaction): + """Ensure rollback after a transaction in a read-only session.""" + if not transaction._parent: + session.rollback() + + return session + + async def _read_manifest(self, workspace, prefix, stage=False): + session = await self._get_session() + try: + async with session.begin(): + query = select(ArtifactModel).filter( + ArtifactModel.workspace == workspace, + ArtifactModel.prefix == prefix, + ) + result = await session.execute(query) + artifact = result.scalar_one_or_none() + + if not artifact: + raise KeyError(f"Artifact under prefix '{prefix}' does not exist.") + + manifest = artifact.stage_manifest if stage else artifact.manifest + if not manifest: + raise KeyError(f"No manifest found for artifact '{prefix}'.") + + # If the artifact is a collection, dynamically populate the 'collection' field + if manifest.get("type") == "collection": + sub_prefix = f"{prefix}/" + query = select(ArtifactModel).filter( + ArtifactModel.workspace == workspace, + ArtifactModel.prefix.like(f"{sub_prefix}%"), + ) + result = await session.execute(query) + sub_artifacts = result.scalars().all() + + # Populate the 'collection' field with summary_fields for each sub-artifact + summary_fields = manifest.get( + "summary_fields", ["id", "name", "description"] + ) + collection = [] + for artifact in sub_artifacts: + sub_manifest = artifact.manifest + summary = {"_prefix": artifact.prefix} + for field in summary_fields: + value = sub_manifest.get(field) + if value is not None: + summary[field] = value + collection.append(summary) + + manifest["collection"] = collection + + if stage: + manifest["stage_files"] = artifact.stage_files + return manifest + finally: + await session.close() + + async def _get_artifact(self, session, workspace, prefix): + query = select(ArtifactModel).filter( + ArtifactModel.workspace == workspace, + ArtifactModel.prefix == prefix, + ) + result = await session.execute(query) + return result.scalar_one_or_none() + async def create( - self, prefix, manifest: dict, overwrite=False, stage=False, context: dict = None + self, + prefix, + manifest: dict, + overwrite=False, + stage=False, + context: dict = None, ): - """Create a new artifact with a manifest file.""" + """Create a new artifact and store its manifest in the database.""" + if context is None or "ws" not in context: + raise ValueError("Context must include 'ws' (workspace).") ws = context["ws"] + user_info = UserInfo.model_validate(context["user"]) if not user_info.check_permission(ws, UserPermission.read_write): raise PermissionError( "User does not have write permission to the workspace." 
) + # Validate based on the type of manifest if manifest["type"] == "collection": - # Validate the collection manifest CollectionArtifact.model_validate(manifest) elif manifest["type"] == "application": ApplicationArtifact.model_validate(manifest) else: Artifact.model_validate(manifest) - manifest_key = f"{ws}/{prefix}/{MANIFEST_FILENAME}" - edit_manifest_key = f"{ws}/{prefix}/{EDIT_MANIFEST_FILENAME}" + # Convert ObjectProxy to dict if necessary + if isinstance(manifest, ObjectProxy): + manifest = ObjectProxy.toDict(manifest) - async with self.s3_controller.create_client_async() as s3_client: - # Check if manifest or _manifest.yaml already exists - try: - await s3_client.head_object( - Bucket=self.workspace_bucket, Key=manifest_key - ) - manifest_exists = True - except ClientError: - manifest_exists = False + session = await self._get_session() + try: + async with session.begin(): + existing_artifact = await self._get_artifact(session, ws, prefix) - try: - await s3_client.head_object( - Bucket=self.workspace_bucket, Key=edit_manifest_key + if existing_artifact: + if not overwrite: + raise FileExistsError( + f"Artifact under prefix '{prefix}' already exists. Use overwrite=True to overwrite." + ) + else: + logger.info(f"Overwriting artifact under prefix: {prefix}") + await session.delete(existing_artifact) + await session.commit() + + async with session.begin(): + new_artifact = ArtifactModel( + workspace=ws, + prefix=prefix, + manifest=None if stage else manifest, + stage_manifest=manifest if stage else None, + stage_files=[] if stage else None, + type=manifest["type"], ) - edit_manifest_exists = True - except ClientError: - edit_manifest_exists = False - - if manifest_exists or edit_manifest_exists: - if not overwrite: - raise FileExistsError( - f"Artifact under prefix '{prefix}' already exists. Use overwrite=True to overwrite." 
- ) - else: - logger.info(f"Overwriting artifact under prefix: {prefix}") - await self.delete(prefix, context=context) + session.add(new_artifact) + await session.commit() + logger.info(f"Created artifact under prefix: {prefix}") - # Check if the artifact is a collection and initialize it - if manifest.get("type") == "collection": - collection = await self._build_collection_index( - manifest, ws, prefix, s3_client - ) - manifest["collection"] = collection + finally: + await session.close() - # If overwrite=True, remove any existing manifest or _manifest.yaml - if overwrite: - if manifest_exists: - await s3_client.delete_object( - Bucket=self.workspace_bucket, Key=manifest_key - ) - if edit_manifest_exists: - await s3_client.delete_object( - Bucket=self.workspace_bucket, Key=edit_manifest_key - ) + return manifest - if not stage and manifest["type"] == "collection": - manifest["collection"] = await self._build_collection_index( - manifest, ws, prefix, s3_client - ) + async def read(self, prefix, stage=False, context: dict = None): + """Read the artifact's manifest from the database and populate collections dynamically.""" + if context is None or "ws" not in context: + raise ValueError("Context must include 'ws' (workspace).") + ws = context["ws"] - # Update the parent collection index if the artifact is part of a collection - if not stage and "/" in prefix: - parent_prefix = "/".join(prefix.split("/")[:-1]) - # check if the collection manifest file exits - try: - collection_key = f"{ws}/{parent_prefix}/{MANIFEST_FILENAME}" - await s3_client.head_object( - Bucket=self.workspace_bucket, Key=collection_key - ) - parent_collection = True - except ClientError: - parent_collection = False - else: - parent_collection = False - # Upload the _manifest.yaml to indicate it's in edit mode - response = await s3_client.put_object( - Body=yaml.dump(ObjectProxy.toDict(manifest)), - Bucket=self.workspace_bucket, - Key=edit_manifest_key if stage else manifest_key, + user_info = UserInfo.model_validate(context["user"]) + if not user_info.check_permission(ws, UserPermission.read): + raise PermissionError( + "User does not have read permission to the workspace." ) - assert ( - "ResponseMetadata" in response - and response["ResponseMetadata"]["HTTPStatusCode"] == 200 - ), f"Failed to create artifact under prefix: {prefix}" - if parent_collection: - await self.index(parent_prefix, context=context) + manifest = await self._read_manifest(ws, prefix, stage=stage) return manifest async def edit(self, prefix, manifest=None, context: dict = None): - """Edit the artifact's manifest.""" + """Edit the artifact's manifest and save it in the database.""" + if context is None or "ws" not in context: + raise ValueError("Context must include 'ws' (workspace).") ws = context["ws"] + user_info = UserInfo.model_validate(context["user"]) if not user_info.check_permission(ws, UserPermission.read_write): raise PermissionError( "User does not have write permission to the workspace." 
) - # copy the manifest to _manifest.yaml - manifest_key = f"{ws}/{prefix}/{MANIFEST_FILENAME}" - edit_manifest_key = f"{ws}/{prefix}/{EDIT_MANIFEST_FILENAME}" - - async with self.s3_controller.create_client_async() as s3_client: - if not manifest: - # Get the manifest - manifest_obj = await s3_client.get_object( - Bucket=self.workspace_bucket, Key=manifest_key - ) - manifest_str = (await manifest_obj["Body"].read()).decode() - manifest = yaml.safe_load(manifest_str) - Artifact.model_validate(manifest) - # Upload the _manifest.yaml to indicate it's in edit mode - response = await s3_client.put_object( - Body=yaml.dump(manifest), - Bucket=self.workspace_bucket, - Key=edit_manifest_key, - ) - assert ( - "ResponseMetadata" in response - and response["ResponseMetadata"]["HTTPStatusCode"] == 200 - ), f"Failed to edit artifact under prefix: {prefix}" - async def index(self, prefix, context: dict = None): - """Update the index of the current collection.""" + # Validate the manifest + if manifest["type"] == "collection": + CollectionArtifact.model_validate(manifest) + elif manifest["type"] == "application": + ApplicationArtifact.model_validate(manifest) + elif manifest["type"] == "workspace": + WorkspaceInfo.model_validate(manifest) + + # Convert ObjectProxy to dict if necessary + if isinstance(manifest, ObjectProxy): + manifest = ObjectProxy.toDict(manifest) + + session = await self._get_session() + try: + async with session.begin(): + artifact = await self._get_artifact(session, ws, prefix) + if not artifact: + raise KeyError(f"Artifact under prefix '{prefix}' does not exist.") + + artifact.stage_manifest = manifest + flag_modified(artifact, "stage_manifest") # Mark JSON field as modified + session.add(artifact) + await session.commit() + logger.info(f"Edited artifact under prefix: {prefix}") + finally: + await session.close() + + async def commit(self, prefix, context: dict): + """Commit the artifact by finalizing the staged manifest and files.""" + if context is None or "ws" not in context: + raise ValueError("Context must include 'ws' (workspace).") ws = context["ws"] + user_info = UserInfo.model_validate(context["user"]) if not user_info.check_permission(ws, UserPermission.read_write): raise PermissionError( "User does not have write permission to the workspace." ) - collection_key = f"{ws}/{prefix}/{MANIFEST_FILENAME}" - async with self.s3_controller.create_client_async() as s3_client: - # check if the collection manifest file exits - try: - # Check if this is a valid collection - manifest_obj = await s3_client.get_object( - Bucket=self.workspace_bucket, Key=collection_key - ) - except ClientError: - raise KeyError( - f"Collection manifest file does not exist for prefix '{prefix}'" - ) - manifest = yaml.safe_load((await manifest_obj["Body"].read()).decode()) - - collection = await self._build_collection_index( - manifest, ws, prefix, s3_client - ) - # Update the collection field - manifest["collection"] = collection - - # Save the updated manifest - response = await s3_client.put_object( - Body=yaml.dump(manifest), - Bucket=self.workspace_bucket, - Key=collection_key, - ) - assert ( - "ResponseMetadata" in response - and response["ResponseMetadata"]["HTTPStatusCode"] == 200 - ), f"Failed to update collection index for '{prefix}'" - - async def _build_collection_index(self, manifest, ws, prefix, s3_client): - if manifest.get("type") != "collection": - raise ValueError( - f"The prefix '{prefix}' does not point to a valid collection." 
- ) - - # List all sub-artifacts under the collection prefix - sub_prefix = f"{ws}/{prefix}/" - sub_artifacts = await list_objects_async( - s3_client, self.workspace_bucket, prefix=sub_prefix - ) - collection = [] - - # Extract summary fields or default to 'name' and 'description' - summary_fields = manifest.get("summary_fields", ["name", "description"]) - - for obj in sub_artifacts: - if obj["type"] == "directory": - name = obj["name"] - try: - sub_manifest_key = f"{sub_prefix}{name}/{MANIFEST_FILENAME}" - sub_manifest_obj = await s3_client.get_object( - Bucket=self.workspace_bucket, Key=sub_manifest_key + session = await self._get_session() + try: + async with session.begin(): + artifact = await self._get_artifact(session, ws, prefix) + if not artifact or not artifact.stage_manifest: + raise KeyError( + f"No staged manifest to commit for artifact '{prefix}'." ) - manifest_str = (await sub_manifest_obj["Body"].read()).decode() - sub_manifest = yaml.safe_load(manifest_str) - - # Extract summary information - summary = {} - for field in summary_fields: - keys = field.split(".") - value = sub_manifest - for key in keys: - if key in value: - value = value[key] - else: - value = None - break - if value is not None: - summary[field] = value - summary["_id"] = name - collection.append(summary) - except ClientError: - pass - except Exception as e: - logger.error(f"Error while building collection index: {e}") - raise e - - return collection - async def read(self, prefix, stage=False, context: dict = None): - """Read the artifact's manifest. Fallback to _manifest.yaml if manifest.yaml doesn't exist.""" + manifest = artifact.stage_manifest + + # Validate files exist in S3 if the staged files list is present + if artifact.stage_files: + async with self.s3_controller.create_client_async() as s3_client: + for file_info in artifact.stage_files: + file_key = safe_join(ws, f"{prefix}/{file_info['path']}") + try: + await s3_client.head_object( + Bucket=self.workspace_bucket, Key=file_key + ) + except ClientError: + raise FileNotFoundError( + f"File '{file_info['path']}' does not exist in the artifact." + ) + + # Validate the schema if the artifact belongs to a collection + parent_prefix = "/".join(prefix.split("/")[:-1]) + if parent_prefix: + parent_artifact = await self._get_artifact( + session, ws, parent_prefix + ) + if ( + parent_artifact + and "collection_schema" in parent_artifact.manifest + ): + collection_schema = parent_artifact.manifest[ + "collection_schema" + ] + try: + validate(instance=manifest, schema=collection_schema) + except Exception as e: + raise ValueError(f"ValidationError: {str(e)}") + + # Finalize the manifest + artifact.manifest = manifest + artifact.stage_manifest = None + artifact.stage_files = None + flag_modified(artifact, "manifest") + session.add(artifact) + await session.commit() + logger.info(f"Committed artifact under prefix: {prefix}") + finally: + await session.close() + + async def delete(self, prefix, context: dict): + """Delete an artifact from the database and S3.""" + if context is None or "ws" not in context: + raise ValueError("Context must include 'ws' (workspace).") ws = context["ws"] + user_info = UserInfo.model_validate(context["user"]) - if not user_info.check_permission(ws, UserPermission.read): + if not user_info.check_permission(ws, UserPermission.read_write): raise PermissionError( - "User does not have read permission to the workspace." + "User does not have write permission to the workspace." 
) - manifest_key = f"{ws}/{prefix}/{MANIFEST_FILENAME}" - edit_manifest_key = f"{ws}/{prefix}/{EDIT_MANIFEST_FILENAME}" - + session = await self._get_session() + try: + async with session.begin(): + artifact = await self._get_artifact(session, ws, prefix) + if artifact: + await session.delete(artifact) + await session.commit() + logger.info(f"Deleted artifact under prefix: {prefix}") + finally: + await session.close() + + # Remove files from S3 + await self._delete_s3_files(ws, prefix) + + async def _delete_s3_files(self, ws, prefix): + """Helper method to delete files associated with an artifact in S3.""" + artifact_path = safe_join(ws, f"{prefix}") + "/" async with self.s3_controller.create_client_async() as s3_client: - try: - # Read the manifest depending on the stage - if stage: - manifest_obj = await s3_client.get_object( - Bucket=self.workspace_bucket, Key=edit_manifest_key - ) - else: - manifest_obj = await s3_client.get_object( - Bucket=self.workspace_bucket, Key=manifest_key - ) - except ClientError: - # If manifest.yaml does not exist and it's not in edit mode, raise an error - raise KeyError(f"Artifact under prefix '{prefix}' does not exist.") - - manifest = yaml.safe_load((await manifest_obj["Body"].read()).decode()) - return manifest + await remove_objects_async(s3_client, self.workspace_bucket, artifact_path) - async def commit(self, prefix, context: dict = None): - """Commit the artifact, ensure all files are uploaded, and rename _manifest.yaml to manifest.yaml.""" + async def list_files(self, prefix, max_length=1000, context: dict = None): + """List files in the specified S3 prefix.""" + if context is None or "ws" not in context: + raise ValueError("Context must include 'ws' (workspace).") ws = context["ws"] user_info = UserInfo.model_validate(context["user"]) - if not user_info.check_permission(ws, UserPermission.read_write): + if not user_info.check_permission(ws, UserPermission.read): raise PermissionError( - "User does not have write permission to the workspace." + "User does not have read permission to the workspace." ) - manifest_key = f"{ws}/{prefix}/{EDIT_MANIFEST_FILENAME}" - final_manifest_key = f"{ws}/{prefix}/{MANIFEST_FILENAME}" - - # Get the _manifest.yaml async with self.s3_controller.create_client_async() as s3_client: - try: - await s3_client.head_object( - Bucket=self.workspace_bucket, Key=final_manifest_key - ) - except ClientError: - pass - - if "/" in prefix: - # If this artifact is part of a collection, update the collection index - parent_prefix = "/".join(prefix.split("/")[:-1]) - # check if the collection manifest file exits - try: - collection_key = f"{ws}/{parent_prefix}/{MANIFEST_FILENAME}" - await s3_client.head_object( - Bucket=self.workspace_bucket, Key=collection_key - ) - parent_collection = True - except ClientError: - parent_collection = False - else: - parent_collection = False - - try: - manifest_obj = await s3_client.get_object( - Bucket=self.workspace_bucket, Key=manifest_key - ) - except ClientError: - raise FileNotFoundError( - f"Artifact under prefix '{prefix}' does not exist." - ) - manifest = yaml.safe_load((await manifest_obj["Body"].read()).decode()) - - # Validate if all files in the manifest exist in S3 - if "files" in manifest: - for file_info in manifest["files"]: - file_key = f"{ws}/{prefix}/{file_info['path']}" - try: - await s3_client.head_object( - Bucket=self.workspace_bucket, Key=file_key - ) - except ClientError: - raise FileNotFoundError( - f"File '{file_info['path']}' does not exist in the artifact." 
- ) - # if parent_collection is True, check if collection_schema exists in the parent schema - if parent_collection: - parent_manifest_obj = await s3_client.get_object( - Bucket=self.workspace_bucket, Key=collection_key - ) - parent_manifest = yaml.safe_load( - (await parent_manifest_obj["Body"].read()).decode() - ) - if parent_manifest.get("collection_schema"): - collection_schema = parent_manifest.get("collection_schema") - if collection_schema: - validate(instance=manifest, schema=collection_schema) - # Rename _manifest.yaml to manifest.yaml - response = await s3_client.copy_object( - Bucket=self.workspace_bucket, - CopySource={"Bucket": self.workspace_bucket, "Key": manifest_key}, - Key=final_manifest_key, - ) - assert ( - "ResponseMetadata" in response - and response["ResponseMetadata"]["HTTPStatusCode"] == 200 - ), f"Failed to commit manifest for artifact under prefix: {prefix}" - - # Delete the _manifest.yaml file - await s3_client.delete_object( - Bucket=self.workspace_bucket, Key=manifest_key + full_path = safe_join(ws, prefix) + "/" + items = await list_objects_async( + s3_client, self.workspace_bucket, full_path, max_length=max_length ) - - if parent_collection: - await self.index(parent_prefix, context=context) + return items async def list_artifacts(self, prefix="", stage=False, context: dict = None): """List all artifacts under a certain prefix.""" + if context is None or "ws" not in context: + raise ValueError("Context must include 'ws' (workspace).") ws = context["ws"] + user_info = UserInfo.model_validate(context["user"]) if not user_info.check_permission(ws, UserPermission.read): raise PermissionError( "User does not have read permission to the workspace." ) - prefix_path = safe_join(ws, prefix) + "/" - async with self.s3_controller.create_client_async() as s3_client: - artifacts = await list_objects_async( - s3_client, self.workspace_bucket, prefix=prefix_path + session = await self._get_session() + try: + async with session.begin(): + query = select(ArtifactModel).filter( + ArtifactModel.workspace == ws, + ArtifactModel.prefix.like(f"{prefix}/%"), + ) + result = await session.execute(query) + sub_artifacts = result.scalars().all() + + collection = [] + for artifact in sub_artifacts: + if artifact.prefix == prefix: + continue + sub_manifest = artifact.manifest + name = artifact.prefix[len(prefix) + 1 :] + name = name.split("/")[0] + collection.append(name) + return collection + finally: + await session.close() + + async def search( + self, prefix, keywords=None, filters=None, mode="AND", context: dict = None + ): + """ + Search artifacts within a collection under a specific prefix. + Supports: + - `keywords`: list of fuzzy search terms across all manifest fields. + - `filters`: dictionary of exact or fuzzy match for specific fields. + - `mode`: either 'AND' or 'OR' to combine conditions. + """ + if context is None or "ws" not in context: + raise ValueError("Context must include 'ws' (workspace).") + ws = context["ws"] + + user_info = UserInfo.model_validate(context["user"]) + if not user_info.check_permission(ws, UserPermission.read): + raise PermissionError( + "User does not have read permission to the workspace." 
) - artifact_list = [] - try: - # Check if the prefix is a collection - manifest_obj = await s3_client.get_object( - Bucket=self.workspace_bucket, - Key=f"{prefix_path}{MANIFEST_FILENAME}" - if not stage - else f"{prefix_path}{EDIT_MANIFEST_FILENAME}", + session = await self._get_session(read_only=True) + try: + async with session.begin(): + # Get the database backend from the SQLAlchemy engine + backend = ( + self.engine.dialect.name + ) # This will return 'postgresql', 'sqlite', etc. + + base_query = select(ArtifactModel).filter( + ArtifactModel.workspace == ws, + ArtifactModel.prefix.like(f"{prefix}/%"), ) - manifest = yaml.safe_load((await manifest_obj["Body"].read()).decode()) - if manifest.get("type") == "collection": - return [item["_id"] for item in manifest.get("collection", [])] - except ClientError: - pass - - # Otherwise, return a dynamic list of artifacts - for obj in artifacts: - if obj["type"] == "directory": - name = obj["name"] - try: - # head the manifest file to check if it's finalized - if stage: - await s3_client.head_object( - Bucket=self.workspace_bucket, - Key=f"{prefix_path}{name}/{EDIT_MANIFEST_FILENAME}", - ) - artifact_list.append(name) + + # Handle keyword-based search (fuzzy search across all manifest fields) + conditions = [] + if keywords: + for keyword in keywords: + if backend == "postgresql": + condition = text(f"manifest::text ILIKE '%{keyword}%'") else: - await s3_client.head_object( - Bucket=self.workspace_bucket, - Key=f"{prefix_path}{name}/{MANIFEST_FILENAME}", + condition = text( + f"json_extract(manifest, '$') LIKE '%{keyword}%'" ) - artifact_list.append(name) - except ClientError: - pass - return artifact_list + conditions.append(condition) + + # Handle filter-based search (specific key-value matching) + if filters: + for key, value in filters.items(): + if "*" in value: # Fuzzy matching + if backend == "postgresql": + condition = text( + f"manifest->>'{key}' ILIKE '{value.replace('*', '%')}'" + ) + else: + condition = text( + f"json_extract(manifest, '$.{key}') LIKE '{value.replace('*', '%')}'" + ) + else: # Exact matching + if backend == "postgresql": + condition = text(f"manifest->>'{key}' = '{value}'") + else: + condition = text( + f"json_extract(manifest, '$.{key}') = '{value}'" + ) + conditions.append(condition) + + # Combine conditions using AND/OR mode + if conditions: + if mode == "OR": + query = base_query.where(or_(*conditions)) + else: # Default is AND + query = base_query.where(and_(*conditions)) + else: + query = base_query + + # Execute the query + result = await session.execute(query) + artifacts = result.scalars().all() + + # Generate the results with summary_fields + summary_fields = [] + for artifact in artifacts: + sub_manifest = artifact.manifest + summary_fields.append({"_prefix": artifact.prefix, **sub_manifest}) + + return summary_fields + + except Exception as e: + raise ValueError( + f"An error occurred while executing the search query: {str(e)}" + ) + finally: + await session.close() async def put_file(self, prefix, file_path, context: dict = None): - """Put a file to an artifact and return the pre-signed URL.""" + """Generate a pre-signed URL to upload a file to an artifact in S3 and update the manifest.""" ws = context["ws"] user_info = UserInfo.model_validate(context["user"]) if not user_info.check_permission(ws, UserPermission.read_write): raise PermissionError( "User does not have write permission to the workspace." 
) - manifest_key = f"{ws}/{prefix}/{EDIT_MANIFEST_FILENAME}" - # Get the _manifest.yaml async with self.s3_controller.create_client_async() as s3_client: - manifest_obj = await s3_client.get_object( - Bucket=self.workspace_bucket, Key=manifest_key - ) - manifest = yaml.safe_load((await manifest_obj["Body"].read()).decode()) - - # Generate a pre-signed URL for the file - file_key = f"{ws}/{prefix}/{file_path}" + file_key = safe_join(ws, f"{prefix}/{file_path}") presigned_url = await s3_client.generate_presigned_url( "put_object", Params={"Bucket": self.workspace_bucket, "Key": file_key}, ExpiresIn=3600, ) - # Add the file to the manifest - file_info = {"path": file_path} - manifest["files"] = manifest.get("files", []) - manifest["files"].append(file_info) + session = await self._get_session() + try: + async with session.begin(): + artifact = await self._get_artifact(session, ws, prefix) + if not artifact or not artifact.stage_manifest: + raise KeyError(f"No staged manifest found for artifact '{prefix}'.") - # Update the _manifest.yaml in S3 - response = await s3_client.put_object( - Body=yaml.dump(manifest), - Bucket=self.workspace_bucket, - Key=manifest_key, - ) - assert ( - "ResponseMetadata" in response - and response["ResponseMetadata"]["HTTPStatusCode"] == 200 - ), f"Failed to update manifest for artifact under prefix: {prefix}" + artifact.stage_files = artifact.stage_files or [] - return presigned_url + if not any(f["path"] == file_path for f in artifact.stage_files): + artifact.stage_files.append({"path": file_path}) + flag_modified(artifact, "stage_files") - async def remove_file(self, prefix, file_path, context: dict = None): - """Remove a file from the artifact and update the _manifest.yaml.""" - ws = context["ws"] - user_info = UserInfo.model_validate(context["user"]) - if not user_info.check_permission(ws, UserPermission.read_write): - raise PermissionError( - "User does not have write permission to the workspace." 
- ) - manifest_key = f"{ws}/{prefix}/{EDIT_MANIFEST_FILENAME}" + session.add(artifact) + await session.commit() + logger.info(f"Generated pre-signed URL for file upload: {file_path}") + finally: + await session.close() - # Get the _manifest.yaml - async with self.s3_controller.create_client_async() as s3_client: - manifest_obj = await s3_client.get_object( - Bucket=self.workspace_bucket, Key=manifest_key - ) - manifest = yaml.safe_load((await manifest_obj["Body"].read()).decode()) - manifest["files"] = manifest.get("files", []) - # Remove the file from the manifest - manifest["files"] = [f for f in manifest["files"] if f["path"] != file_path] - - # Remove the file from S3 - file_key = f"{ws}/{prefix}/{file_path}" - response = await s3_client.delete_object( - Bucket=self.workspace_bucket, Key=file_key - ) - assert ( - "ResponseMetadata" in response - and response["ResponseMetadata"]["HTTPStatusCode"] == 204 - ), f"Failed to delete file: {file_path}" - - # Update the _manifest.yaml in S3 - response = await s3_client.put_object( - Body=yaml.dump(manifest), - Bucket=self.workspace_bucket, - Key=manifest_key, - ) - assert ( - "ResponseMetadata" in response - and response["ResponseMetadata"]["HTTPStatusCode"] == 200 - ), f"Failed to update manifest after removing file: {file_path}" + return presigned_url - async def get_file(self, prefix, path, context: dict = None): - """Return a pre-signed URL for a file.""" + async def get_file(self, prefix, path, context: dict): + """Generate a pre-signed URL to download a file from an artifact in S3.""" ws = context["ws"] + user_info = UserInfo.model_validate(context["user"]) if not user_info.check_permission(ws, UserPermission.read): raise PermissionError( "User does not have read permission to the workspace." ) - file_key = f"{ws}/{prefix}/{path}" - # Perform a head_object check before generating the URL async with self.s3_controller.create_client_async() as s3_client: - await s3_client.head_object(Bucket=self.workspace_bucket, Key=file_key) - - # Generate a pre-signed URL for the file + file_key = safe_join(ws, f"{prefix}/{path}") presigned_url = await s3_client.generate_presigned_url( "get_object", Params={"Bucket": self.workspace_bucket, "Key": file_key}, ExpiresIn=3600, ) - + logger.info(f"Generated pre-signed URL for file download: {path}") return presigned_url - async def delete(self, prefix, context: dict = None): - """Delete an entire artifact including all its files and the manifest.""" + async def remove_file(self, prefix, file_path, context: dict): + """Remove a file from the artifact and update the staged manifest.""" ws = context["ws"] + user_info = UserInfo.model_validate(context["user"]) if not user_info.check_permission(ws, UserPermission.read_write): raise PermissionError( "User does not have write permission to the workspace." ) - artifact_path = f"{ws}/{prefix}/" + + session = await self._get_session() + try: + async with session.begin(): + artifact = await self._get_artifact(session, ws, prefix) + if not artifact or not artifact.stage_manifest: + raise KeyError( + f"Artifact under prefix '{prefix}' is not in staging mode." 
+ ) + # remove the file from the staged files list + artifact.stage_files = [ + f for f in artifact.stage_files if f["path"] != file_path + ] + flag_modified(artifact, "stage_files") + session.add(artifact) + await session.commit() + finally: + await session.close() async with self.s3_controller.create_client_async() as s3_client: - # Remove all objects under the artifact's path - await remove_objects_async(s3_client, self.workspace_bucket, artifact_path) + file_key = safe_join(ws, f"{prefix}/{file_path}") + await s3_client.delete_object(Bucket=self.workspace_bucket, Key=file_key) - if "/" in prefix: - # If this artifact is part of a collection, re-index the parent collection - parent_prefix = "/".join(prefix.split("/")[:-1]) - collection_key = f"{ws}/{parent_prefix}/{MANIFEST_FILENAME}" - # check if the collection manifest file exits - try: - await s3_client.head_object( - Bucket=self.workspace_bucket, Key=collection_key - ) - await self.index(parent_prefix, context=context) - except ClientError: - pass + logger.info(f"Removed file from artifact: {file_key}") def get_artifact_service(self): - """Get artifact controller.""" + """Return the artifact service definition.""" return { "id": "artifact-manager", "config": {"visibility": "public", "require_context": True}, @@ -580,5 +662,6 @@ def get_artifact_service(self): "remove_file": self.remove_file, "get_file": self.get_file, "list": self.list_artifacts, - "index": self.index, # New index function + "search": self.search, + "list_files": self.list_files, } diff --git a/hypha/core/__init__.py b/hypha/core/__init__.py index 83c3e2b7..3aa89f18 100644 --- a/hypha/core/__init__.py +++ b/hypha/core/__init__.py @@ -226,8 +226,10 @@ class Artifact(BaseModel): model_config = ConfigDict(extra="allow") - name: str - id: str + type: str + format_version: str = "0.2.1" + name: Optional[str] = None + id: Optional[str] = None description: Optional[str] = None tags: Optional[List[str]] = None documentation: Optional[str] = None @@ -237,9 +239,7 @@ class Artifact(BaseModel): attachments: Optional[Dict[str, List[Any]]] = None files: Optional[List[Dict[str, Any]]] = None config: Optional[Dict[str, Any]] = None - type: str - format_version: str = "0.2.1" - version: str = "0.1.0" + version: Optional[str] = "0.1.0" links: Optional[List[str]] = None maintainers: Optional[List[Dict[str, str]]] = None license: Optional[str] = None @@ -261,7 +261,7 @@ class CollectionArtifact(Artifact): """Represent collection artifact.""" type: str = "collection" - collection: List[str] = [] + collection: Optional[List[str]] = [] summary_fields: Optional[List[str]] = None collection_schema: Optional[Dict[str, Any]] = None @@ -301,6 +301,7 @@ class ServiceTypeInfo(BaseModel): class WorkspaceInfo(BaseModel): """Represent a workspace.""" + type: str = "workspace" id: Optional[str] = None # we will use name as id if not provided name: str description: Optional[str] = None @@ -310,7 +311,6 @@ class WorkspaceInfo(BaseModel): icon: Optional[str] = None covers: Optional[List[str]] = None docs: Optional[str] = None - applications: Optional[Dict[str, Any]] = {} service_types: Optional[Dict[str, ServiceTypeInfo]] = {} config: Optional[Dict[str, Any]] = {} @@ -322,10 +322,6 @@ def __init__(self, **data): @classmethod def model_validate(cls, data): data = data.copy() - if "applications" in data and data["applications"] is not None: - data["applications"] = { - k: Artifact.model_validate(v) for k, v in data["applications"].items() - } return super().model_validate(data) diff --git 
a/hypha/core/store.py b/hypha/core/store.py
index 1ffcf950..b4d8ea43 100644
--- a/hypha/core/store.py
+++ b/hypha/core/store.py
@@ -232,8 +232,15 @@ async def upgrade(self):
                 "change": "Start upgrade process",
             }
         )
-        # Remove all the keys contains `{self._root_user.get_workspace()}/workspace-client-*`
-        # and `public/workspace-client-*`
+        # For <=0.20.38, upgrade workspaces so they contain a `type` key and no `applications` key
+        workspaces = await self._redis.hgetall("workspaces")
+        for k, v in workspaces.items():
+            workspace_info = json.loads(v.decode())
+            workspace_info["type"] = "workspace"
+            if "applications" in workspace_info:
+                del workspace_info["applications"]
+            await self._redis.hset("workspaces", k, json.dumps(workspace_info))
+
         old_keys = await self._redis.keys(
             f"services:*|*:{self._root_user.get_workspace()}/workspace-client-*:*@*"
         )
@@ -366,6 +373,8 @@ async def init(self, reset_redis, startup_functions=None):
             logger.warning("RESETTING ALL REDIS DATA!!!")
             await self._redis.flushall()
         await self._event_bus.init()
+        if self._artifact_manager:
+            await self._artifact_manager.init_db()
         await self.setup_root_user()
         await self.check_and_cleanup_servers()
         self._workspace_manager = await self.register_workspace_manager()
diff --git a/hypha/core/workspace.py b/hypha/core/workspace.py
index 5846fba9..e5d15aeb 100644
--- a/hypha/core/workspace.py
+++ b/hypha/core/workspace.py
@@ -152,6 +152,7 @@ async def bookmark(
         user_info = UserInfo.model_validate(context["user"])
         assert "bookmark_type" in item, "Bookmark type must be provided."
         user_workspace = await self.load_workspace_info(user_info.get_workspace())
+        assert user_workspace, f"User workspace not found: {user_info.get_workspace()}"
         user_workspace.config = user_workspace.config or {}
         if "bookmarks" not in user_workspace.config:
             user_workspace.config["bookmarks"] = []
@@ -236,12 +237,12 @@ async def create_workspace(
         user_info = UserInfo.model_validate(context["user"])
         if user_info.is_anonymous:
             raise Exception("Only registered user can create workspace.")
-        try:
-            if await self.load_workspace_info(ws):
-                if not overwrite:
-                    raise KeyError(f"Workspace already exists: {ws}")
-        except KeyError:
-            pass
+        if not overwrite:
+            try:
+                if await self.load_workspace_info(config["id"]):
+                    raise RuntimeError(f"Workspace already exists: {config['id']}")
+            except KeyError:
+                pass
 
         config["persistent"] = config.get("persistent") or False
         if user_info.is_anonymous and config["persistent"]:
diff --git a/hypha/http.py b/hypha/http.py
index db3ae21b..e264c930 100644
--- a/hypha/http.py
+++ b/hypha/http.py
@@ -399,7 +399,6 @@ def __init__(
         secret_access_key=None,
         region_name=None,
         workspace_bucket="hypha-workspaces",
-        workspace_etc_dir="etc",
         base_path="/",
     ) -> None:
         """Initialize the http proxy."""
@@ -411,7 +410,6 @@ def __init__(
         self.region_name = region_name
         self.s3_enabled = endpoint_url is not None
         self.workspace_bucket = workspace_bucket
-        self.workspace_etc_dir = workspace_etc_dir
         self.ws_apps_dir = Path(__file__).parent / "templates/ws"
         self.ws_app_files = os.listdir(self.ws_apps_dir)
         self.templates_dir = Path(__file__).parent / "templates"
@@ -842,6 +840,11 @@ async def liveness(req: Request) -> JSONResponse:
         """
         return JSONResponse({"status": "OK"})
 
+    @app.get(norm_url("/login"))
+    async def login(request: Request):
+        """Redirect to the login page."""
+        return RedirectResponse(norm_url("/public/apps/hypha-login/"))
+
     @app.get(norm_url("/{page:path}"))
     async def get_pages(
         page: str,
diff --git a/hypha/s3.py b/hypha/s3.py
index
b3322304..b45bceec 100644 --- a/hypha/s3.py +++ b/hypha/s3.py @@ -17,7 +17,7 @@ from aiobotocore.session import get_session from botocore.exceptions import ClientError from fastapi import APIRouter, Depends, Request, HTTPException -from fastapi.responses import FileResponse, Response, StreamingResponse +from fastapi.responses import FileResponse, Response, StreamingResponse, JSONResponse from starlette.datastructures import Headers from starlette.types import Receive, Scope, Send @@ -141,29 +141,6 @@ async def fetch_zip_tail(s3_client, workspace_bucket, s3_key, content_length): return zip_tail -class JSONResponse(Response): - """Represent a JSON response. - - This implementation is needed because some of the S3 response - contains datetime which is not json serializable. - It works by setting `default=str` which converts the datetime - into a string. - """ - - media_type = "application/json" - - def render(self, content: Any) -> bytes: - """Render the content.""" - return json.dumps( - content, - ensure_ascii=False, - allow_nan=False, - indent=None, - separators=(",", ":"), - default=str, # This will convert everything unknown to a string - ).encode("utf-8") - - DEFAULT_CORS_POLICY = { "CORSRules": [ { @@ -442,18 +419,27 @@ async def proxy_s3_request( try: # For methods like POST/PUT, pass the request body if method in ["POST", "PUT", "PATCH"]: + # Read and stream the request body in chunks + async def request_body_stream(): + async for chunk in request.stream(): + yield chunk + response = await client.request( method, s3_url, - content=request.stream(), # Stream the request body + content=request_body_stream(), # Stream the request body to S3 headers=essential_headers, # Forward essential headers + timeout=None, # Remove timeout for large file uploads ) else: response = await client.request( method, s3_url, headers=essential_headers, # Forward essential headers + timeout=None, ) + + # Return the response, stream data for GET requests if method == "GET": return StreamingResponse( response.iter_bytes(), # Async iterator of response body chunks @@ -463,7 +449,7 @@ async def proxy_s3_request( for k, v in response.headers.items() if k.lower() not in ["content-encoding", "transfer-encoding"] - }, # Forward all response headers except Content-Encoding and Transfer-Encoding + }, ) elif method in ["POST", "PATCH", "PUT", "DELETE"]: @@ -472,10 +458,11 @@ async def proxy_s3_request( status_code=response.status_code, headers=response.headers, # Pass raw headers from the response ) + elif method == "HEAD": return Response( status_code=response.status_code, - headers=response.headers, # No content for HEAD, but forward headers + headers=response.headers, # No content for HEAD, just forward headers ) else: @@ -744,15 +731,6 @@ def create_client_async(self, public=False): region_name=self.region_name, ) - async def list_users( - self, - ): - """List users.""" - path = self.workspace_etc_dir + "/" - async with self.create_client_async() as s3_client: - items = await list_objects_async(s3_client, self.workspace_bucket, path) - return items - async def cleanup_workspace(self, workspace: WorkspaceInfo, force=False): """Clean up workspace.""" assert isinstance(workspace, WorkspaceInfo) diff --git a/hypha/server.py b/hypha/server.py index 5e1ee8b4..2e018463 100644 --- a/hypha/server.py +++ b/hypha/server.py @@ -85,8 +85,8 @@ def start_builtin_services( if args.enable_s3: # pylint: disable=import-outside-toplevel - from hypha.artifact import ArtifactController from hypha.s3 import S3Controller + from 
hypha.artifact import ArtifactController
 
         s3_controller = S3Controller(
             store,
@@ -100,9 +100,11 @@ def start_builtin_services(
             workspace_bucket=args.workspace_bucket,
             executable_path=args.executable_path,
         )
         artifact_manager = ArtifactController(
-            store, s3_controller=s3_controller, workspace_bucket=args.workspace_bucket
+            store,
+            s3_controller=s3_controller,
+            workspace_bucket=args.workspace_bucket,
+            database_uri=args.database_uri,
         )
 
     if args.enable_server_apps:
@@ -371,6 +373,12 @@ def get_argparser(add_help=True):
         default=None,
         help="set SecretAccessKey for S3",
     )
+    parser.add_argument(
+        "--database-uri",
+        type=str,
+        default=None,
+        help="set database URI for the artifact manager",
+    )
     parser.add_argument(
         "--workspace-bucket",
         type=str,
diff --git a/requirements.txt b/requirements.txt
index 4cbb60cf..f8acd16b 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -24,3 +24,5 @@ redis==4.6.0
 friendlywords==1.1.3
 aiocache==0.12.2
 jsonschema==3.2.0
+sqlalchemy==2.0.35
+aiosqlite==0.20.0
diff --git a/setup.py b/setup.py
index e0e48fb6..0a736131 100644
--- a/setup.py
+++ b/setup.py
@@ -36,6 +36,8 @@
     "friendlywords>=1.1.3",
     "aiocache>=0.12.2",
     "jsonschema>=3.2.0",
+    "sqlalchemy>=2.0.35",
+    "aiosqlite>=0.20.0",
 ]
 
 ROOT_DIR = Path(__file__).parent.resolve()
diff --git a/tests/conftest.py b/tests/conftest.py
index 6216cac2..2edf78e4 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -153,6 +153,10 @@ def redis_server():
 @pytest_asyncio.fixture(name="fastapi_server", scope="session")
 def fastapi_server_fixture(minio_server):
     """Start server as test fixture and tear down after test."""
+    # create a temporary directory for the artifacts database
+    dirpath = tempfile.mkdtemp()
+    print(f"Artifacts database directory: {dirpath}")
+    db_path = f"sqlite+aiosqlite:///{dirpath}/artifacts.db"
     with subprocess.Popen(
         [
             sys.executable,
@@ -161,6 +165,7 @@ def fastapi_server_fixture(minio_server):
             f"--port={SIO_PORT}",
             "--enable-server-apps",
             "--enable-s3",
+            f"--database-uri={db_path}",
             "--reset-redis",
             f"--endpoint-url={MINIO_SERVER_URL}",
             f"--access-key-id={MINIO_ROOT_USER}",
f"{SERVER_URL}/{api.config.workspace}/artifact/public/collections/dataset-gallery/public-example-dataset" + ) + assert response.status_code == 200 + assert "Public Example Dataset" in response.json()["name"] + + # Now create a non-public collection (prefix does not start with "public/") + private_collection_manifest = { + "id": "private-dataset-gallery", + "name": "Private Dataset Gallery", + "description": "A private collection for organizing datasets", + "type": "collection", + "collection": [], + } + await artifact_manager.create( + prefix="collections/private-dataset-gallery", + manifest=private_collection_manifest, + ) + + # Create an artifact inside the private collection + private_dataset_manifest = { + "id": "private-example-dataset", + "name": "Private Example Dataset", + "description": "A private dataset with example data", + "type": "dataset", + "files": [], + } + await artifact_manager.create( + prefix="collections/private-dataset-gallery/private-example-dataset", + manifest=private_dataset_manifest, + stage=True, + ) + + # Commit the private artifact + await artifact_manager.commit( + prefix="collections/private-dataset-gallery/private-example-dataset" + ) + + token = await api.generate_token() + # Ensure the private artifact is available via HTTP (requires authentication or special permissions) + response = requests.get( + f"{SERVER_URL}/{api.config.workspace}/artifact/collections/private-dataset-gallery/private-example-dataset", + headers={"Authorization": f"Bearer {token}"}, + ) + + assert response.status_code == 200 + assert "Private Example Dataset" in response.json()["name"] + + # If no authentication is provided, the server should return a 401 Unauthorized status code + response = requests.get( + f"{SERVER_URL}/{api.config.workspace}/artifact/collections/private-dataset-gallery/private-example-dataset" + ) + assert response.status_code == 403 + + async def test_edit_existing_artifact(minio_server, fastapi_server): """Test editing an existing artifact.""" api = await connect_to_server( @@ -57,7 +148,6 @@ async def test_edit_existing_artifact(minio_server, fastapi_server): "name": "edit-test-dataset", "description": "A test dataset to edit", "type": "dataset", - "files": [], } await artifact_manager.create( @@ -73,7 +163,11 @@ async def test_edit_existing_artifact(minio_server, fastapi_server): # Ensure that the dataset appears in the collection's index collection = await artifact_manager.read(prefix="collections/edit-test-collection") - assert find_item(collection["collection"], "_id", "edit-test-dataset") + assert find_item( + collection["collection"], + "_prefix", + "collections/edit-test-collection/edit-test-dataset", + ) # Edit the artifact's manifest edited_manifest = { @@ -204,7 +298,11 @@ async def test_artifact_schema_validation(minio_server, fastapi_server): collection = await artifact_manager.read( prefix="collections/schema-test-collection" ) - assert find_item(collection["collection"], "_id", "valid-dataset") + assert find_item( + collection["collection"], + "_prefix", + "collections/schema-test-collection/valid-dataset", + ) # Now, create an invalid dataset artifact that does not conform to the schema (missing required fields) invalid_dataset_manifest = { @@ -292,14 +390,20 @@ async def test_artifact_manager_with_collection(minio_server, fastapi_server): manifest_data = await artifact_manager.read( prefix="collections/test-collection/test-dataset", stage=True ) - assert find_item(manifest_data["files"], "path", "test.txt") + + files = await 
+        prefix="collections/test-collection/test-dataset"
+    )
+    assert find_item(files, "name", "test.txt")
 
     # Commit the artifact (finalize it)
     await artifact_manager.commit(prefix="collections/test-collection/test-dataset")
 
     # Ensure that the dataset appears in the collection's index
     collection = await artifact_manager.read(prefix="collections/test-collection")
-    assert find_item(collection["collection"], "_id", "test-dataset")
+    assert find_item(
+        collection["collection"], "_prefix", "collections/test-collection/test-dataset"
+    )
 
     # Ensure that the manifest.yaml is finalized and the artifact is validated
     artifacts = await artifact_manager.list(prefix="collections/test-collection")
@@ -345,7 +449,11 @@ async def test_artifact_manager_with_collection(minio_server, fastapi_server):
 
     # Ensure the collection is updated after removing the dataset
     collection = await artifact_manager.read(prefix="collections/test-collection")
-    assert not find_item(collection["collection"], "_id", "test-dataset")
+    assert not find_item(
+        collection["collection"],
+        "_prefix",
+        "collections/test-collection/test-dataset",
+    )
 
     # Clean up by deleting the collection
     await artifact_manager.delete(prefix="collections/test-collection")
@@ -378,7 +486,6 @@ async def test_artifact_edge_cases_with_collection(minio_server, fastapi_server):
         "name": "edge-case-dataset",
         "description": "Edge case test dataset",
         "type": "dataset",
-        "files": [],
     }
 
     # Create the artifact first
@@ -416,7 +523,11 @@ async def test_artifact_edge_cases_with_collection(minio_server, fastapi_server):
 
     # Ensure that the collection index is updated
     collection = await artifact_manager.read(prefix="collections/edge-case-collection")
-    assert find_item(collection["collection"], "_id", "edge-case-dataset")
+    assert find_item(
+        collection["collection"],
+        "_prefix",
+        "collections/edge-case-collection/edge-case-dataset",
+    )
 
     # Test validation without uploading a file
     incomplete_manifest = {
@@ -424,7 +535,6 @@ async def test_artifact_edge_cases_with_collection(minio_server, fastapi_server):
         "name": "incomplete-dataset",
         "description": "This dataset is incomplete",
         "type": "dataset",
-        "files": [{"path": "missing.txt"}],
     }
 
     await artifact_manager.create(
@@ -433,6 +543,11 @@ async def test_artifact_edge_cases_with_collection(minio_server, fastapi_server):
         stage=True,
     )
 
+    await artifact_manager.put_file(
+        prefix="collections/edge-case-collection/incomplete-dataset",
+        file_path="missing_file.txt",
+    )
+
     # Commit should raise an error due to the missing file
     with pytest.raises(
         Exception,
@@ -450,3 +565,163 @@ async def test_artifact_edge_cases_with_collection(minio_server, fastapi_server):
         prefix="collections/edge-case-collection/incomplete-dataset"
     )
     await artifact_manager.delete(prefix="collections/edge-case-collection")
+
+
+async def test_artifact_search_in_manifest(minio_server, fastapi_server):
+    """Test search functionality within the 'manifest' field of artifacts with multiple keywords and both AND and OR modes."""
+    api = await connect_to_server({"name": "test-client", "server_url": SERVER_URL})
+    artifact_manager = await api.get_service("public/artifact-manager")
+
+    # Create a collection for testing search
+    collection_manifest = {
+        "id": "search-test-collection",
+        "name": "Search Test Collection",
+        "description": "A collection to test search functionality",
+        "type": "collection",
+        "collection": [],
+    }
+    await artifact_manager.create(
+        prefix="collections/search-test-collection",
+        manifest=collection_manifest,
+        stage=False,
+    )
+
+    # Create multiple artifacts inside the collection
+    for i in range(5):
+        dataset_manifest = {
+            "id": f"test-dataset-{i}",
+            "name": f"Test Dataset {i}",
+            "description": f"A test dataset {i}",
+            "type": "dataset",
+        }
+        await artifact_manager.create(
+            prefix=f"collections/search-test-collection/test-dataset-{i}",
+            manifest=dataset_manifest,
+            stage=True,
+        )
+        await artifact_manager.commit(
+            prefix=f"collections/search-test-collection/test-dataset-{i}"
+        )
+
+    # Use the search function to find datasets with 'Dataset 3' in the 'name' field using AND mode
+    search_results = await artifact_manager.search(
+        prefix="collections/search-test-collection", keywords=["Dataset 3"], mode="AND"
+    )
+
+    # Assert that the search results contain only the relevant dataset
+    assert len(search_results) == 1
+    assert search_results[0]["name"] == "Test Dataset 3"
+
+    # Test search for multiple results by 'description' field using OR mode
+    search_results = await artifact_manager.search(
+        prefix="collections/search-test-collection",
+        keywords=["test", "dataset"],
+        mode="OR",
+    )
+
+    # Assert that all datasets are returned because both keywords appear in every description
+    assert len(search_results) == 5
+
+    # Test search with multiple keywords using AND mode (this should return fewer results)
+    search_results = await artifact_manager.search(
+        prefix="collections/search-test-collection",
+        keywords=["Test Dataset", "3"],
+        mode="AND",
+    )
+
+    # Assert that only the dataset with 'Test Dataset 3' is returned
+    assert len(search_results) == 1
+    assert search_results[0]["name"] == "Test Dataset 3"
+
+    # Clean up by deleting the datasets and the collection
+    for i in range(5):
+        await artifact_manager.delete(
+            prefix=f"collections/search-test-collection/test-dataset-{i}"
+        )
+    await artifact_manager.delete(prefix="collections/search-test-collection")
+
+
+async def test_artifact_search_with_filters(minio_server, fastapi_server):
+    """Test search functionality with specific key-value filters in the manifest with multiple filters and AND/OR modes."""
+    api = await connect_to_server({"name": "test-client", "server_url": SERVER_URL})
+    artifact_manager = await api.get_service("public/artifact-manager")
+
+    # Create a collection for testing search
+    collection_manifest = {
+        "id": "search-test-collection",
+        "name": "Search Test Collection",
+        "description": "A collection to test search functionality",
+        "type": "collection",
+        "collection": [],
+    }
+    await artifact_manager.create(
+        prefix="collections/search-test-collection",
+        manifest=collection_manifest,
+        stage=False,
+    )
+
+    # Create multiple artifacts inside the collection
+    for i in range(5):
+        dataset_manifest = {
+            "id": f"test-dataset-{i}",
+            "name": f"Test Dataset {i}",
+            "description": f"A test dataset {i}",
+            "type": "dataset",
+        }
+        await artifact_manager.create(
+            prefix=f"collections/search-test-collection/test-dataset-{i}",
+            manifest=dataset_manifest,
+            stage=True,
+        )
+        await artifact_manager.commit(
+            prefix=f"collections/search-test-collection/test-dataset-{i}"
+        )
+
+    # Use the search function to find datasets with an exact name match using AND mode
+    search_results = await artifact_manager.search(
+        prefix="collections/search-test-collection",
+        filters={"name": "Test Dataset 3"},
+        mode="AND",
+    )
+
+    # Assert that the search results contain only the relevant dataset
+    assert len(search_results) == 1
+    assert search_results[0]["name"] == "Test Dataset 3"
+
+    # Test search with fuzzy match on name using OR mode
+    search_results = await artifact_manager.search(
+        prefix="collections/search-test-collection",
+        filters={"name": "Test*"},
+        mode="OR",
+    )
+
+    # Assert that all datasets are returned since the fuzzy match applies to all
+    assert len(search_results) == 5
+
+    # Test search with multiple filters in AND mode (exact match on name and description)
+    search_results = await artifact_manager.search(
+        prefix="collections/search-test-collection",
+        filters={"name": "Test Dataset 3", "description": "A test dataset 3"},
+        mode="AND",
+    )
+
+    # Assert that only one dataset is returned
+    assert len(search_results) == 1
+    assert search_results[0]["name"] == "Test Dataset 3"
+
+    # Test search with multiple filters in OR mode (match any of the fields)
+    search_results = await artifact_manager.search(
+        prefix="collections/search-test-collection",
+        filters={"name": "Test Dataset 3", "description": "A test dataset 1"},
+        mode="OR",
+    )
+
+    # Assert that two datasets are returned (matching either name or description)
+    assert len(search_results) == 2
+
+    # Clean up by deleting the datasets and the collection
+    for i in range(5):
+        await artifact_manager.delete(
+            prefix=f"collections/search-test-collection/test-dataset-{i}"
+        )
+    await artifact_manager.delete(prefix="collections/search-test-collection")
diff --git a/tests/test_workspace_loader.py b/tests/test_workspace_loader.py
index 40243bc8..7efca468 100644
--- a/tests/test_workspace_loader.py
+++ b/tests/test_workspace_loader.py
@@ -63,7 +63,7 @@ async def test_delete_workspace(fastapi_server, test_user_token):
     ) as api:
         ws = await api.create_workspace(
             dict(
-                name="test-2",
+                name="test-3",
                 description="test workspace",
                 owners=[],
                 persistent=True,
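
For reference, below is a minimal sketch of how the new `--database-uri` option and the `search` method exercised by the tests above might be used together. The server launch command, `SERVER_URL`, and the collection prefix are assumptions drawn from `tests/conftest.py` and `tests/test_artifact.py`; adapt them to your deployment.

```python
# Sketch only: start the server with a SQLite-backed artifact database, e.g.
#   python -m hypha.server --port=9527 --enable-s3 \
#       --database-uri="sqlite+aiosqlite:////tmp/artifacts.db"
# (the flag and URI format mirror tests/conftest.py; host/port are placeholders)
import asyncio

from hypha_rpc.websocket_client import connect_to_server

SERVER_URL = "http://127.0.0.1:9527"  # placeholder, adjust to your deployment


async def main():
    api = await connect_to_server({"name": "demo-client", "server_url": SERVER_URL})
    artifact_manager = await api.get_service("public/artifact-manager")

    # Keyword search over manifest contents; mode="AND" requires every keyword to match.
    results = await artifact_manager.search(
        prefix="collections/search-test-collection",
        keywords=["Dataset 3"],
        mode="AND",
    )
    print([r["name"] for r in results])

    # Key-value filters over manifest fields; a trailing "*" gives a fuzzy match,
    # and mode="OR" returns artifacts matching any of the filters.
    results = await artifact_manager.search(
        prefix="collections/search-test-collection",
        filters={"name": "Test*"},
        mode="OR",
    )
    print(f"{len(results)} artifacts matched")


asyncio.run(main())
```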