From a24d95e41f223d4ab4fe2ae016f418616b864f59 Mon Sep 17 00:00:00 2001 From: Wang Bo Date: Tue, 21 Mar 2023 09:15:07 +0100 Subject: [PATCH] feat: add score document support in csv (#696) * feat: add score document support in csv * refactor: add csv handlers * refactor: remove build finetuning dataset * test: add unit tests * test: fix csv reader add stringio as hints * feat: rename modality variable to col1 col2 * test: add unit test * feat: use a trial input size for mlp * feat: use a trial input size for mlp * refactor: fix task when model is mlp * chore: add changelog * feat: improve variable names and docstring * feat: rename handler to parser * feat: add docstring to csv context * test: debug experiment endpoint * test: debug experiment endpoint * chore: update changelog Co-authored-by: George Mastrapas <32414777+gmastrapas@users.noreply.github.com> --------- Co-authored-by: George Mastrapas <32414777+gmastrapas@users.noreply.github.com> --- CHANGELOG.md | 4 + finetuner/client/client.py | 2 +- finetuner/constants.py | 1 + finetuner/data.py | 368 +++++++++++++++++++++++++------------ finetuner/experiment.py | 19 +- tests/unit/test_client.py | 8 +- tests/unit/test_data.py | 106 +++++------ 7 files changed, 323 insertions(+), 185 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d8874b2ea..e5dd7a9bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,10 +10,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Support pair-wise score document construction from CSV. ([#696](https://github.com/jina-ai/finetuner/pull/696)) + ### Removed ### Changed +- Refactor `load_finetuning_dataset` into CSV handlers. ([#696](https://github.com/jina-ai/finetuner/pull/696)) + ### Fixed ### Docs diff --git a/finetuner/client/client.py b/finetuner/client/client.py index b238dc3aa..364aaef9b 100644 --- a/finetuner/client/client.py +++ b/finetuner/client/client.py @@ -70,7 +70,7 @@ def list_experiments(self, page: int = 1, size: int = 50) -> Dict[str, Any]: ..note:: The maximum number for `size` per page is 100. """ params = {'page': page, 'size': size} - url = self._construct_url(self._base_url, API_VERSION, EXPERIMENTS) + url = self._construct_url(self._base_url, API_VERSION, EXPERIMENTS) + '/' return self._handle_request(url=url, method=GET, params=params) def delete_experiment(self, name: str) -> Dict[str, Any]: diff --git a/finetuner/constants.py b/finetuner/constants.py index 331dcf44a..bce546d68 100644 --- a/finetuner/constants.py +++ b/finetuner/constants.py @@ -41,6 +41,7 @@ ARTIFACT = 'artifact' ARTIFACT_ID = 'artifact_id' DEFAULT_TAG_KEY = 'finetuner_label' +DEFAULT_TAG_SCORE_KEY = 'finetuner_score' # Run status CREATED = 'CREATED' STARTED = 'STARTED' diff --git a/finetuner/data.py b/finetuner/data.py index 3e26559c5..24795075d 100644 --- a/finetuner/data.py +++ b/finetuner/data.py @@ -1,8 +1,11 @@ +import abc import csv import os +from abc import ABC from contextlib import nullcontext from dataclasses import dataclass -from typing import TYPE_CHECKING, Generator, List, Optional, TextIO, Tuple, Union +from io import StringIO +from typing import TYPE_CHECKING, List, Optional, TextIO, Tuple, Union from _finetuner.runner.stubs.model import get_stub from docarray import Document, DocumentArray @@ -10,7 +13,7 @@ from docarray.document.mixins.helper import _is_uri from genericpath import isfile -from finetuner.constants import DEFAULT_TAG_KEY +from finetuner.constants import DEFAULT_TAG_KEY, DEFAULT_TAG_SCORE_KEY if TYPE_CHECKING: from _finetuner.models.inference import InferenceEngine @@ -48,151 +51,280 @@ class CSVOptions: point_cloud_size: int = 2048 -def build_finetuning_dataset( - data: Union[str, TextIO, DocumentArray], - model: str, - csv_options: Optional[CSVOptions] = None, -) -> Union[str, DocumentArray]: - """If data has been provided as a CSV file, the given CSV file is parsed - and a :class:`DocumentArray` is created. - """ - if isinstance(data, (TextIO)) or (isinstance(data, str) and isfile(data)): - model_stub = get_stub( - model, select_model='clip-text' - ) # for clip select_model is mandatory, though any model will get us the task - data = DocumentArray( - load_finetune_data_from_csv( - file=data, - task=model_stub.task, - options=csv_options or CSVOptions(), +class _CSVParser(ABC): + def __init__( + self, + file: Union[str, TextIO, StringIO], + task: str, + options: Optional[CSVOptions] = None, + ): + self._file = file + self._task = task + self._options = options or CSVOptions() + if isinstance(self._options.dialect, str) and self._options.dialect == 'auto': + self._dialect, _ = get_csv_file_dialect_columns( + self._file, encoding=options.encoding ) - ) + self._options.dialect = self._dialect + self._file_ctx = get_csv_file_context(file=file, encoding=options.encoding) - return data + @abc.abstractmethod + def parse(self): + ... -def build_encoding_dataset(model: 'InferenceEngine', data: List[str]) -> DocumentArray: - """If data has been provided as a list, a :class:`DocumentArray` is created - from the elements of the list +class LabeledCSVParser(_CSVParser): + """ + CSV has two columns where the first column is the data, the second column is the + label. To use the handler, make sure csv contains two columns and `is_labeled=True`. """ - modalities = model._metadata['preprocess_types'] - if model._select_model: - if model._select_model == 'clip-text': - task = 'text' - else: - task = 'image' - elif list(modalities)[0] == ['features']: - raise ValueError('MLP model does not support values from a list.') - else: - task = list(modalities)[0] - data = DocumentArray( - [Document(text=d) if task == 'text' else Document(uri=d) for d in data] - ) + def __init__( + self, + file: Union[str, TextIO, StringIO], + task: str, + options: Optional[CSVOptions] = None, + ): + super().__init__(file, task, options) - return data + def parse(self): + with self._file_ctx as fp: + lines = csv.reader(fp, dialect=self._options.dialect) -def load_finetune_data_from_csv( - file: Union[str, TextIO], - task: str = 'text-to-text', - options: Optional[CSVOptions] = None, -) -> Generator['Document', None, None]: - """ - Takes a CSV file and returns a generator of documents, with each document containing - the information from one line of the CSV. + for columns in _subsample( + lines, self._options.size, self._options.sampling_rate + ): + col1, col2 = columns + modality_col1, modality_col2 = check_columns(self._task, col1, col2) + doc = create_document( + modality_col1, + col1, + self._options.convert_to_blob, + self._options.create_point_clouds, + point_cloud_size=self._options.point_cloud_size, + ) + doc.tags[DEFAULT_TAG_KEY] = col2 + yield doc - :param file: Either a filepath to or a stream of a CSV file. - :param task: Specifies the modalities of the model that the returned data is to - be used for. This data is retrieved using the model name, and does not need - to be added to the csv_options argument when calling :meth:`finetuner.fit` - :param options: A :class:`CSVOptions` object. - :return: A generator of :class:`Document`s. Each document represents one element - in the CSV +class QueryDocumentRelationsParser(_CSVParser): + """ + In the case that user do not have explicitly annotated labels, + but rather a set of query-document pairs which express that a document is + relevant to a query, or form as a text-image pair. """ - options = options or CSVOptions() - - if hasattr(file, 'read'): - file_ctx = nullcontext(file) - else: - file_ctx = open(file, 'r', encoding=options.encoding) - - with file_ctx as fp: - # when set to auto, then sniff - try: - if isinstance(options.dialect, str) and options.dialect == 'auto': - options.dialect = csv.Sniffer().sniff(fp.read(1024)) - fp.seek(0) - except Exception: - options.dialect = 'excel' #: can not sniff delimiter, use default dialect - - lines = csv.reader(fp, dialect=options.dialect) + def __init__( + self, + file: Union[str, TextIO, StringIO], + task: str, + options: Optional[CSVOptions] = None, + ): + super().__init__(file, task, options) - artificial_label = 0 - t1, t2 = None, None + def parse(self): + with self._file_ctx as fp: - if not options.is_labeled: queries = {} - for columns in _subsample(lines, options.size, options.sampling_rate): - if columns: - col1, col2 = columns - else: - continue # skip empty lines - if not t1: # determining which column contains images - t1, t2 = check_columns(task, col1, col2) - - d1 = create_document( - t1, - col1, - options.convert_to_blob, - options.create_point_clouds, - point_cloud_size=options.point_cloud_size, - ) + artificial_label = 0 + modality_col1, modality_col2 = None, None + lines = csv.reader(fp, dialect=self._options.dialect) - if options.is_labeled: - label = col2 - d1.tags[DEFAULT_TAG_KEY] = label - yield d1 - else: - d2 = create_document( - t2, - col2, - options.convert_to_blob, - options.create_point_clouds, - point_cloud_size=options.point_cloud_size, - ) + for columns in _subsample( + lines, self._options.size, self._options.sampling_rate + ): + col1, col2 = columns if col1 in queries and col2 in queries: continue + if not modality_col1: + modality_col1, modality_col2 = check_columns(self._task, col1, col2) + doc1 = create_document( + modality_col1, + col1, + self._options.convert_to_blob, + self._options.create_point_clouds, + point_cloud_size=self._options.point_cloud_size, + ) + doc2 = create_document( + modality_col2, + col2, + self._options.convert_to_blob, + self._options.create_point_clouds, + point_cloud_size=self._options.point_cloud_size, + ) if col1 in queries: queries[col2] = queries[col1] - d2.tags[DEFAULT_TAG_KEY] = queries[col1] + doc2.tags[DEFAULT_TAG_KEY] = queries[col1] # only yield d2 else: queries[col1] = artificial_label queries[col2] = artificial_label # yield both - d1.tags[DEFAULT_TAG_KEY] = queries[col1] - d2.tags[DEFAULT_TAG_KEY] = queries[col1] + doc1.tags[DEFAULT_TAG_KEY] = queries[col1] + doc2.tags[DEFAULT_TAG_KEY] = queries[col1] artificial_label += 1 - if t1 == t2: - d1.modality = t1 - d2.modality = t1 - if DEFAULT_TAG_KEY in d1.tags: - yield d1 - if DEFAULT_TAG_KEY in d2.tags: - yield d2 + if modality_col1 == modality_col2: + doc1.modality = modality_col1 + doc2.modality = modality_col1 + if DEFAULT_TAG_KEY in doc1.tags: + yield doc1 + if DEFAULT_TAG_KEY in doc2.tags: + yield doc2 else: # different modalities, for CLIP - d1.modality = t1 - d2.modality = t2 + doc1.modality = modality_col1 + doc2.modality = modality_col2 yield Document( - chunks=[d1, d2], tags={DEFAULT_TAG_KEY: queries[col1]} + chunks=[doc1, doc2], tags={DEFAULT_TAG_KEY: queries[col1]} ) +class PairwiseScoreParser(_CSVParser): + """ + CSV has three columns, column1, column2 and a float value indicates the + similarity between column1 and column2. + """ + + def __init__( + self, + file: Union[str, TextIO, StringIO], + task: str, + options: Optional[CSVOptions] = None, + ): + super().__init__(file, task, options) + + def parse(self): + with self._file_ctx as fp: + + lines = csv.reader(fp, dialect=self._options.dialect) + + for columns in _subsample( + lines, self._options.size, self._options.sampling_rate + ): + col1, col2, col3 = columns + modality_col1, modality_col2 = check_columns(self._task, col1, col2) + doc1 = create_document( + modality_col1, + col1, + self._options.convert_to_blob, + self._options.create_point_clouds, + point_cloud_size=self._options.point_cloud_size, + ) + doc2 = create_document( + modality_col2, + col2, + self._options.convert_to_blob, + self._options.create_point_clouds, + point_cloud_size=self._options.point_cloud_size, + ) + yield Document(chunks=[doc1, doc2], tags={DEFAULT_TAG_SCORE_KEY: col3}) + + +class CSVContext: + """ + A CSV context switch class with conditions to parse CSVs into DocumentArray. + + :param model: The model being used, to get model stub and associated task. + :param options: an instance of :class`CSVOptions`. + """ + + def __init__( + self, + model: str, + options: Optional[CSVOptions] = None, + ): + self._model = model + self._options = options or CSVOptions() + if model == 'mlp': + self._task = 'image-to-image' + else: + model_stub = get_stub( + model, + select_model='clip-text', + ) + # for clip select_model is mandatory, though any model will get us the task + self._task = model_stub.task + + def _get_csv_parser(self, data: Union[str, TextIO]): + if self._options.is_labeled: + return LabeledCSVParser(file=data, task=self._task, options=self._options) + else: + _, num_columns = get_csv_file_dialect_columns( + file=data, encoding=self._options.encoding + ) + if num_columns == 2: + return QueryDocumentRelationsParser( + file=data, task=self._task, options=self._options + ) + elif num_columns == 3: + return PairwiseScoreParser( + file=data, task=self._task, options=self._options + ) + else: + raise TypeError('Can not determine the context of the csv.') + + def build_dataset(self, data: Union[str, TextIO, StringIO, DocumentArray]): + if ( + isinstance(data, TextIO) + or isinstance(data, StringIO) + or (isinstance(data, str) and isfile(data)) + ): + parser = self._get_csv_parser(data=data) + da_generator = parser.parse() + data = DocumentArray(da_generator) + + return data + + +def get_csv_file_context(file: Union[str, TextIO, StringIO], encoding: str): + """Get csv file context, such as `file_ctx`, csv dialect and number of columns.""" + if hasattr(file, 'read'): + return nullcontext(file) + return open(file, 'r', encoding=encoding) + + +def get_csv_file_dialect_columns(file: str, encoding: str): + """Get csv dialect and number of columns of the csv.""" + file_ctx = get_csv_file_context(file=file, encoding=encoding) + with file_ctx as fp: + try: + dialect = csv.Sniffer().sniff(fp.read(1024)) + fp.seek(0) + except Exception: + dialect = 'excel' #: can not sniff delimiter, use default dialect + try: + reader = csv.reader(fp, dialect=dialect) + num_columns = len(next(reader)) + except StopIteration: + raise IOError('CSV file not exist or is empty') + fp.seek(0) + return dialect, num_columns + + +def build_encoding_dataset(model: 'InferenceEngine', data: List[str]) -> DocumentArray: + """If data has been provided as a list, a :class:`DocumentArray` is created + from the elements of the list + """ + modalities = model._metadata['preprocess_types'] + if model._select_model: + if model._select_model == 'clip-text': + task = 'text' + else: + task = 'image' + elif list(modalities)[0] == ['features']: + raise ValueError('MLP model does not support values from a list.') + else: + task = list(modalities)[0] + + data = DocumentArray( + [Document(text=d) if task == 'text' else Document(uri=d) for d in data] + ) + + return data + + def check_columns( task: str, col1: str, @@ -211,14 +343,14 @@ def check_columns( raise ValueError('MLP model does not support values read in from CSV files.') if len(task.split('-to-')) == 2: - t1, t2 = task.split('-to-') + modality_col1, modality_col2 = task.split('-to-') else: raise ValueError(f'Model has invalid task: {task}') - if t1 == 'text' and t2 == 'image': + if modality_col1 == 'text' and modality_col2 == 'image': if _is_uri(col1) and not _is_uri(col2): - t1 = 'image' - t2 = 'text' + modality_col1 = 'image' + modality_col2 = 'text' elif not _is_uri(col2): raise ValueError( ( @@ -228,7 +360,7 @@ def check_columns( '.' ) ) - return t1, t2 + return modality_col1, modality_col2 def create_document( diff --git a/finetuner/experiment.py b/finetuner/experiment.py index 80ed2c796..0f46e7ae8 100644 --- a/finetuner/experiment.py +++ b/finetuner/experiment.py @@ -37,7 +37,7 @@ SCHEDULER_OPTIONS, VAL_SPLIT, ) -from finetuner.data import build_finetuning_dataset +from finetuner.data import CSVContext from finetuner.hubble import push_data from finetuner.names import get_random_name from finetuner.run import Run @@ -150,20 +150,17 @@ def create_run( if isinstance(callback, EvaluationCallback): eval_callback = callback - train_data = build_finetuning_dataset(train_data, model, csv_options) + csv_context = CSVContext(model=model, options=csv_options) + train_data = csv_context.build_dataset(data=train_data) - eval_data = ( - build_finetuning_dataset(eval_data, model, csv_options) - if eval_data - else None - ) + eval_data = csv_context.build_dataset(data=eval_data) if eval_data else None if eval_callback: - eval_callback.query_data = build_finetuning_dataset( - eval_callback.query_data, model, csv_options + eval_callback.query_data = csv_context.build_dataset( + data=eval_callback.query_data, ) - eval_callback.index_data = build_finetuning_dataset( - eval_callback.index_data, model, csv_options + eval_callback.index_data = csv_context.build_dataset( + eval_callback.index_data ) train_data, eval_data, query_data, index_data = push_data( diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index ef9ff4b0a..88b0affe2 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -40,8 +40,12 @@ def test_get_experiment(client_mocker, name='name'): def test_list_experiments(client_mocker): sent_request = client_mocker.list_experiments() - assert sent_request['url'] == client_mocker._construct_url( - client_mocker._base_url, API_VERSION, EXPERIMENTS + assert ( + sent_request['url'] + == client_mocker._construct_url( + client_mocker._base_url, API_VERSION, EXPERIMENTS + ) + + '/' ) assert sent_request['method'] == GET diff --git a/tests/unit/test_data.py b/tests/unit/test_data.py index 3872f2131..f06517ffd 100644 --- a/tests/unit/test_data.py +++ b/tests/unit/test_data.py @@ -5,14 +5,8 @@ import pytest from docarray import Document, DocumentArray -from finetuner.constants import DEFAULT_TAG_KEY -from finetuner.data import ( - CSVOptions, - build_finetuning_dataset, - check_columns, - create_document, - load_finetune_data_from_csv, -) +from finetuner.constants import DEFAULT_TAG_KEY, DEFAULT_TAG_SCORE_KEY +from finetuner.data import CSVContext, CSVOptions, check_columns, create_document current_dir = os.path.dirname(os.path.abspath(__file__)) @@ -29,30 +23,24 @@ def dummy_csv_file(): return os.path.join(current_dir, 'resources/dummy.csv') -@pytest.mark.parametrize( - 'data, is_file', [(dummy_csv_file(), True), ('notarealfile', False)] -) -def test_build_finetuning_dataset_str(data, is_file): +@pytest.mark.parametrize('data, is_file', [(dummy_csv_file(), True)]) +def test_build_dataset_str(data, is_file): options = CSVOptions(dialect='excel') - - new_da = build_finetuning_dataset( - data=data, model='bert-base-cased', csv_options=options - ) + csv_context = CSVContext(model='bert-base-cased', options=options) if is_file: + new_da = csv_context.build_dataset(data=data) assert isinstance(new_da, DocumentArray) else: - assert isinstance(new_da, str) - + with pytest.raises(IOError): + _ = csv_context.build_dataset(data=data) -def test_build_finetuning_dataset_DocumentArray(): +def test_build_dataset_from_document_array(): da = DocumentArray(Document()) - new_da = build_finetuning_dataset( - data=da, - model='does not matter', - ) + csv_context = CSVContext(model='bert-base-cased') + new_da = csv_context.build_dataset(da) assert da == new_da @@ -66,11 +54,9 @@ def test_load_finetune_data_from_csv_text_to_text(dialect): options = CSVOptions(dialect=dialect) - docs = load_finetune_data_from_csv( - file=StringIO(content_stream), - task='text-to-text', - options=options, - ) + csv_context = CSVContext(model='bert-base-cased', options=options) + docs = csv_context.build_dataset(data=StringIO(content_stream)) + flat_contents = [x for pair in contents for x in pair] for doc, expected in zip(docs, flat_contents): assert doc.content == expected @@ -89,12 +75,8 @@ def test_load_finetune_data_from_csv_image_to_image(dialect): ) options = CSVOptions(dialect=dialect) - - docs = load_finetune_data_from_csv( - file=StringIO(content_stream), - task='image-to-image', - options=options, - ) + csv_context = CSVContext(model='resnet50', options=options) + docs = csv_context.build_dataset(data=StringIO(content_stream)) for doc in docs: assert doc.uri == path_to_lena @@ -102,16 +84,21 @@ def test_load_finetune_data_from_csv_image_to_image(dialect): @pytest.mark.parametrize('dialect', csv.list_dialects()) @pytest.mark.parametrize( - 'contents, type', + 'contents, type, model', [ - ([['apple', 'apple-label'], ['orange', 'orange-label']], 'text'), + ( + [['apple', 'apple-label'], ['orange', 'orange-label']], + 'text', + 'bert-base-cased', + ), ( [[lena_img_file(), 'apple-label'], [lena_img_file(), 'orange-label']], 'image', + 'resnet50', ), ], ) -def test_load_finetune_data_from_csv_labeled(dialect, contents, type): +def test_load_finetune_data_from_csv_labeled(dialect, contents, type, model): dialect = csv.get_dialect(dialect) content_stream = dialect.lineterminator.join( @@ -119,12 +106,9 @@ def test_load_finetune_data_from_csv_labeled(dialect, contents, type): ) options = CSVOptions(dialect=dialect, is_labeled=True) + csv_context = CSVContext(model=model, options=options) - docs = load_finetune_data_from_csv( - file=StringIO(content_stream), - task='-to-'.join((type, type)), - options=options, - ) + docs = csv_context.build_dataset(data=StringIO(content_stream)) for doc, expected in zip(docs, contents): assert doc.tags[DEFAULT_TAG_KEY] == expected[1] @@ -152,22 +136,15 @@ def test_load_finetune_data_from_csv_multimodal(dialect, contents, expect_error) ) options = CSVOptions(dialect=dialect) + csv_context = CSVContext(model='ViT-B-32::openai', options=options) if expect_error: with pytest.raises(expect_error): - docs = load_finetune_data_from_csv( - file=StringIO(content_stream), - task='text-to-image', - options=options, - ) - for doc in docs: + docs = csv_context.build_dataset(data=StringIO(content_stream)) + for _ in docs: pass else: - docs = load_finetune_data_from_csv( - file=StringIO(content_stream), - task='text-to-image', - options=options, - ) + docs = csv_context.build_dataset(data=StringIO(content_stream)) for doc, expected in zip(docs, contents): assert len(doc.chunks) == 2 @@ -183,6 +160,29 @@ def test_load_finetune_data_from_csv_multimodal(dialect, contents, expect_error) assert doc.chunks[1].modality == 'image' +@pytest.mark.parametrize('dialect', [csv.list_dialects()[0]]) +@pytest.mark.parametrize( + 'contents, model', + [ + ([[lena_img_file(), lena_img_file(), '1']], 'resnet50'), + ([['apple', 'orange', '0.2']], 'bert-base-cased'), + ], +) +def test_load_finetune_data_with_scores(contents, model, dialect): + dialect = csv.get_dialect(dialect) + content_stream = dialect.lineterminator.join( + [dialect.delimiter.join(x) for x in contents] + ) + options = CSVOptions(dialect=dialect) + csv_context = CSVContext(model='bert-base-cased', options=options) + + data = csv_context.build_dataset(data=StringIO(content_stream)) + assert isinstance(data, DocumentArray) + for doc in data: + assert len(doc.chunks) == 2 + assert DEFAULT_TAG_SCORE_KEY in doc.tags + + @pytest.mark.parametrize( 'task, col1, col2, exp1, exp2, expect_error', [