
feat(dataset-repository): add Dataset model to dataset-repository
Task: IL-238
Valentina Galata authored and ValentinaGalataTNG committed Mar 13, 2024
1 parent 769cf50 · commit 85eed4e
Showing 16 changed files with 364 additions and 131 deletions.
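In short, `DatasetRepository.create_dataset` now takes a required `dataset_name` and returns a `Dataset` object instead of a bare ID string, so call sites (as in the notebook changes below) append `.id` where only the identifier is needed. A minimal sketch of the new call shape; the import paths follow the file paths in this diff (the package may also re-export these names at a higher level), and the example values are placeholders:

```python
from intelligence_layer.evaluation.data_storage.dataset_repository import (
    InMemoryDatasetRepository,
)
from intelligence_layer.evaluation.domain import Dataset, Example

repository = InMemoryDatasetRepository()

# New signature: `dataset_name` is required and a `Dataset` is returned.
dataset: Dataset = repository.create_dataset(
    examples=[Example(input="Some input", expected_output="Some output")],
    dataset_name="my-dataset",  # placeholder name
)

# Callers that previously received a plain ID string now read `.id`.
dataset_id = dataset.id
```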
7 changes: 3 additions & 4 deletions src/examples/human_evaluation.ipynb
@@ -42,7 +42,6 @@
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"from typing import Iterable, cast\n",
"\n",
"from datasets import load_dataset\n",
@@ -77,7 +76,6 @@
" Runner,\n",
" SuccessfulExampleOutput,\n",
")\n",
"\n",
"from intelligence_layer.evaluation.argilla import ArgillaAggregator\n",
"\n",
"load_dotenv()\n",
@@ -163,8 +161,9 @@
" id=str(dataset[\"meta\"][i][\"id\"]),\n",
" )\n",
" for i in range(num_examples)\n",
" ]\n",
")"
" ],\n",
" dataset_name=\"human-evaluation-dataset\",\n",
").id"
]
},
{
8 changes: 5 additions & 3 deletions src/examples/quickstart_task.ipynb
@@ -299,8 +299,9 @@
"expected_output = KeywordExtractionExpectedOutput(keywords=[\"dolphins\", \"sharks\"])\n",
"\n",
"single_example_dataset = dataset_repository.create_dataset(\n",
" examples=[Example(input=model_input, expected_output=expected_output)]\n",
")\n",
" examples=[Example(input=model_input, expected_output=expected_output)],\n",
" dataset_name=\"quickstart-task-single-example-dataset\",\n",
").id\n",
"\n",
"run_overview = runner.run_dataset(single_example_dataset, NoOpTracer())"
]
@@ -483,7 +484,8 @@
" ),\n",
" ),\n",
" ],\n",
")\n",
" dataset_name=\"human-evaluation-multiple-examples-dataset\",\n",
").id\n",
"\n",
"run = runner.run_dataset(dataset_id)\n",
"evaluation_overview = evaluator.evaluate_runs(run.id)\n",
191 changes: 136 additions & 55 deletions src/intelligence_layer/evaluation/data_storage/dataset_repository.py
@@ -1,13 +1,12 @@
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Dict, Iterable, Optional, Sequence, cast
from uuid import uuid4
from typing import Iterable, Optional, Sequence, Tuple, cast

from fsspec import AbstractFileSystem # type: ignore
from fsspec.implementations.local import LocalFileSystem # type: ignore

from intelligence_layer.core import Input, JsonSerializer, PydanticSerializable
from intelligence_layer.evaluation.domain import Example, ExpectedOutput
from intelligence_layer.evaluation.domain import Dataset, Example, ExpectedOutput


class DatasetRepository(ABC):
@@ -18,16 +17,16 @@ class DatasetRepository(ABC):

@abstractmethod
def create_dataset(
self,
examples: Iterable[Example[Input, ExpectedOutput]],
) -> str:
self, examples: Iterable[Example[Input, ExpectedOutput]], dataset_name: str
) -> Dataset:
"""Creates a dataset from given :class:`Example`s and returns the ID of that dataset.
Args:
examples: An :class:`Iterable` of :class:`Example`s to be saved in the same dataset.
dataset_name: A name for the dataset.
Returns:
The ID of the created dataset.
The created :class:`Dataset`.
"""
pass

@@ -40,6 +39,29 @@ def delete_dataset(self, dataset_id: str) -> None:
"""
pass

@abstractmethod
def dataset(self, dataset_id: str) -> Optional[Dataset]:
"""Returns a dataset identified by the given dataset ID.
Args:
dataset_id: Dataset ID of the dataset to return.
Returns:
:class:`Dataset` if it was found, `None` otherwise.
"""
pass

def datasets(self) -> Iterable[Dataset]:
"""Returns all :class:`Dataset`s sorted by their ID.
Returns:
:class:`Sequence` of :class:`Dataset`s.
"""
for dataset_id in self.dataset_ids():
dataset = self.dataset(dataset_id)
if dataset is not None:
yield dataset

@abstractmethod
def dataset_ids(self) -> Iterable[str]:
"""Returns all sorted dataset IDs.
@@ -93,40 +115,79 @@ def examples(
class FileSystemDatasetRepository(DatasetRepository):
_REPO_TYPE = "dataset"

def __init__(self, fs: AbstractFileSystem, root_directory: str) -> None:
def __init__(self, filesystem: AbstractFileSystem, root_directory: Path) -> None:
super().__init__()
assert root_directory[-1] != "/"
self._fs = fs

assert str(root_directory)[-1] != "/"
root_directory.mkdir(parents=True, exist_ok=True)

self._file_system = filesystem
self._root_directory = root_directory

def create_dataset(self, examples: Iterable[Example[Input, ExpectedOutput]]) -> str:
dataset_id = str(uuid4())
dataset_path = self._dataset_path(dataset_id)
if self._fs.exists(dataset_path):
raise ValueError(f"Dataset name {dataset_id} already taken")
def create_dataset(
self, examples: Iterable[Example[Input, ExpectedOutput]], dataset_name: str
) -> Dataset:
dataset = Dataset(name=dataset_name)
try:
self._dataset_directory(dataset.id).mkdir(exist_ok=False)
except OSError:
raise ValueError(
f"Created random dataset ID already exists for dataset {dataset}. This should not happen."
)

dataset_path = self._dataset_path(dataset.id)
examples_path = self._dataset_examples_path(dataset.id)
if self._file_system.exists(dataset_path) or self._file_system.exists(
examples_path
):
raise ValueError(
f"One of the dataset files already exist for dataset {dataset}. This should not happen. Files: {dataset_path}, {examples_path}."
)

with self._fs.open(dataset_path, "w", encoding="utf-8") as examples_file:
with self._file_system.open(
str(dataset_path), "w", encoding="utf-8"
) as dataset_file:
dataset_file.write(JsonSerializer(root=dataset).model_dump_json() + "\n")

with self._file_system.open(
str(examples_path), "w", encoding="utf-8"
) as examples_file:
for example in examples:
serialized_result = JsonSerializer(root=example)
text = serialized_result.model_dump_json() + "\n"
examples_file.write(text)
return dataset_id

return dataset

def delete_dataset(self, dataset_id: str) -> None:
dataset_path = self._dataset_path(dataset_id)
try:
self._fs.rm(dataset_path, recursive=True)
self._file_system.rm(
str(self._dataset_directory(dataset_id)), recursive=True
)
except FileNotFoundError:
pass

def dataset(self, dataset_id: str) -> Optional[Dataset]:
file_path = self._dataset_path(dataset_id)
if not file_path.exists():
return None

with self._file_system.open(
str(file_path), "r", encoding="utf-8"
) as file_content:
# we save only one dataset per file
return [
Dataset.model_validate_json(dataset_string)
for dataset_string in file_content
][0]

def dataset_ids(self) -> Iterable[str]:
return sorted(
[
Path(f["name"]).stem
for f in self._fs.ls(self._root_directory, detail=True)
if isinstance(f, Dict) and Path(f["name"]).suffix == ".jsonl"
]
dataset_files = self._file_system.glob(
path=str(self._root_directory) + "/**/*.json",
maxdepth=2,
detail=False,
)
return sorted([Path(f).stem for f in dataset_files])

def example(
self,
@@ -135,11 +196,13 @@ def example(
input_type: type[Input],
expected_output_type: type[ExpectedOutput],
) -> Optional[Example[Input, ExpectedOutput]]:
example_path = self._dataset_path(dataset_id)
if not self._fs.exists(example_path):
example_path = self._dataset_examples_path(dataset_id)
if not self._file_system.exists(example_path):
return None

with self._fs.open(example_path, "r", encoding="utf-8") as examples_file:
with self._file_system.open(
str(example_path), "r", encoding="utf-8"
) as examples_file:
for example in examples_file:
# mypy does not accept dynamic types
validated_example = Example[input_type, expected_output_type].model_validate_json(json_data=example) # type: ignore
@@ -153,49 +216,63 @@ def examples(
input_type: type[Input],
expected_output_type: type[ExpectedOutput],
) -> Iterable[Example[Input, ExpectedOutput]]:
example_path = self._dataset_path(dataset_id)
if not self._fs.exists(example_path):
example_path = self._dataset_examples_path(dataset_id)
if not self._file_system.exists(example_path):
return []

with self._fs.open(example_path, "r", encoding="utf-8") as examples_file:
with self._file_system.open(
str(example_path), "r", encoding="utf-8"
) as examples_file:
# Mypy does not accept dynamic types
examples = [Example[input_type, expected_output_type].model_validate_json(json_data=example) for example in examples_file] # type: ignore

return sorted(examples, key=lambda example: example.id)

def _dataset_path(self, dataset_id: str) -> str:
return self._root_directory + f"/{dataset_id}.jsonl"
def _dataset_directory(self, dataset_id: str) -> Path:
return self._root_directory / f"{dataset_id}"

def _dataset_path(self, dataset_id: str) -> Path:
return self._dataset_directory(dataset_id) / f"{dataset_id}.json"

def _dataset_examples_path(self, dataset_id: str) -> Path:
return self._dataset_directory(dataset_id) / f"{dataset_id}.jsonl"


class InMemoryDatasetRepository(DatasetRepository):
def __init__(self) -> None:
self._datasets: dict[
str, Sequence[Example[PydanticSerializable, PydanticSerializable]]
self._datasets_and_examples: dict[
str,
Tuple[
Dataset, Sequence[Example[PydanticSerializable, PydanticSerializable]]
],
] = {}

def create_dataset(
self,
examples: Iterable[Example[Input, ExpectedOutput]],
) -> str:
dataset_id = str(uuid4())
if dataset_id in self._datasets:
raise ValueError(f"Dataset name {dataset_id} already taken")

in_memory_examples = [
cast(
Example[PydanticSerializable, PydanticSerializable],
example,
self, examples: Iterable[Example[Input, ExpectedOutput]], dataset_name: str
) -> Dataset:
dataset = Dataset(name=dataset_name)
if dataset.id in self._datasets_and_examples:
raise ValueError(
f"Created random dataset ID already exists for dataset {dataset}. This should not happen."
)
for example in examples
]
self._datasets[dataset_id] = in_memory_examples
return dataset_id

examples_casted = cast(
Sequence[Example[PydanticSerializable, PydanticSerializable]], examples
)
self._datasets_and_examples[dataset.id] = (dataset, examples_casted)

return dataset

def delete_dataset(self, dataset_id: str) -> None:
self._datasets.pop(dataset_id, None)
self._datasets_and_examples.pop(dataset_id, None)

def dataset(self, dataset_id: str) -> Optional[Dataset]:
if dataset_id in self._datasets_and_examples:
return self._datasets_and_examples[dataset_id][0]
return None

def dataset_ids(self) -> Iterable[str]:
return sorted(list(self._datasets.keys()))
return sorted(list(self._datasets_and_examples.keys()))

def example(
self,
@@ -214,13 +291,17 @@ def examples(
input_type: type[Input],
expected_output_type: type[ExpectedOutput],
) -> Iterable[Example[Input, ExpectedOutput]]:
if dataset_id not in self._datasets_and_examples:
return []
return cast(
Iterable[Example[Input, ExpectedOutput]],
sorted(self._datasets.get(dataset_id, []), key=lambda example: example.id),
sorted(
self._datasets_and_examples[dataset_id][1],
key=lambda example: example.id,
),
)


class FileDatasetRepository(FileSystemDatasetRepository):
def __init__(self, root_directory: Path) -> None:
super().__init__(LocalFileSystem(), str(root_directory))
root_directory.mkdir(parents=True, exist_ok=True)
super().__init__(LocalFileSystem(), root_directory)
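With this commit, `FileSystemDatasetRepository` stores each dataset in its own directory: `<root>/<dataset_id>/<dataset_id>.json` holds the `Dataset` metadata, `<root>/<dataset_id>/<dataset_id>.jsonl` holds the examples, and `dataset_ids()` is derived from a glob over the `.json` files. A small sketch of that layout, assuming a local filesystem, a temporary root directory, and placeholder example values:

```python
import tempfile
from pathlib import Path

from intelligence_layer.evaluation.data_storage.dataset_repository import (
    FileDatasetRepository,
)
from intelligence_layer.evaluation.domain import Example

root = Path(tempfile.mkdtemp())  # placeholder root directory
repository = FileDatasetRepository(root)

dataset = repository.create_dataset(
    examples=[Example(input="text about dolphins", expected_output="dolphins")],
    dataset_name="layout-demo",  # illustrative name
)

# One directory per dataset: metadata (.json) next to its examples (.jsonl).
assert (root / dataset.id / f"{dataset.id}.json").exists()
assert (root / dataset.id / f"{dataset.id}.jsonl").exists()

print(repository.dataset_ids())        # [dataset.id]
print(repository.dataset(dataset.id))  # Dataset(id=..., name='layout-demo')
```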
12 changes: 12 additions & 0 deletions src/intelligence_layer/evaluation/domain.py
@@ -24,6 +24,18 @@
AggregatedEvaluation = TypeVar("AggregatedEvaluation", bound=BaseModel, covariant=True)


class Dataset(BaseModel):
"""Represents a dataset linked to multiple examples
Attributes:
id: Dataset ID.
name: A short name of the dataset.
"""

id: str = Field(default_factory=lambda: str(uuid4()))
name: str


class FailedExampleRun(BaseModel):
"""Captures an exception raised when running a single example with a :class:`Task`.
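The new `Dataset` model added to `domain.py` is a small Pydantic model whose ID defaults to a fresh UUID4 string, so repositories no longer generate dataset IDs themselves. A short illustration:

```python
from intelligence_layer.evaluation.domain import Dataset

dataset = Dataset(name="human-evaluation-dataset")
print(dataset.id)    # e.g. "0b5a3c1e-..." (random UUID4 string from the default_factory)
print(dataset.name)  # "human-evaluation-dataset"
```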
8 changes: 4 additions & 4 deletions src/intelligence_layer/evaluation/hugging_face.py
@@ -1,3 +1,5 @@
from pathlib import Path

import huggingface_hub # type: ignore
from huggingface_hub import HfFileSystem, create_repo

@@ -19,14 +21,12 @@ def __init__(self, database_name: str, token: str, private: bool) -> None:
private=private,
)
self._database_name = database_name
fs = HfFileSystem(token=token)
root_directory = f"datasets/{database_name}"
super().__init__(fs, root_directory)
super().__init__(HfFileSystem(token=token), Path(f"datasets/{database_name}"))

def delete_repository(self) -> None:
huggingface_hub.delete_repo(
database_name=self._database_name,
token=self._fs.token,
token=self._file_system.token,
repo_type=HuggingFaceDatasetRepository._REPO_TYPE,
missing_ok=True,
)
