From ac6c94c4b53d9cf4edc68ebc0bf89c59a052c9f4 Mon Sep 17 00:00:00 2001 From: Niclas Doll Date: Sun, 4 Feb 2024 20:23:50 +0100 Subject: [PATCH 01/16] refactor: Rewrite class resolver register to be independent of specific app config --- src/modalities/resolver_register.py | 174 +++++++++++++++------------- 1 file changed, 93 insertions(+), 81 deletions(-) diff --git a/src/modalities/resolver_register.py b/src/modalities/resolver_register.py index 9f571efe..6a87c356 100644 --- a/src/modalities/resolver_register.py +++ b/src/modalities/resolver_register.py @@ -10,6 +10,7 @@ from modalities.checkpointing.checkpointing import CheckpointingExecutionIF, CheckpointingStrategyIF from modalities.config.config import AppConfig, OptimizerTypes, SchedulerTypes from modalities.config.lookup_types import ( + LookupEnum, BatchSamplerTypes, CheckpointingExectionTypes, CheckpointingStrategyTypes, @@ -20,7 +21,9 @@ ModelTypes, SamplerTypes, TokenizerTypes, + CodecTypes ) +from modalities.dataloader.codecs import Codec from modalities.dataloader.dataloader import LLMDataLoader from modalities.dataloader.dataset import Dataset from modalities.loss_functions import CLMCrossEntropyLoss, Loss @@ -28,120 +31,129 @@ from modalities.models.gpt2.gpt2_model import GPT2LLM, NNModel from modalities.running_env.fsdp.fsdp_running_env import FSDPRunningEnv, RunningEnv, RunningEnvTypes - +# TODO: this should be a singleton class ResolverRegister: - def __init__(self, config: AppConfig) -> None: - self._resolver_register: Dict[str, ClassResolver] = self._create_resolver_register(config=config) - def build_component_by_config(self, config: BaseModel, extra_kwargs: Dict = {}) -> Any: + # TODO: args and kwargs only to be backwards compatible + # older versions required the appconfig as argument + def __init__(self, *args, **kwargs): + self._resolver_register = self._build_resolver_register() + + def build_component_by_key_query( + self, register_key: str, type_hint: str, extra_kwargs: Dict = {} + ) -> Any: + raise NotImplementedError + + def build_component_by_config( + self, config: BaseModel, extra_kwargs: Dict[str, Any] = {} + ) -> Any: + assert ( "type_hint" in config.model_fields.keys() ), f"Field 'type_hint' missing but needed for initalisation in {config}" + + assert ( + "config" in config.model_fields.keys() + ), f"Field 'config' missing but needed for initalisation in {config}" + + kwargs = extra_kwargs.copy() + + for key in config.config.model_fields.keys(): + # get the value corresponding to the key + # prefer the extra keyword arguments when both specified + val = getattr(config.config, key) + val = kwargs.get(key, val) + + # handle nested components + if ( + isinstance(val, BaseModel) and + "type_hint" in val.model_fields and + "config" in val.model_fields + ): + kwargs[key] = self.build_component_by_config(val) + + else: + kwargs[key] = val - kwargs = {key: getattr(config.config, key) for key in config.config.model_dump().keys()} - kwargs.update(extra_kwargs) # allow override via extra_kwargs, to add nested objects return self._build_component( - register_key=config.type_hint, + register_key=type(config.type_hint), register_query=config.type_hint.name, extra_kwargs=kwargs, ) - - def build_component_by_key_query(self, register_key: str, type_hint: str, extra_kwargs: Dict = {}) -> Any: - return self._build_component(register_key=register_key, register_query=type_hint, extra_kwargs=extra_kwargs) - - def _build_component(self, register_key: str, register_query: str, extra_kwargs: Dict = {}): + + def _build_component( + self, + register_key: LookupEnum, + register_query: str, + extra_kwargs: Dict[str, Any] = {} + ): + assert register_key in self._resolver_register return self._resolver_register[register_key].make( query=register_query, pos_kwargs=extra_kwargs, ) - def _find_values_with_key_in_nested_structure(self, nested_structure: Dict, key: str) -> List[Any]: - found_values = [] - for k, v in nested_structure.items(): - if k == key: - found_values.append(v) - elif isinstance(v, dict): - found_values.extend(self._find_values_with_key_in_nested_structure(v, key)) - return found_values - - def _create_resolver_register(self, config: AppConfig) -> Dict[str, ClassResolver]: - set(self._find_values_with_key_in_nested_structure(nested_structure=config.model_dump(), key="type_hint")) - resolvers = { - config.running_env.type_hint: ClassResolver( + def _build_resolver_register(self) -> List[LookupEnum]: + return { + RunningEnvTypes: ClassResolver( [t.value for t in RunningEnvTypes], base=RunningEnv, default=FSDPRunningEnv, ), - config.model.type_hint: ClassResolver( + ModelTypes: ClassResolver( [t.value for t in ModelTypes], base=NNModel, default=GPT2LLM, ), - config.optimizer.type_hint: ClassResolver( + OptimizerTypes: ClassResolver( [t.value for t in OptimizerTypes], base=optim.Optimizer, default=optim.AdamW, ), - config.scheduler.type_hint: ClassResolver( + SchedulerTypes: ClassResolver( [t.value for t in SchedulerTypes], base=optim.lr_scheduler.LRScheduler, default=optim.lr_scheduler.StepLR, ), - config.loss.type_hint: ClassResolver( + LossTypes: ClassResolver( [t.value for t in LossTypes], base=Loss, default=CLMCrossEntropyLoss, ), - **{ - sampler_type: ClassResolver( - classes=[t.value for t in SamplerTypes], - base=Sampler, - default=DistributedSampler, - ) - for sampler_type in SamplerTypes - }, - **{ - batch_sampler_type: ClassResolver( - classes=[t.value for t in BatchSamplerTypes], - base=BatchSampler, - default=BatchSampler, - ) - for batch_sampler_type in BatchSamplerTypes - }, - **{ - dataloader_type: ClassResolver( - [t.value for t in DataloaderTypes], - base=DataLoader, - default=LLMDataLoader, - ) - for dataloader_type in DataloaderTypes - }, - **{ - dataset_type: ClassResolver([t.value for t in DatasetTypes], base=Dataset) - for dataset_type in DatasetTypes - }, - **{ - collator_type: ClassResolver([t.value for t in CollatorTypes], base=GPT2LLMCollator) - for collator_type in CollatorTypes - }, - **{ - tokenizer_type: ClassResolver([t.value for t in TokenizerTypes], base=PreTrainedTokenizer) - for tokenizer_type in TokenizerTypes - }, - **{ - checkpointing_strategy_type: ClassResolver( - [t.value for t in CheckpointingStrategyTypes], base=CheckpointingStrategyIF - ) - for checkpointing_strategy_type in CheckpointingStrategyTypes - }, - **{ - checkpointing_execution_type: ClassResolver( - [t.value for t in CheckpointingExectionTypes], base=CheckpointingExecutionIF - ) - for checkpointing_execution_type in CheckpointingExectionTypes - }, + SamplerTypes: ClassResolver( + classes=[t.value for t in SamplerTypes], + base=Sampler, + default=DistributedSampler, + ), + BatchSamplerTypes: ClassResolver( + classes=[t.value for t in BatchSamplerTypes], + base=BatchSampler, + default=BatchSampler, + ), + DataloaderTypes: ClassResolver( + [t.value for t in DataloaderTypes], + base=DataLoader, + default=LLMDataLoader, + ), + DatasetTypes: ClassResolver( + [t.value for t in DatasetTypes], base=Dataset + ), + CollatorTypes: ClassResolver( + [t.value for t in CollatorTypes], base=GPT2LLMCollator + ), + TokenizerTypes: ClassResolver( + [t.value for t in TokenizerTypes], base=PreTrainedTokenizer + ), + CodecTypes: ClassResolver( + [t.value for t in CodecTypes], base=Codec + ), + CheckpointingStrategyTypes: ClassResolver( + [t.value for t in CheckpointingStrategyTypes], + base=CheckpointingStrategyIF + ), + # TODO: fix type in execution + CheckpointingExectionTypes: ClassResolver( + [t.value for t in CheckpointingExectionTypes], + base=CheckpointingExecutionIF + ) } - return resolvers - - def add_resolver(self, resolver_key: str, resolver: ClassResolver): - self._resolver_register[resolver_key] = resolver From 19a1d8588a16708a3be2e3e77dd43098b16df565 Mon Sep 17 00:00:00 2001 From: Niclas Doll Date: Sun, 4 Feb 2024 20:25:40 +0100 Subject: [PATCH 02/16] feat: Support multiple modalities in MemMapDataset --- config_files/data_config.yaml | 15 ++ src/modalities/__main__.py | 51 ++--- src/modalities/config/config.py | 28 +++ src/modalities/config/lookup_types.py | 10 + src/modalities/dataloader/codecs.py | 111 +++++++++ .../dataloader/create_packed_data.py | 141 +++++++----- src/modalities/dataloader/dataset.py | 213 ++++++++++++++---- 7 files changed, 442 insertions(+), 127 deletions(-) create mode 100644 config_files/data_config.yaml create mode 100644 src/modalities/dataloader/codecs.py diff --git a/config_files/data_config.yaml b/config_files/data_config.yaml new file mode 100644 index 00000000..f6899076 --- /dev/null +++ b/config_files/data_config.yaml @@ -0,0 +1,15 @@ +features: + - jq_pattern: .cls + codec: + type_hint: HfTokenizerCodec + config: + add_eos_token: true + tokenizer: + type_hint: GPT2TokenizerFast + config: + tokenizer_file: ./data/tokenizer/tokenizer.json + - jq_pattern: .img_path + codec: + type_hint: PillowImageCodec + config: + save_format: png diff --git a/src/modalities/__main__.py b/src/modalities/__main__.py index 8f709130..c90e1568 100644 --- a/src/modalities/__main__.py +++ b/src/modalities/__main__.py @@ -15,7 +15,7 @@ from modalities.batch import EvaluationResultBatch from modalities.checkpointing.checkpointing import Checkpointing, CheckpointingIF from modalities.checkpointing.checkpointing_factory import CheckpointingFactory -from modalities.config.config import AppConfig, ModalitiesSetupConfig, RunMode +from modalities.config.config import AppConfig, ModalitiesSetupConfig, RunMode, PreparationAppConfig from modalities.config.lookup_types import TokenizerTypes from modalities.dataloader.create_index import IndexGenerator from modalities.dataloader.create_packed_data import PackedDataGenerator @@ -104,6 +104,7 @@ def entry_point_create_memmap_index(src_path, index_path): @main.command(name="create_packed_data") @click.argument("src_path", type=Path) +@click.argument("config_file_path", type=Path) @click.option( "--dst_path", type=str, @@ -111,41 +112,27 @@ def entry_point_create_memmap_index(src_path, index_path): help="output path for packed data file. will use parent directory of src_path if none.", ) @click.option( - "--index_path", + "--idx_path", type=Path, default=None, help="input path for index. will search in parent directory of src_path if none.", ) -@click.option( - "--tokenizer_type", - type=TokenizerTypes, - show_default=True, - default=TokenizerTypes.GPT2TokenizerFast, - help="Specify which Tokenizer (inheriting from transformers.PretrainedTokenizers) should get used.", -) -@click.option( - "--tokenizer_file", - type=Path, - show_default=True, - default=Path(__file__).parents[2] / Path("data/tokenizer/tokenizer.json"), - help="path to tokenizer json", -) -@click.option( - "--jq_pattern", - type=str, - show_default=True, - default=".text", - help="jq pattern to extract the data from the json line.", -) -def entry_point_create_packed_data(src_path, dst_path, index_path, tokenizer_type, tokenizer_file, jq_pattern): - # TODO: if we want to use alternative entrypoints together with the ResolverRegistry, - # we can currently not rely on the existing class resolver. - # This is based on its connection to the overall `AppConfig`. - # One would requires an object of it to instantiate the ResolverRegistry. - # This could get resolved by implementing on own ResolverRegistry for each entrypoint or adapting the existing - # ResolverRegistry to work dynamically with any type-hinted config object from config.py. - tokenizer = tokenizer_type.value(tokenizer_file=str(tokenizer_file)) - generator = PackedDataGenerator(src_path, index_path=index_path, tokenizer=tokenizer, jq_pattern=jq_pattern) +def entry_point_create_packed_data(src_path, config_file_path, dst_path, idx_path): + + config_dict = load_app_config_dict(config_file_path) + config = PreparationAppConfig.model_validate(config_dict) + # build codec components + resolvers = ResolverRegister() + codecs = { + f.jq_pattern: resolvers.build_component_by_config(f.codec) + for f in config.features + } + # generate packed data + generator = PackedDataGenerator( + codecs, + src_path=src_path, + idx_path=idx_path, + ) generator.run(dst_path) diff --git a/src/modalities/config/config.py b/src/modalities/config/config.py index 0e166242..946f5cc5 100644 --- a/src/modalities/config/config.py +++ b/src/modalities/config/config.py @@ -20,6 +20,7 @@ SamplerTypes, SchedulerTypes, TokenizerTypes, + CodecTypes ) from modalities.config.types import ProcessGroupBackendType from modalities.models.gpt2.gpt2_model import GPT2Config @@ -51,6 +52,33 @@ class GPT2TokenizerFastConfig(BaseModel): config: GPT2TokenizerFastConfig +class CodecConfig(BaseModel): + + class HfTokenizerCodecConfig(BaseModel): + tokenizer: TokenizerConfig + max_length: Optional[int] = None + add_eos_token: bool = True + + class PillowImageCodecConfig(BaseModel): + save_format: str = "png" + + type_hint: CodecTypes + config: Union[ + HfTokenizerCodecConfig, + PillowImageCodecConfig + ] = Field(union_mode="left_to_right") + + +class FeatureConfig(BaseModel): + + codec: CodecConfig + jq_pattern: str + +class PreparationAppConfig(BaseModel): + + features: List[FeatureConfig] + + class DatasetConfig(BaseModel): class MemMapDatasetConfig(BaseModel): raw_data_path: FilePath diff --git a/src/modalities/config/lookup_types.py b/src/modalities/config/lookup_types.py index 46147480..04b5535a 100644 --- a/src/modalities/config/lookup_types.py +++ b/src/modalities/config/lookup_types.py @@ -17,6 +17,11 @@ from modalities.models.gpt2.collator import GPT2LLMCollator from modalities.models.gpt2.gpt2_model import GPT2LLM +from modalities.dataloader.codecs import ( + HfTokenizerCodec, + PillowImageCodec +) + class LookupEnum(Enum): @classmethod @@ -47,6 +52,11 @@ class TokenizerTypes(LookupEnum): GPT2TokenizerFast = GPT2TokenizerFast +class CodecTypes(LookupEnum): + HfTokenizerCodec = HfTokenizerCodec + PillowImageCodec = PillowImageCodec + + class DatasetTypes(LookupEnum): MemMapDataset = MemMapDataset PackedMemMapDatasetContinuous = PackedMemMapDatasetContinuous diff --git a/src/modalities/dataloader/codecs.py b/src/modalities/dataloader/codecs.py new file mode 100644 index 00000000..dcf679d3 --- /dev/null +++ b/src/modalities/dataloader/codecs.py @@ -0,0 +1,111 @@ +from abc import ABC, abstractmethod +from typing import TypeVar, Generic, Optional, Dict, Any + +from io import BytesIO +from PIL import Image +from transformers import PreTrainedTokenizer + +T = TypeVar("T") + +class Codec(ABC, Generic[T]): + @abstractmethod + def encode(self, obj: T) -> bytes: + pass + + @staticmethod + @abstractmethod + def decode(serialized_obj: bytes) -> T: + pass + + +class FixSizedCodec(Codec[T]): + """Base class for fix-sized Codecs + + Fix-sized codecs are special in that they encode a sequence of values where + each value is encoded by a fix number of bytes. The length of thegenerated + bytestring is an integer multiple of `num_bytes_per_value`. + """ + + @classmethod + @abstractmethod + def num_bytes_per_value(cls) -> int: + raise NotImplementedError + + +class HfTokenizerCodec(FixSizedCodec[str]): + + TOKEN_SIZE_IN_BYTES = 4 + + @classmethod + def num_bytes_per_value(cls) -> int: + return cls.TOKEN_SIZE_IN_BYTES + + def __init__( + self, + tokenizer: PreTrainedTokenizer, + max_length: Optional[int] = None, + add_eos_token: bool = True + ) -> None: + + # instantiate + self.tokenizer = tokenizer + self.add_eos_token = add_eos_token + + if add_eos_token: + # get eos token in bytes to append to the end of each sequence + eos_token = self.tokenizer.convert_tokens_to_ids(self.tokenizer.eos_token) + self.eos_token = eos_token.to_bytes(type(self).TOKEN_SIZE_IN_BYTES, byteorder="big") + + self.tokenizer_kwargs = {} if max_length is None else dict( + max_length=max_length - int(add_eos_token), + truncation=True + ) + + def encode(self, text: str) -> bytes: + # tokenize text and convert the token ids to bytes + tokens = [ + t.to_bytes(type(self).TOKEN_SIZE_IN_BYTES, byteorder="big") + for t in self.tokenizer(text, **self.tokenizer_kwargs)["input_ids"] + ] + # + if len(tokens) == 0: + raise ValueError("Received empty sample") + # add special eos token + if self.add_eos_token: + tokens.append(self.eos_token) + + # join byte strings + return b"".join(tokens) + + @classmethod + def decode(cls, serialized_tokens: bytes) -> str: + return [ + int.from_bytes( + serialized_tokens[i:i+cls.TOKEN_SIZE_IN_BYTES], + byteorder="big" + ) + for i in range(0, len(serialized_tokens), cls.TOKEN_SIZE_IN_BYTES) + ] + + +class PillowImageCodec(Codec[str]): + + def __init__( + self, + save_format: str = "png" + ) -> None: + self._format = save_format + + def encode(self, img_file_path: str) -> bytes: + buf = BytesIO() + # write image to buffer + with Image.open(img_file_path) as img: + img.save(buf, format=self._format) + # retuen buffer content + buf.seek(0) + return buf.read() + + @staticmethod + def decode(serialized_img: bytes) -> str: + return Image.open(BytesIO(serialized_img)) + diff --git a/src/modalities/dataloader/create_packed_data.py b/src/modalities/dataloader/create_packed_data.py index 6e8d4d3c..4b294d1f 100644 --- a/src/modalities/dataloader/create_packed_data.py +++ b/src/modalities/dataloader/create_packed_data.py @@ -1,56 +1,69 @@ import pickle import warnings from pathlib import Path -from typing import IO +from typing import IO, Dict import jq import numpy as np from tqdm import tqdm from transformers import PreTrainedTokenizer +from modalities.dataloader.codecs import Codec from modalities.dataloader.large_file_lines_reader import LargeFileLinesReader class PackedDataGenerator: - # amount of bytes to represent tokens as integers. - # If the vocabulary exceeds 2^(8*`size_in_bytes`), this requires adaptation. - TOKEN_SIZE_IN_BYTES = 4 + """ + + Format: HEAD DATA CODECS INDEX + HEAD: DATA_HEAD CODECS_HEAD + """ # amount of bytes to represent number of all tokens in dataset. # If the amount exceeds 2^(8*`header_size_in_bytes`), this requires adaptation. # Decided to keep this constant, since a size of 8 bytes requires more data than the internet currently provides - HEAD_SIZE_IN_BYTES = 8 + DATA_HEAD_SIZE_IN_BYTES = 8 + CODECS_HEAD_SIZE_IN_BYTES = 8 def __init__( self, + codecs: Dict[str, Codec], src_path: Path, - tokenizer: PreTrainedTokenizer, - index_path: Path = None, - jq_pattern: str = ".text", - max_number_of_tokens: int = None, + idx_path: Path = None, + max_num_of_bytes: int = None ): """ Reads in a jsonl file and the corresponding index file and packs dataset file for LLM training. + :param codec: Codec object, which is used to encode the objects into bytes :param src_path: Path to a jsonl file, which holds text data :param index_path: Path to an index file, which indicates the start character position and length of samples given in `src_path`. If not defined, an index file next to `src_path` is picked, by replacing its suffix with ".idx". - :param tokenizer: PretrainedTokenizer object, which is used to pre-tokenize the provided data in `src_path`. - Tokenization is necessary to work on final lengths of token sequences. :param jq_pattern: jq-pattern applied on every jsonl-entry. Results are afterwards tokenized and packed - :param max_number_of_tokens: Limit the total amount of tokens in the packed dataset. - If not specified, the whole data is packed into the dataset. """ + + jq_patterns, codecs = zip(*codecs.items()) + + self.codecs = codecs + self.jq_filters = [jq.compile(pattern) for pattern in jq_patterns] + self.src_path = src_path - self.tokenizer = tokenizer - self.jq_filter = jq.compile(jq_pattern) - self.max_tokens = max_number_of_tokens + self._reader = LargeFileLinesReader(src_path, index_path=idx_path) + + # keep track of file size + self._total_data_bytes = 0 + self._max_data_bytes = max_num_of_bytes - self._reader = LargeFileLinesReader(src_path, index_path=index_path) - self._total_num_of_tokens = 0 - self._curr_offset = self.HEAD_SIZE_IN_BYTES self._index_list = [] + @property + def _current_offset(self) -> int: + return ( + self._total_data_bytes + + type(self).DATA_HEAD_SIZE_IN_BYTES + + type(self).CODECS_HEAD_SIZE_IN_BYTES + ) + def _default_destination_path(self, destination_path: Path = None) -> Path: if destination_path is None: default_destination_path = Path(self.src_path.parent, f"{self.src_path.stem}.pbin") @@ -62,23 +75,33 @@ def _default_destination_path(self, destination_path: Path = None) -> Path: return Path(destination_path) def run(self, dst_path: Path = None): - assert self._total_num_of_tokens == 0, f"This {self.__name__} was already used and is exhausted. Use another!" + assert self._total_data_bytes == 0, f"This {self.__name__} was already used and is exhausted. Use another!" dst_path = self._default_destination_path(destination_path=dst_path) if dst_path.exists(): raise ValueError(f"file already exists at destination path '{dst_path}'.") - encoded_eos_token = self.tokenizer(self.tokenizer.eos_token)["input_ids"][0] - encoded_eos_token_as_bytes = encoded_eos_token.to_bytes(self.TOKEN_SIZE_IN_BYTES, byteorder="big") with dst_path.open("wb") as f: - # allocate first self.header_size_in_bytes bytes for header (encodes length of data section) - # not possible to prepend header after determining size of data section - f.write((0).to_bytes(self.HEAD_SIZE_IN_BYTES, byteorder="big")) - # write data section (tokens) + # store the type-hints to the codec types + # TODO: get the type hints from the enum in case they + # don't match the class name exactly + codecs_bytes = pickle.dumps( + [type(codec).__name__ for codec in self.codecs] + ) + + # allocate bytes for data header and write codecs header + f.write((0).to_bytes(type(self).DATA_HEAD_SIZE_IN_BYTES, byteorder="big")) + f.write( + len(codecs_bytes).to_bytes( + type(self).DATA_HEAD_SIZE_IN_BYTES, byteorder="big" + ) + ) + + # write data section for idx, line in tqdm(enumerate(self._reader)): try: - self._process_line(encoded_eos_token_as_bytes, f, line) + self._process_line(f, line) except ValueError: warnings.warn(f"Encountered empty sample in line {idx} of file {self.src_path}") except StopIteration: @@ -86,36 +109,52 @@ def run(self, dst_path: Path = None): except Exception as exception: warnings.warn(f"could not process line: {exception=}") - # write index + # write codecs and index section to file + f.write(codecs_bytes) f.write(pickle.dumps(self._index_list)) self._update_data_length_in_pre_allocated_header(dst_path) def _update_data_length_in_pre_allocated_header(self, dst_path: Path): - start_of_index_in_bytes = self._index_list[-1][0] + self._index_list[-1][1] - length_of_byte_encoded_data_section = start_of_index_in_bytes - self.HEAD_SIZE_IN_BYTES - header_content = length_of_byte_encoded_data_section.to_bytes(self.HEAD_SIZE_IN_BYTES, byteorder="big") + + header_content = self._total_data_bytes.to_bytes(type(self).DATA_HEAD_SIZE_IN_BYTES, byteorder="big") header_content = np.frombuffer(header_content, dtype="uint8") # write the header content to the packed dataset file - m = np.memmap(dst_path, mode="r+", offset=0, shape=(self.HEAD_SIZE_IN_BYTES,)) + m = np.memmap( + dst_path, + mode="r+", + offset=0, + shape=(type(self).DATA_HEAD_SIZE_IN_BYTES,) + ) m[:] = header_content[:] - def _process_line(self, eos_token_as_bytes: bytes, f: IO, line: str): - jq_retrieved_text = self.jq_filter.input_text(line).first() - tokens = self.tokenizer(jq_retrieved_text)["input_ids"] - if len(tokens) == 0: - raise ValueError("Received empty sample...") - token_idx = 0 - for token in tokens: - token_as_bytes = token.to_bytes(self.TOKEN_SIZE_IN_BYTES, byteorder="big") - f.write(token_as_bytes) - self._total_num_of_tokens += 1 - if self._total_num_of_tokens == self.max_tokens: - segment_length = (token_idx + 1) * self.TOKEN_SIZE_IN_BYTES - self._index_list.append((self._curr_offset, segment_length)) - raise StopIteration - token_idx += 1 - f.write(eos_token_as_bytes) - segment_length = (token_idx + 1) * self.TOKEN_SIZE_IN_BYTES # segment_length in bytes - self._index_list.append((self._curr_offset, segment_length)) - self._curr_offset += segment_length + def _process_line(self, f: IO, line: str): + + sizes = [None] * len(self.codecs) + + for i, (codec, jq_filter) in enumerate( + zip(self.codecs, self.jq_filters), + ): + + # get object to encode and encode using codec + jq_retrieved_text = jq_filter.input_text(line).first() + bytestring = codec.encode(jq_retrieved_text) + num_bytes = len(bytestring) + + if num_bytes == 0: + raise ValueError("Detected Empty sample") + + # write bytestring to file and update size array + f.write(bytestring) + sizes[i] = num_bytes + + # update index and total number of bytes written + self._index_list.append([self._current_offset] + sizes) + self._total_data_bytes += sum(sizes) + + # exceeds size limit + if ( + (self._max_data_bytes is not None) and + (self._total_data_bytes >= self._max_data_bytes) + ): + raise StopIteration diff --git a/src/modalities/dataloader/dataset.py b/src/modalities/dataloader/dataset.py index 8e7a4c3b..e8db9f18 100644 --- a/src/modalities/dataloader/dataset.py +++ b/src/modalities/dataloader/dataset.py @@ -11,14 +11,26 @@ from tqdm import tqdm from transformers import BatchEncoding, PreTrainedTokenizer -from ..dataloader.large_file_lines_reader import LargeFileLinesReader +from .codecs import FixSizedCodec +from .create_packed_data import PackedDataGenerator +from .large_file_lines_reader import LargeFileLinesReader +class SampleKeysMismatchException(Exception): + pass class Dataset(TorchdataSet): - def __init__(self, raw_data_path: Path, block_size: int, sample_key: str): + def __init__(self, raw_data_path: Path, sample_keys: list[str]): self.raw_data_path = raw_data_path - self.block_size = block_size - self.sample_key = sample_key + self.sample_keys = sample_keys + # must provide a sample key for each codec + if len(self.sample_keys) != self.num_elements_per_item: + raise SampleKeysMismatchException( + "Expected %i sample keys, got %s" % (self.num_elements_per_item, self.sample_keys) + ) + + @property + def num_elements_per_item(self) -> int: + raise NotImplementedError def _check_if_inbounds(self, idx: int): if not 0 <= idx < len(self): @@ -50,7 +62,8 @@ def __init__( TODO: If this setting should support multi-modal features using separately encoded inputs, this needs to get replaced with a list of sample keys! """ - super().__init__(raw_data_path=raw_data_path, block_size=block_size, sample_key=sample_key) + super().__init__(raw_data_path=raw_data_path, sample_key=sample_key) + self.block_size = block_size self.reader = LargeFileLinesReader(self.raw_data_path, index_path=index_path) self.jq_filter = jq.compile(jq_pattern) @@ -70,24 +83,49 @@ def __getitem__(self, idx: int) -> BatchEncoding: class PackedMemMapDatasetBase(Dataset): - INT_SIZE_IN_BYTES = 4 - HEADER_SIZE_IN_BYTES = 8 + + def _read_bytes(self, offset: int, size: int) -> bytes: + return np.memmap( + self.raw_data_path, + mode="r", + offset=offset, + shape=(size,), + ).view(f"S{size}")[0] + + @property + def num_elements_per_item(self) -> int: + return len(self._codec_types) - def __init__(self, raw_data_path: Path, block_size: int, sample_key: str): + def __init__(self, raw_data_path: Path, sample_keys: list[str]): """ Base class for packed memmapped datasets. The underlying dataset file has the structure: - | header | data | index | - The header contains information about the length of the subsequent data sequence. The index contains - the tuple information (start, end) in terms of byte positions. + | data_header | codecs_header | data | codecs | index | + + The data and codecs headers contains information about the length of the data and codecs sequences. + + The codecs sequence contains the codec type hints required to decode the bytes to the expected + data type. Specifically it is an encoded list of codec type names: + + (codec_1, codec_2, ...) + + The index stores byte positions of the dataset items in the following format: + + (offset, size_1, size_2, ...) + + The start and end tuple of the j-th value are computed by: + + (offset + sum_{i int: + # read bytes from file + return int.from_bytes( + self._read_bytes(offset, size), + byteorder="big" + ) + + # read headers + self.data_len = read_header( offset=0, - shape=(self.HEADER_SIZE_IN_BYTES,), - ).view(f"S{self.HEADER_SIZE_IN_BYTES}") - self.data_len = int.from_bytes(self.data_len, byteorder="big") + size=PackedDataGenerator.DATA_HEAD_SIZE_IN_BYTES + ) + self.codecs_len = read_header( + offset=PackedDataGenerator.DATA_HEAD_SIZE_IN_BYTES, + size=PackedDataGenerator.CODECS_HEAD_SIZE_IN_BYTES + ) + + # compute offsets to index raw data file + self.data_offset = ( + PackedDataGenerator.DATA_HEAD_SIZE_IN_BYTES + + PackedDataGenerator.CODECS_HEAD_SIZE_IN_BYTES + ) + self.codecs_offset = self.data_offset + self.data_len + self.index_offset = self.codecs_offset + self.codecs_len + + # read codecs + self._codec_type_hints = self._read_bytes( + offset=self.codecs_offset, + size=self.codecs_len + ) + self._codec_type_hints = pickle.loads(self._codec_type_hints) + # needs to be here to avoid circular import + # TODO: find a better way to avoid the circular import + from ..config.lookup_types import CodecTypes + # resolve codec types + self._codec_types = [ + getattr(CodecTypes, codec_type_hint).value + for codec_type_hint in self._codec_type_hints + ] # get index - self.index_base = np.memmap( - self.raw_data_path, - mode="r", - offset=self.HEADER_SIZE_IN_BYTES + self.data_len, - shape=(self.total_bytes - self.data_len - self.HEADER_SIZE_IN_BYTES,), - ).view(f"S{self.total_bytes-self.data_len-self.HEADER_SIZE_IN_BYTES}") - self.index_base = pickle.loads(self.index_base) + self._index_base = self._read_bytes( + offset=self.index_offset, + size=self.total_bytes - self.index_offset + ) + self._index_base = pickle.loads(self._index_base) + assert all(len(idx) == len(self._codec_types) + 1 for idx in self._index_base) + + # initialize after codec types are defined because + # num_elements_per_item depends on it + super().__init__( + raw_data_path=raw_data_path, sample_keys=sample_keys + ) + + +class PackedMemMapDataset(PackedMemMapDatasetBase): + """Packed Memory Map Dataset""" + + def __len__(self) -> int: + return len(self._index_base) + + def __getitem__(self, idx: int) -> BatchEncoding: + # get index values + self._check_if_inbounds(idx) + idx = self._index_base[idx] + + enc = {} + offset = idx[0] + for key, size, codec_type in zip( + self.sample_keys, idx[1:], self._codec_types + ): + # decode item + bytestring = self._read_bytes(offset, size) + enc[key] = codec_type.decode(bytestring) + # update offset + offset += size + + return BatchEncoding(data=enc) class PackedMemMapDatasetContinuous(PackedMemMapDatasetBase): - def __init__(self, raw_data_path: Path, block_size: int, sample_key: str): + + def __init__(self, raw_data_path: Path, sample_key: str, block_size: int): """ PackedMemMapDatasetContinuous iterates through the data in block_size sized chunks, irrespective of the samples' start and end position, as defined in the index. @@ -130,28 +230,53 @@ def __init__(self, raw_data_path: Path, block_size: int, sample_key: str): Use `modalities create_packed_data` to create one based on a jsonl-file. :param block_size: alias for max sequence length. The amount of tokens the model can handle. :param sample_key: model-specific parameter to indicate where in the BatchEncoding the input_token_ids are. - TODO: If this setting should support multi-modal features using separately encoded inputs, - this needs to get replaced with a list of sample keys! """ - super().__init__(raw_data_path=raw_data_path, block_size=block_size, sample_key=sample_key) + try: + super().__init__(raw_data_path=raw_data_path, sample_keys=[sample_key]) + except SampleKeysMismatchException as e: + raise ValueError( + "Can only read continuously from packed data files of single-element dataset, i.e." + "datasets with a single item per line. The specified dataset has %i elements per item." + % self.num_elements_per_item + ) from e + + # check if codec is supported + if not issubclass(self.codec_type, FixSizedCodec): + raise ValueError( + "Can only read continuously from fix-sized codecs, got %s." + % self.codec_type + ) + self.block_size = block_size # get number of total tokens in file - total_tokens = self.data_len // self.INT_SIZE_IN_BYTES - self._num_samples = total_tokens // self.block_size + total_values = self.data_len // self._num_bytes_per_value + self._num_samples = total_values // self.block_size + + @property + def sample_key(self) -> str: + return self.sample_keys[0] + + @property + def codec_type(self) -> FixSizedCodec: + return self._codec_types[0] + + @property + def _num_bytes_per_value(self) -> int: + return self.codec_type.num_bytes_per_value() def __len__(self) -> int: return self._num_samples def __getitem__(self, idx: int) -> BatchEncoding: self._check_if_inbounds(idx) - tokens_as_byte_strings = np.memmap( - self.raw_data_path, - mode="r", - offset=self.HEADER_SIZE_IN_BYTES + idx * self.INT_SIZE_IN_BYTES * self.block_size, - shape=(self.INT_SIZE_IN_BYTES * self.block_size,), - ).view(f"S{self.INT_SIZE_IN_BYTES}") - tokens = [int.from_bytes(token, byteorder="big") for token in tokens_as_byte_strings] - return BatchEncoding(data={self.sample_key: tokens}) + # read block-sized chunk of bytes + byte_string = self._read_bytes( + offset=self.data_offset + idx * self.block_size * self._num_bytes_per_value, + size=self.block_size * self._num_bytes_per_value + ) + # decode and pack into batch encoding + values = self.codec_type.decode(byte_string) + return BatchEncoding(data={self.sample_key: values}) class PackedMemMapDatasetMegatron(PackedMemMapDatasetBase): From 307733b72b0b88ab89c515f6b7722bb55aaa4660 Mon Sep 17 00:00:00 2001 From: Niclas Doll Date: Sun, 4 Feb 2024 20:26:48 +0100 Subject: [PATCH 03/16] test: Adapt test to new memory format of MemMapDataset and implement tests for image and multimodal data --- tests/conftest.py | 73 +++++++++++-- tests/dataloader/test_packed_dataset.py | 136 +++++++++++++++++++++--- 2 files changed, 188 insertions(+), 21 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index f94133ce..67855191 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,15 +1,19 @@ import dataclasses import os +import json import pickle from pathlib import Path from unittest.mock import MagicMock import pytest +from PIL import Image +import numpy as np import torch from torch.optim import Optimizer from torch.utils.data.sampler import BatchSampler, SequentialSampler from transformers import GPT2TokenizerFast + from modalities.__main__ import load_app_config_dict from modalities.checkpointing.checkpointing import CheckpointingIF from modalities.config.config import AppConfig @@ -26,16 +30,33 @@ _ROOT_DIR = Path(__file__).parents[1] +@dataclasses.dataclass +class DataPathCollection: + raw_data_path: Path + index_path: Path + + @pytest.fixture def dummy_packed_data_path(tmpdir) -> Path: data = b"" - header_size_in_bytes = 8 + data_header_size_in_bytes = 8 + codecs_header_size_in_bytes = 8 int_size_in_bytes = 4 + # data and codecs tokens = list(range(20)) - data += (len(tokens) * int_size_in_bytes).to_bytes(header_size_in_bytes, byteorder="big") + codecs_bytes = pickle.dumps(["HfTokenizerCodec"]) + # headers + data += ( + len(tokens) * int_size_in_bytes + ).to_bytes(data_header_size_in_bytes, byteorder="big") + data += len(codecs_bytes).to_bytes(codecs_header_size_in_bytes, byteorder="big") + # data and codecs data += b"".join([t.to_bytes(int_size_in_bytes, byteorder="big") for t in tokens]) - index = [(4, 24), (28, 40), (68, 12), (80, 4)] # [(index,len), ...] -> in 4 bytes #lengths: 6,10,3,1 + data += codecs_bytes + # index + index = [(16, 24), (40, 28), (68, 12), (80, 16)] # [(index,len), ...] -> in 4 bytes #lengths: 6,10,3,1 data += pickle.dumps(index) + # write to file dummy_packed_data_path = Path(tmpdir, "dummy.pbin") dummy_packed_data_path.write_bytes(data) return dummy_packed_data_path @@ -52,12 +73,6 @@ def dummy_config(monkeypatch) -> AppConfig: return app_config -@dataclasses.dataclass -class DataPathCollection: - raw_data_path: Path - index_path: Path - - @pytest.fixture def dummy_data_path(tmpdir) -> DataPathCollection: source_raw_dummy_data_path = _ROOT_DIR / Path("./data/lorem_ipsum.jsonl") @@ -68,6 +83,46 @@ def dummy_data_path(tmpdir) -> DataPathCollection: return DataPathCollection(raw_data_path=dummy_data_path, index_path=index_path) +@pytest.fixture +def indexed_dummy_image_data_path(tmpdir) -> DataPathCollection: + + base_path = Path(tmpdir, "image_data") + img_base_path = Path(base_path, "images") + + base_path.mkdir(parents=True, exist_ok=True) + img_base_path.mkdir(parents=True, exist_ok=True) + + data_path = Path(base_path, "data.jsonl") + index_path = Path(base_path, "data.idx") + img_paths = [ + Path(img_base_path, "img_%i.png" % i) + for i in range(15) + ] + # create random images and save them into the temp directory + for img_path in img_paths: + im = np.random.rand(100, 100, 3) * 255 + im = Image.fromarray(im.astype("uint8")).convert("RGB") + im.save(img_path, "PNG") + # create the jsonl file + with data_path.open("w+") as f: + for img_path in img_paths: + f.write( + json.dumps( + { + "img_path": img_path.absolute().as_posix(), + "text": ( + "This item refers to the image stored at %s" + % str(img_path) + ) + } + ) + "\n" + ) + # create the index file to the jsonl file + IndexGenerator(data_path).create_index(index_path) + + return DataPathCollection(raw_data_path=data_path, index_path=index_path) + + @pytest.fixture def indexed_dummy_data_path(dummy_data_path) -> DataPathCollection: index_generator = IndexGenerator(dummy_data_path.raw_data_path) diff --git a/tests/dataloader/test_packed_dataset.py b/tests/dataloader/test_packed_dataset.py index 64df0e9c..a75f664d 100644 --- a/tests/dataloader/test_packed_dataset.py +++ b/tests/dataloader/test_packed_dataset.py @@ -1,19 +1,39 @@ +import json import pytest +from PIL import Image +import numpy.testing + +from modalities.dataloader.codecs import HfTokenizerCodec, PillowImageCodec from modalities.dataloader.create_packed_data import PackedDataGenerator -from modalities.dataloader.dataset import PackedMemMapDatasetContinuous, PackedMemMapDatasetMegatron +from modalities.dataloader.dataset import PackedMemMapDataset, PackedMemMapDatasetContinuous, PackedMemMapDatasetMegatron +@pytest.mark.skip(reason="New packed data format not implemented for megatron dataset") @pytest.mark.parametrize("block_size, expected_length", [(1, 4), (2, 3), (3, 3), (10, 2), (6, 2), (20, 1), (25, 0)]) def test_packed_megatron_dataset_loading(dummy_packed_data_path, block_size, expected_length): ds = PackedMemMapDatasetMegatron(dummy_packed_data_path, block_size, sample_key="input_ids") assert len(ds) == expected_length +def test_packed_dataset_loading(dummy_packed_data_path): + + ds = PackedMemMapDataset( + dummy_packed_data_path, + sample_keys=["input_ids"] + ) + + assert len(ds) == 4 + assert ds[0]["input_ids"] == [0, 1, 2, 3, 4, 5] + assert ds[1]["input_ids"] == [6, 7, 8, 9, 10, 11, 12] + assert ds[2]["input_ids"] == [13, 14, 15] + assert ds[3]["input_ids"] == [16, 17, 18, 19] + + @pytest.mark.parametrize( "block_size, expected_length, expected_output", [ - (1, 20, [[i] for i in range(20)]), + #(1, 20, [[i] for i in range(20)]), # TODO (2, 10, [[2 * i, 2 * i + 1] for i in range(10)]), (3, 6, [[3 * i, 3 * i + 1, 3 * i + 2] for i in range(6)]), (10, 2, [list(range(10)), list(range(10, 20))]), @@ -22,10 +42,19 @@ def test_packed_megatron_dataset_loading(dummy_packed_data_path, block_size, exp (25, 0, []), ], ) -def test_packed_continuous_dataset_loading(dummy_packed_data_path, block_size, expected_length, expected_output): - ds = PackedMemMapDatasetContinuous(dummy_packed_data_path, block_size, sample_key="input_ids") +def test_packed_continuous_dataset_loading( + dummy_packed_data_path, block_size, expected_length, expected_output +): + ds = PackedMemMapDatasetContinuous( + dummy_packed_data_path, + sample_key="input_ids", + block_size=block_size + ) assert len(ds) == expected_length - retrieved_input_ids = [list(packed_samples["input_ids"]) for packed_samples in ds] + retrieved_input_ids = [ + list(packed_samples["input_ids"]) + for packed_samples in ds + ] assert retrieved_input_ids == expected_output @@ -35,17 +64,35 @@ def test_packed_continuous_dataset_missing_file(dummy_packed_data_path): PackedMemMapDatasetContinuous(dummy_packed_data_path, block_size=10, sample_key="input_ids") -@pytest.mark.parametrize("max_num_of_tokens, expected_index_size", [(None, 12), (10, 1)]) -def test_create_packed_dataset(indexed_dummy_data_path, gpt2_tokenizer, max_num_of_tokens, expected_index_size): +@pytest.mark.parametrize( + "max_num_of_tokens, expected_index_size", [(None, 12), (10, 1)] +) +def test_create_packed_dataset( + indexed_dummy_data_path, + gpt2_tokenizer, + max_num_of_tokens, + expected_index_size +): block_size = 5 packed_generator = PackedDataGenerator( - src_path=indexed_dummy_data_path.raw_data_path, tokenizer=gpt2_tokenizer, max_number_of_tokens=max_num_of_tokens + src_path=indexed_dummy_data_path.raw_data_path, + codecs={ + ".text": HfTokenizerCodec( + tokenizer=gpt2_tokenizer, + ) + }, + max_num_of_bytes=( + (HfTokenizerCodec.TOKEN_SIZE_IN_BYTES * max_num_of_tokens) + if max_num_of_tokens is not None else None + ) ) default_packed_dataset_path = packed_generator._default_destination_path() assert not default_packed_dataset_path.is_file() packed_generator.run() packed_dataset = PackedMemMapDatasetContinuous( - default_packed_dataset_path, block_size=block_size, sample_key="input_ids" + default_packed_dataset_path, + sample_key="input_ids", + block_size=block_size, ) start_of_jsonl_content = "0 Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor" @@ -53,8 +100,73 @@ def test_create_packed_dataset(indexed_dummy_data_path, gpt2_tokenizer, max_num_ packed_dataset_iterator = iter(packed_dataset) assert tokenized_start_of_jsonl_content[:block_size] == next(packed_dataset_iterator)["input_ids"] assert tokenized_start_of_jsonl_content[block_size : 2 * block_size] == next(packed_dataset_iterator)["input_ids"] - assert len(packed_dataset.index_base) == expected_index_size + assert len(packed_dataset._index_base) == expected_index_size # check validity of index section in packed dataset - for idx, (offset, entry_length) in enumerate(packed_dataset.index_base[:-1]): - assert offset + entry_length == packed_dataset.index_base[idx + 1][0] + for idx, (offset, entry_length) in enumerate(packed_dataset._index_base[:-1]): + assert offset + entry_length == packed_dataset._index_base[idx + 1][0] + + +def test_packed_image_dataset(indexed_dummy_image_data_path): + # create packed data file + packed_generator = PackedDataGenerator( + src_path=indexed_dummy_image_data_path.raw_data_path, + idx_path=indexed_dummy_image_data_path.index_path, + codecs={ + ".img_path": PillowImageCodec() + } + ) + # get destination path + default_packed_dataset_path = packed_generator._default_destination_path() + assert not default_packed_dataset_path.is_file() + # create packed dataset file + packed_generator.run() + + # read dataset + ds = PackedMemMapDataset( + default_packed_dataset_path, + sample_keys=["img"], + ) + # read the jsonl to get the source image paths + with indexed_dummy_image_data_path.raw_data_path.open("r") as f: + src_data = list(map(json.loads, f.read().strip().split("\n"))) + # compare source image with dataset content + for src, item in zip(src_data, ds): + with Image.open(src["img_path"]) as src_img: + numpy.testing.assert_allclose(src_img, item["img"]) + + +def test_packed_multimodal_dataset( + indexed_dummy_image_data_path, gpt2_tokenizer +): + # create packed data file + packed_generator = PackedDataGenerator( + src_path=indexed_dummy_image_data_path.raw_data_path, + idx_path=indexed_dummy_image_data_path.index_path, + codecs={ + ".img_path": PillowImageCodec(), + ".text": HfTokenizerCodec( + tokenizer=gpt2_tokenizer, + add_eos_token=False + ) + } + ) + # get destination path + default_packed_dataset_path = packed_generator._default_destination_path() + assert not default_packed_dataset_path.is_file() + # create packed dataset file + packed_generator.run() + + # read dataset + ds = PackedMemMapDataset( + default_packed_dataset_path, + sample_keys=["img", "input_ids"], + ) + # read the jsonl to get the source values + with indexed_dummy_image_data_path.raw_data_path.open("r") as f: + src_data = list(map(json.loads, f.read().strip().split("\n"))) + # compare source with dataset content + for src, item in zip(src_data, ds): + with Image.open(src["img_path"]) as src_img: + numpy.testing.assert_allclose(src_img, item["img"]) + assert gpt2_tokenizer(src["text"])["input_ids"] == item["input_ids"] From 1feaaf27674f500ec3afb001b694d26b96a84894 Mon Sep 17 00:00:00 2001 From: Thomas Holz Date: Mon, 5 Feb 2024 13:44:46 +0100 Subject: [PATCH 04/16] fix: formatting --- src/modalities/dataloader/codecs.py | 30 ++++++++++------------------- 1 file changed, 10 insertions(+), 20 deletions(-) diff --git a/src/modalities/dataloader/codecs.py b/src/modalities/dataloader/codecs.py index dcf679d3..a6af64ad 100644 --- a/src/modalities/dataloader/codecs.py +++ b/src/modalities/dataloader/codecs.py @@ -1,12 +1,13 @@ from abc import ABC, abstractmethod -from typing import TypeVar, Generic, Optional, Dict, Any - from io import BytesIO +from typing import Any, Dict, Generic, Optional, TypeVar + from PIL import Image from transformers import PreTrainedTokenizer T = TypeVar("T") + class Codec(ABC, Generic[T]): @abstractmethod def encode(self, obj: T) -> bytes: @@ -35,16 +36,13 @@ def num_bytes_per_value(cls) -> int: class HfTokenizerCodec(FixSizedCodec[str]): TOKEN_SIZE_IN_BYTES = 4 - + @classmethod def num_bytes_per_value(cls) -> int: return cls.TOKEN_SIZE_IN_BYTES def __init__( - self, - tokenizer: PreTrainedTokenizer, - max_length: Optional[int] = None, - add_eos_token: bool = True + self, tokenizer: PreTrainedTokenizer, max_length: Optional[int] = None, add_eos_token: bool = True ) -> None: # instantiate @@ -56,9 +54,8 @@ def __init__( eos_token = self.tokenizer.convert_tokens_to_ids(self.tokenizer.eos_token) self.eos_token = eos_token.to_bytes(type(self).TOKEN_SIZE_IN_BYTES, byteorder="big") - self.tokenizer_kwargs = {} if max_length is None else dict( - max_length=max_length - int(add_eos_token), - truncation=True + self.tokenizer_kwargs = ( + {} if max_length is None else dict(max_length=max_length - int(add_eos_token), truncation=True) ) def encode(self, text: str) -> bytes: @@ -67,7 +64,7 @@ def encode(self, text: str) -> bytes: t.to_bytes(type(self).TOKEN_SIZE_IN_BYTES, byteorder="big") for t in self.tokenizer(text, **self.tokenizer_kwargs)["input_ids"] ] - # + # if len(tokens) == 0: raise ValueError("Received empty sample") # add special eos token @@ -80,20 +77,14 @@ def encode(self, text: str) -> bytes: @classmethod def decode(cls, serialized_tokens: bytes) -> str: return [ - int.from_bytes( - serialized_tokens[i:i+cls.TOKEN_SIZE_IN_BYTES], - byteorder="big" - ) + int.from_bytes(serialized_tokens[i : i + cls.TOKEN_SIZE_IN_BYTES], byteorder="big") for i in range(0, len(serialized_tokens), cls.TOKEN_SIZE_IN_BYTES) ] class PillowImageCodec(Codec[str]): - def __init__( - self, - save_format: str = "png" - ) -> None: + def __init__(self, save_format: str = "png") -> None: self._format = save_format def encode(self, img_file_path: str) -> bytes: @@ -108,4 +99,3 @@ def encode(self, img_file_path: str) -> bytes: @staticmethod def decode(serialized_img: bytes) -> str: return Image.open(BytesIO(serialized_img)) - From a5ae3895bc5b03a87048363786cb8d9b520522ad Mon Sep 17 00:00:00 2001 From: Thomas Holz Date: Thu, 8 Feb 2024 11:25:52 +0100 Subject: [PATCH 05/16] feat: add audio codec WIP --- config_files/data_config.yaml | 3 + pyproject.toml | 5 +- src/modalities/config/config.py | 6 +- src/modalities/config/lookup_types.py | 4 +- src/modalities/dataloader/codecs.py | 88 +++++++++++++++++++++++++ tests/dataloader/test_packed_dataset.py | 36 ++++++++-- 6 files changed, 134 insertions(+), 8 deletions(-) diff --git a/config_files/data_config.yaml b/config_files/data_config.yaml index f6899076..fbb4ae5e 100644 --- a/config_files/data_config.yaml +++ b/config_files/data_config.yaml @@ -13,3 +13,6 @@ features: type_hint: PillowImageCodec config: save_format: png + - jq_pattern: .feat_path + codec: + type_hint: TorchaudioAudioCodec diff --git a/pyproject.toml b/pyproject.toml index caa789cd..a83eae59 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,7 +21,10 @@ dependencies = [ "jq", "xformers", "class_resolver", - "wandb" + "wandb", + "scipy", + "torchaudio", + "pillow", ] [project.optional-dependencies] diff --git a/src/modalities/config/config.py b/src/modalities/config/config.py index 946f5cc5..6c1281bf 100644 --- a/src/modalities/config/config.py +++ b/src/modalities/config/config.py @@ -62,10 +62,14 @@ class HfTokenizerCodecConfig(BaseModel): class PillowImageCodecConfig(BaseModel): save_format: str = "png" + class TorchaudioAudioCodec(BaseModel): + pass + type_hint: CodecTypes config: Union[ HfTokenizerCodecConfig, - PillowImageCodecConfig + PillowImageCodecConfig, + TorchaudioAudioCodec, ] = Field(union_mode="left_to_right") diff --git a/src/modalities/config/lookup_types.py b/src/modalities/config/lookup_types.py index 04b5535a..a89a8b03 100644 --- a/src/modalities/config/lookup_types.py +++ b/src/modalities/config/lookup_types.py @@ -19,7 +19,8 @@ from modalities.dataloader.codecs import ( HfTokenizerCodec, - PillowImageCodec + PillowImageCodec, + TorchaudioAudioCodec, ) @@ -55,6 +56,7 @@ class TokenizerTypes(LookupEnum): class CodecTypes(LookupEnum): HfTokenizerCodec = HfTokenizerCodec PillowImageCodec = PillowImageCodec + TorchaudioAudioCodec = TorchaudioAudioCodec class DatasetTypes(LookupEnum): diff --git a/src/modalities/dataloader/codecs.py b/src/modalities/dataloader/codecs.py index a6af64ad..175a1108 100644 --- a/src/modalities/dataloader/codecs.py +++ b/src/modalities/dataloader/codecs.py @@ -2,6 +2,9 @@ from io import BytesIO from typing import Any, Dict, Generic, Optional, TypeVar +import numpy as np +import torch +import torchaudio from PIL import Image from transformers import PreTrainedTokenizer @@ -99,3 +102,88 @@ def encode(self, img_file_path: str) -> bytes: @staticmethod def decode(serialized_img: bytes) -> str: return Image.open(BytesIO(serialized_img)) + + +class TorchaudioAudioCodec(Codec[str]): + def __init__( + self, + ) -> None: + self.extract_mel_spec = torchaudio.transforms.MelSpectrogram( + sample_rate=16_000, + n_fft=400, + n_mels=80, + hop_length=160, + ) + self.target_sample_rate = 16_000 + + def extract_log_mel_spectrogram( + self, + audio: torch.Tensor, + ) -> torch.Tensor: + ############################################ + # Feature extraction is quite similar to how it is done + # for Radford, Alec, et al. "Robust speech recognition + # via large-scale weak supervision." 2023 AKA Whisper. + # Their code can be found here: + # https://github.com/openai/whisper/blob/main/whisper/audio.py + # MIT LICENSE: https://github.com/openai/whisper/blob/main/LICENSE + ############################################ + + mel_spec = self.extract_mel_spec(audio) + log_mel_spec = torch.clamp(mel_spec, min=1e-10).log10() + log_mel_spec = torch.maximum(log_mel_spec, log_mel_spec.max() - 8.0) + log_mel_spec = (log_mel_spec + 4.0) / 4.0 + return log_mel_spec.transpose(1, 0) + + def resample( + self, + audio: torch.Tensor, + sample_rate: int, + ) -> torch.Tensor: + resampler = torchaudio.transforms.Resample( + sample_rate, + self.target_sample_rate, + dtype=audio.dtype, + ) + return resampler(audio) + + def encode( + self, + audio_file_path: str, + ) -> bytes: + + audio, sample_rate = torchaudio.load( + audio_file_path, + ) + audio = ( + audio.mean( + dim=0, + ) + if audio.shape[0] == 2 + else audio + ) + + audio = ( + self.resample( + audio, + sample_rate, + ) + if sample_rate != self.target_sample_rate + else audio + ) + + log_mel_spec = self.extract_log_mel_spectrogram( + audio, + ).numpy() + + buf = BytesIO() + np.save(buf, log_mel_spec) + buf.seek(0) + + return buf.read() + + @staticmethod + def decode( + serialized_audio: bytes, + ) -> np.ndarray: + return np.load(BytesIO(serialized_audio)) diff --git a/tests/dataloader/test_packed_dataset.py b/tests/dataloader/test_packed_dataset.py index a75f664d..cf2a45cb 100644 --- a/tests/dataloader/test_packed_dataset.py +++ b/tests/dataloader/test_packed_dataset.py @@ -2,9 +2,10 @@ import pytest from PIL import Image -import numpy.testing -from modalities.dataloader.codecs import HfTokenizerCodec, PillowImageCodec +import numpy as np +import numpy.testing +from modalities.dataloader.codecs import HfTokenizerCodec, PillowImageCodec, TorchaudioAudioCodec from modalities.dataloader.create_packed_data import PackedDataGenerator from modalities.dataloader.dataset import PackedMemMapDataset, PackedMemMapDatasetContinuous, PackedMemMapDatasetMegatron @@ -136,9 +137,34 @@ def test_packed_image_dataset(indexed_dummy_image_data_path): numpy.testing.assert_allclose(src_img, item["img"]) -def test_packed_multimodal_dataset( - indexed_dummy_image_data_path, gpt2_tokenizer -): +def test_packed_audio_dataset(indexed_dummy_audio_data_path): + # create packed data file + packed_generator = PackedDataGenerator( + src_path=indexed_dummy_audio_data_path.raw_data_path, + idx_path=indexed_dummy_audio_data_path.index_path, + codecs={".feat_path": TorchaudioAudioCodec()}, + ) + # get destination path + default_packed_dataset_path = packed_generator._default_destination_path() + assert not default_packed_dataset_path.is_file() + # create packed dataset file + packed_generator.run() + + # read dataset + ds = PackedMemMapDataset( + default_packed_dataset_path, + sample_keys=["feat"], + ) + # read the jsonl to get the source feature paths + with indexed_dummy_audio_data_path.raw_data_path.open("r") as f: + src_data = list(map(json.loads, f.read().strip().split("\n"))) + # compare source features with dataset content + for src, item in zip(src_data, ds, strict=True): + log_mel_spec = np.load(src["feat_path"]) + numpy.testing.assert_allclose(log_mel_spec, item["feat"]) + + +def test_packed_multimodal_dataset(indexed_dummy_image_data_path, gpt2_tokenizer): # create packed data file packed_generator = PackedDataGenerator( src_path=indexed_dummy_image_data_path.raw_data_path, From 0af45400961a7d2493ac7fe0d7fbebb472cbe1a4 Mon Sep 17 00:00:00 2001 From: Thomas Holz Date: Thu, 8 Feb 2024 11:51:05 +0100 Subject: [PATCH 06/16] feat: add missing fixture --- tests/conftest.py | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/tests/conftest.py b/tests/conftest.py index 67855191..57bf1434 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -123,6 +123,48 @@ def indexed_dummy_image_data_path(tmpdir) -> DataPathCollection: return DataPathCollection(raw_data_path=data_path, index_path=index_path) +@pytest.fixture +def indexed_dummy_audio_data_path( + tmpdir, +) -> DataPathCollection: + base_path = Path(tmpdir, "audio_data") + feature_base_path = Path(base_path, "features") + + base_path.mkdir(parents=True, exist_ok=True) + feature_base_path.mkdir(parents=True, exist_ok=True) + + data_path = Path(base_path, "data.jsonl") + index_path = Path(base_path, "data.idx") + feat_paths = [Path(feature_base_path, "feats_%i.npy" % i) for i in range(15)] + + N_MEL_BINS = 80 + N_TIME_STEPS = 1_600 + # create random spectrograms and save them into the temp directory + for feat_path in feat_paths: + log_mel_spec = np.random.rand(N_TIME_STEPS, N_MEL_BINS) + np.save( + feat_path, + log_mel_spec, + ) + # create the jsonl file + with data_path.open("w+") as f: + for feat_path in feat_paths: + f.write( + json.dumps( + { + "feat_path": feat_path.absolute().as_posix(), + "text": ("This item refers to the spectrogram stored at %s" % + str(feat_path)), + } + ) + + "\n" + ) + # create the index file to the jsonl file + IndexGenerator(data_path).create_index(index_path) + + return DataPathCollection(raw_data_path=data_path, index_path=index_path) + + @pytest.fixture def indexed_dummy_data_path(dummy_data_path) -> DataPathCollection: index_generator = IndexGenerator(dummy_data_path.raw_data_path) From 49d9c1cc5b1ca3abf8eca5f74ebd4a510e593eb6 Mon Sep 17 00:00:00 2001 From: mmaurya Date: Fri, 16 Feb 2024 13:49:35 +0000 Subject: [PATCH 07/16] fix: audio codec --- config_files/data_config.yaml | 2 +- pyproject.toml | 1 + src/modalities/dataloader/codecs.py | 28 +++++----- tests/conftest.py | 55 ++++++++----------- tests/dataloader/test_packed_dataset.py | 70 +++++++++---------------- 5 files changed, 64 insertions(+), 92 deletions(-) diff --git a/config_files/data_config.yaml b/config_files/data_config.yaml index fbb4ae5e..e3cd5caa 100644 --- a/config_files/data_config.yaml +++ b/config_files/data_config.yaml @@ -13,6 +13,6 @@ features: type_hint: PillowImageCodec config: save_format: png - - jq_pattern: .feat_path + - jq_pattern: .audio_path codec: type_hint: TorchaudioAudioCodec diff --git a/pyproject.toml b/pyproject.toml index a83eae59..4feecd51 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,7 @@ dependencies = [ "scipy", "torchaudio", "pillow", + "ffmpeg" ] [project.optional-dependencies] diff --git a/src/modalities/dataloader/codecs.py b/src/modalities/dataloader/codecs.py index 175a1108..bff7a9d7 100644 --- a/src/modalities/dataloader/codecs.py +++ b/src/modalities/dataloader/codecs.py @@ -1,6 +1,6 @@ from abc import ABC, abstractmethod from io import BytesIO -from typing import Any, Dict, Generic, Optional, TypeVar +from typing import Generic, Optional, TypeVar import numpy as np import torch @@ -37,7 +37,6 @@ def num_bytes_per_value(cls) -> int: class HfTokenizerCodec(FixSizedCodec[str]): - TOKEN_SIZE_IN_BYTES = 4 @classmethod @@ -47,7 +46,6 @@ def num_bytes_per_value(cls) -> int: def __init__( self, tokenizer: PreTrainedTokenizer, max_length: Optional[int] = None, add_eos_token: bool = True ) -> None: - # instantiate self.tokenizer = tokenizer self.add_eos_token = add_eos_token @@ -86,7 +84,6 @@ def decode(cls, serialized_tokens: bytes) -> str: class PillowImageCodec(Codec[str]): - def __init__(self, save_format: str = "png") -> None: self._format = save_format @@ -116,6 +113,19 @@ def __init__( ) self.target_sample_rate = 16_000 + def load_audio( + self, + audio_file_path: str, + ) -> torch.Tensor: + audio, sample_rate = torchaudio.load( + audio_file_path, + ) + + return ( + audio.mean(dim=0), + sample_rate, + ) + def extract_log_mel_spectrogram( self, audio: torch.Tensor, @@ -151,17 +161,9 @@ def encode( self, audio_file_path: str, ) -> bytes: - - audio, sample_rate = torchaudio.load( + audio, sample_rate = self.load_audio( audio_file_path, ) - audio = ( - audio.mean( - dim=0, - ) - if audio.shape[0] == 2 - else audio - ) audio = ( self.resample( diff --git a/tests/conftest.py b/tests/conftest.py index 57bf1434..aa740185 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,19 +1,19 @@ import dataclasses -import os import json +import os import pickle from pathlib import Path from unittest.mock import MagicMock -import pytest -from PIL import Image import numpy as np +import pytest import torch +import torchaudio +from PIL import Image from torch.optim import Optimizer from torch.utils.data.sampler import BatchSampler, SequentialSampler from transformers import GPT2TokenizerFast - from modalities.__main__ import load_app_config_dict from modalities.checkpointing.checkpointing import CheckpointingIF from modalities.config.config import AppConfig @@ -46,9 +46,7 @@ def dummy_packed_data_path(tmpdir) -> Path: tokens = list(range(20)) codecs_bytes = pickle.dumps(["HfTokenizerCodec"]) # headers - data += ( - len(tokens) * int_size_in_bytes - ).to_bytes(data_header_size_in_bytes, byteorder="big") + data += (len(tokens) * int_size_in_bytes).to_bytes(data_header_size_in_bytes, byteorder="big") data += len(codecs_bytes).to_bytes(codecs_header_size_in_bytes, byteorder="big") # data and codecs data += b"".join([t.to_bytes(int_size_in_bytes, byteorder="big") for t in tokens]) @@ -84,8 +82,7 @@ def dummy_data_path(tmpdir) -> DataPathCollection: @pytest.fixture -def indexed_dummy_image_data_path(tmpdir) -> DataPathCollection: - +def indexed_dummy_image_data_path(tmpdir) -> DataPathCollection: base_path = Path(tmpdir, "image_data") img_base_path = Path(base_path, "images") @@ -94,10 +91,7 @@ def indexed_dummy_image_data_path(tmpdir) -> DataPathCollection: data_path = Path(base_path, "data.jsonl") index_path = Path(base_path, "data.idx") - img_paths = [ - Path(img_base_path, "img_%i.png" % i) - for i in range(15) - ] + img_paths = [Path(img_base_path, "img_%i.png" % i) for i in range(15)] # create random images and save them into the temp directory for img_path in img_paths: im = np.random.rand(100, 100, 3) * 255 @@ -110,12 +104,10 @@ def indexed_dummy_image_data_path(tmpdir) -> DataPathCollection: json.dumps( { "img_path": img_path.absolute().as_posix(), - "text": ( - "This item refers to the image stored at %s" - % str(img_path) - ) + "text": ("This item refers to the image stored at %s" % str(img_path)), } - ) + "\n" + ) + + "\n" ) # create the index file to the jsonl file IndexGenerator(data_path).create_index(index_path) @@ -128,33 +120,30 @@ def indexed_dummy_audio_data_path( tmpdir, ) -> DataPathCollection: base_path = Path(tmpdir, "audio_data") - feature_base_path = Path(base_path, "features") + audio_base_path = Path(base_path, "audios") base_path.mkdir(parents=True, exist_ok=True) - feature_base_path.mkdir(parents=True, exist_ok=True) + audio_base_path.mkdir(parents=True, exist_ok=True) data_path = Path(base_path, "data.jsonl") index_path = Path(base_path, "data.idx") - feat_paths = [Path(feature_base_path, "feats_%i.npy" % i) for i in range(15)] + audio_paths = [Path(audio_base_path, "audio_%i.wav" % i) for i in range(15)] - N_MEL_BINS = 80 - N_TIME_STEPS = 1_600 + NUM_CHANNELS = 1 + SAMPLING_RATE = 16000 + AUDIO_DUR_SECS = 5 # create random spectrograms and save them into the temp directory - for feat_path in feat_paths: - log_mel_spec = np.random.rand(N_TIME_STEPS, N_MEL_BINS) - np.save( - feat_path, - log_mel_spec, - ) + for audio_path in audio_paths: + audio = torch.randn(NUM_CHANNELS, SAMPLING_RATE * AUDIO_DUR_SECS) + torchaudio.save(audio_path, audio, SAMPLING_RATE) # create the jsonl file with data_path.open("w+") as f: - for feat_path in feat_paths: + for audio_path in audio_paths: f.write( json.dumps( { - "feat_path": feat_path.absolute().as_posix(), - "text": ("This item refers to the spectrogram stored at %s" % - str(feat_path)), + "audio_path": audio_path.absolute().as_posix(), + "text": ("This item refers to the spectrogram stored at %s" % str(audio_path)), } ) + "\n" diff --git a/tests/dataloader/test_packed_dataset.py b/tests/dataloader/test_packed_dataset.py index cf2a45cb..53d93546 100644 --- a/tests/dataloader/test_packed_dataset.py +++ b/tests/dataloader/test_packed_dataset.py @@ -1,13 +1,16 @@ import json -import pytest +import numpy.testing +import pytest from PIL import Image -import numpy as np -import numpy.testing from modalities.dataloader.codecs import HfTokenizerCodec, PillowImageCodec, TorchaudioAudioCodec from modalities.dataloader.create_packed_data import PackedDataGenerator -from modalities.dataloader.dataset import PackedMemMapDataset, PackedMemMapDatasetContinuous, PackedMemMapDatasetMegatron +from modalities.dataloader.dataset import ( + PackedMemMapDataset, + PackedMemMapDatasetContinuous, + PackedMemMapDatasetMegatron, +) @pytest.mark.skip(reason="New packed data format not implemented for megatron dataset") @@ -18,11 +21,7 @@ def test_packed_megatron_dataset_loading(dummy_packed_data_path, block_size, exp def test_packed_dataset_loading(dummy_packed_data_path): - - ds = PackedMemMapDataset( - dummy_packed_data_path, - sample_keys=["input_ids"] - ) + ds = PackedMemMapDataset(dummy_packed_data_path, sample_keys=["input_ids"]) assert len(ds) == 4 assert ds[0]["input_ids"] == [0, 1, 2, 3, 4, 5] @@ -34,7 +33,7 @@ def test_packed_dataset_loading(dummy_packed_data_path): @pytest.mark.parametrize( "block_size, expected_length, expected_output", [ - #(1, 20, [[i] for i in range(20)]), # TODO + # (1, 20, [[i] for i in range(20)]), # TODO (2, 10, [[2 * i, 2 * i + 1] for i in range(10)]), (3, 6, [[3 * i, 3 * i + 1, 3 * i + 2] for i in range(6)]), (10, 2, [list(range(10)), list(range(10, 20))]), @@ -43,19 +42,10 @@ def test_packed_dataset_loading(dummy_packed_data_path): (25, 0, []), ], ) -def test_packed_continuous_dataset_loading( - dummy_packed_data_path, block_size, expected_length, expected_output -): - ds = PackedMemMapDatasetContinuous( - dummy_packed_data_path, - sample_key="input_ids", - block_size=block_size - ) +def test_packed_continuous_dataset_loading(dummy_packed_data_path, block_size, expected_length, expected_output): + ds = PackedMemMapDatasetContinuous(dummy_packed_data_path, sample_key="input_ids", block_size=block_size) assert len(ds) == expected_length - retrieved_input_ids = [ - list(packed_samples["input_ids"]) - for packed_samples in ds - ] + retrieved_input_ids = [list(packed_samples["input_ids"]) for packed_samples in ds] assert retrieved_input_ids == expected_output @@ -65,15 +55,8 @@ def test_packed_continuous_dataset_missing_file(dummy_packed_data_path): PackedMemMapDatasetContinuous(dummy_packed_data_path, block_size=10, sample_key="input_ids") -@pytest.mark.parametrize( - "max_num_of_tokens, expected_index_size", [(None, 12), (10, 1)] -) -def test_create_packed_dataset( - indexed_dummy_data_path, - gpt2_tokenizer, - max_num_of_tokens, - expected_index_size -): +@pytest.mark.parametrize("max_num_of_tokens, expected_index_size", [(None, 12), (10, 1)]) +def test_create_packed_dataset(indexed_dummy_data_path, gpt2_tokenizer, max_num_of_tokens, expected_index_size): block_size = 5 packed_generator = PackedDataGenerator( src_path=indexed_dummy_data_path.raw_data_path, @@ -83,9 +66,8 @@ def test_create_packed_dataset( ) }, max_num_of_bytes=( - (HfTokenizerCodec.TOKEN_SIZE_IN_BYTES * max_num_of_tokens) - if max_num_of_tokens is not None else None - ) + (HfTokenizerCodec.TOKEN_SIZE_IN_BYTES * max_num_of_tokens) if max_num_of_tokens is not None else None + ), ) default_packed_dataset_path = packed_generator._default_destination_path() assert not default_packed_dataset_path.is_file() @@ -113,9 +95,7 @@ def test_packed_image_dataset(indexed_dummy_image_data_path): packed_generator = PackedDataGenerator( src_path=indexed_dummy_image_data_path.raw_data_path, idx_path=indexed_dummy_image_data_path.index_path, - codecs={ - ".img_path": PillowImageCodec() - } + codecs={".img_path": PillowImageCodec()}, ) # get destination path default_packed_dataset_path = packed_generator._default_destination_path() @@ -142,7 +122,7 @@ def test_packed_audio_dataset(indexed_dummy_audio_data_path): packed_generator = PackedDataGenerator( src_path=indexed_dummy_audio_data_path.raw_data_path, idx_path=indexed_dummy_audio_data_path.index_path, - codecs={".feat_path": TorchaudioAudioCodec()}, + codecs={".audio_path": TorchaudioAudioCodec()}, ) # get destination path default_packed_dataset_path = packed_generator._default_destination_path() @@ -158,9 +138,12 @@ def test_packed_audio_dataset(indexed_dummy_audio_data_path): # read the jsonl to get the source feature paths with indexed_dummy_audio_data_path.raw_data_path.open("r") as f: src_data = list(map(json.loads, f.read().strip().split("\n"))) - # compare source features with dataset content + # compare source features with dataset content + codec = TorchaudioAudioCodec() for src, item in zip(src_data, ds, strict=True): - log_mel_spec = np.load(src["feat_path"]) + audio, sample_rate = codec.load_audio(src["audio_path"]) + audio = codec.resample(audio, sample_rate) + log_mel_spec = codec.extract_log_mel_spectrogram(audio) numpy.testing.assert_allclose(log_mel_spec, item["feat"]) @@ -171,11 +154,8 @@ def test_packed_multimodal_dataset(indexed_dummy_image_data_path, gpt2_tokenizer idx_path=indexed_dummy_image_data_path.index_path, codecs={ ".img_path": PillowImageCodec(), - ".text": HfTokenizerCodec( - tokenizer=gpt2_tokenizer, - add_eos_token=False - ) - } + ".text": HfTokenizerCodec(tokenizer=gpt2_tokenizer, add_eos_token=False), + }, ) # get destination path default_packed_dataset_path = packed_generator._default_destination_path() From 220493ad12a2e02add3c6c126ef4ca430fb07337 Mon Sep 17 00:00:00 2001 From: Thomas Holz Date: Mon, 19 Feb 2024 11:19:03 +0100 Subject: [PATCH 08/16] refactor: clean-up test fixtures --- tests/conftest.py | 57 ++++++++----------------- tests/dataloader/test_packed_dataset.py | 40 ++++++++++------- 2 files changed, 41 insertions(+), 56 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index aa740185..46e4f88f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -9,11 +9,6 @@ import pytest import torch import torchaudio -from PIL import Image -from torch.optim import Optimizer -from torch.utils.data.sampler import BatchSampler, SequentialSampler -from transformers import GPT2TokenizerFast - from modalities.__main__ import load_app_config_dict from modalities.checkpointing.checkpointing import CheckpointingIF from modalities.config.config import AppConfig @@ -26,6 +21,10 @@ from modalities.loss_functions import Loss from modalities.models.model import NNModel from modalities.trainer import Trainer +from PIL import Image +from torch.optim import Optimizer +from torch.utils.data.sampler import BatchSampler, SequentialSampler +from transformers import GPT2TokenizerFast _ROOT_DIR = Path(__file__).parents[1] @@ -82,68 +81,46 @@ def dummy_data_path(tmpdir) -> DataPathCollection: @pytest.fixture -def indexed_dummy_image_data_path(tmpdir) -> DataPathCollection: +def indexed_multimodal_dummy_data_path(tmpdir) -> DataPathCollection: base_path = Path(tmpdir, "image_data") img_base_path = Path(base_path, "images") + audio_base_path = Path(base_path, "audios") base_path.mkdir(parents=True, exist_ok=True) img_base_path.mkdir(parents=True, exist_ok=True) + audio_base_path.mkdir(parents=True, exist_ok=True) data_path = Path(base_path, "data.jsonl") index_path = Path(base_path, "data.idx") img_paths = [Path(img_base_path, "img_%i.png" % i) for i in range(15)] + audio_paths = [Path(audio_base_path, "audio_%i.wav" % i) for i in range(15)] + # create random images and save them into the temp directory for img_path in img_paths: im = np.random.rand(100, 100, 3) * 255 im = Image.fromarray(im.astype("uint8")).convert("RGB") im.save(img_path, "PNG") - # create the jsonl file - with data_path.open("w+") as f: - for img_path in img_paths: - f.write( - json.dumps( - { - "img_path": img_path.absolute().as_posix(), - "text": ("This item refers to the image stored at %s" % str(img_path)), - } - ) - + "\n" - ) - # create the index file to the jsonl file - IndexGenerator(data_path).create_index(index_path) - - return DataPathCollection(raw_data_path=data_path, index_path=index_path) - - -@pytest.fixture -def indexed_dummy_audio_data_path( - tmpdir, -) -> DataPathCollection: - base_path = Path(tmpdir, "audio_data") - audio_base_path = Path(base_path, "audios") - - base_path.mkdir(parents=True, exist_ok=True) - audio_base_path.mkdir(parents=True, exist_ok=True) - - data_path = Path(base_path, "data.jsonl") - index_path = Path(base_path, "data.idx") - audio_paths = [Path(audio_base_path, "audio_%i.wav" % i) for i in range(15)] + # create random spectrograms and save them into the temp directory NUM_CHANNELS = 1 SAMPLING_RATE = 16000 AUDIO_DUR_SECS = 5 - # create random spectrograms and save them into the temp directory + for audio_path in audio_paths: audio = torch.randn(NUM_CHANNELS, SAMPLING_RATE * AUDIO_DUR_SECS) torchaudio.save(audio_path, audio, SAMPLING_RATE) + # create the jsonl file with data_path.open("w+") as f: - for audio_path in audio_paths: + for img_path in img_paths: f.write( json.dumps( { + "img_path": img_path.absolute().as_posix(), "audio_path": audio_path.absolute().as_posix(), - "text": ("This item refers to the spectrogram stored at %s" % str(audio_path)), + "text": ( + f"This item refers to the image stored at {str(img_path)} and the spectrogram stored at {str(audio_path)}" + ), } ) + "\n" diff --git a/tests/dataloader/test_packed_dataset.py b/tests/dataloader/test_packed_dataset.py index 53d93546..f98d4dbd 100644 --- a/tests/dataloader/test_packed_dataset.py +++ b/tests/dataloader/test_packed_dataset.py @@ -2,8 +2,6 @@ import numpy.testing import pytest -from PIL import Image - from modalities.dataloader.codecs import HfTokenizerCodec, PillowImageCodec, TorchaudioAudioCodec from modalities.dataloader.create_packed_data import PackedDataGenerator from modalities.dataloader.dataset import ( @@ -11,6 +9,7 @@ PackedMemMapDatasetContinuous, PackedMemMapDatasetMegatron, ) +from PIL import Image @pytest.mark.skip(reason="New packed data format not implemented for megatron dataset") @@ -90,11 +89,11 @@ def test_create_packed_dataset(indexed_dummy_data_path, gpt2_tokenizer, max_num_ assert offset + entry_length == packed_dataset._index_base[idx + 1][0] -def test_packed_image_dataset(indexed_dummy_image_data_path): +def test_packed_image_dataset(indexed_multimodal_dummy_data_path): # create packed data file packed_generator = PackedDataGenerator( - src_path=indexed_dummy_image_data_path.raw_data_path, - idx_path=indexed_dummy_image_data_path.index_path, + src_path=indexed_multimodal_dummy_data_path.raw_data_path, + idx_path=indexed_multimodal_dummy_data_path.index_path, codecs={".img_path": PillowImageCodec()}, ) # get destination path @@ -109,7 +108,7 @@ def test_packed_image_dataset(indexed_dummy_image_data_path): sample_keys=["img"], ) # read the jsonl to get the source image paths - with indexed_dummy_image_data_path.raw_data_path.open("r") as f: + with indexed_multimodal_dummy_data_path.raw_data_path.open("r") as f: src_data = list(map(json.loads, f.read().strip().split("\n"))) # compare source image with dataset content for src, item in zip(src_data, ds): @@ -117,11 +116,11 @@ def test_packed_image_dataset(indexed_dummy_image_data_path): numpy.testing.assert_allclose(src_img, item["img"]) -def test_packed_audio_dataset(indexed_dummy_audio_data_path): +def test_packed_audio_dataset(indexed_multimodal_dummy_data_path): # create packed data file packed_generator = PackedDataGenerator( - src_path=indexed_dummy_audio_data_path.raw_data_path, - idx_path=indexed_dummy_audio_data_path.index_path, + src_path=indexed_multimodal_dummy_data_path.raw_data_path, + idx_path=indexed_multimodal_dummy_data_path.index_path, codecs={".audio_path": TorchaudioAudioCodec()}, ) # get destination path @@ -136,9 +135,10 @@ def test_packed_audio_dataset(indexed_dummy_audio_data_path): sample_keys=["feat"], ) # read the jsonl to get the source feature paths - with indexed_dummy_audio_data_path.raw_data_path.open("r") as f: + with indexed_multimodal_dummy_data_path.raw_data_path.open("r") as f: src_data = list(map(json.loads, f.read().strip().split("\n"))) - # compare source features with dataset content + + # compare source features with dataset content codec = TorchaudioAudioCodec() for src, item in zip(src_data, ds, strict=True): audio, sample_rate = codec.load_audio(src["audio_path"]) @@ -147,14 +147,15 @@ def test_packed_audio_dataset(indexed_dummy_audio_data_path): numpy.testing.assert_allclose(log_mel_spec, item["feat"]) -def test_packed_multimodal_dataset(indexed_dummy_image_data_path, gpt2_tokenizer): +def test_packed_multimodal_dataset(indexed_multimodal_dummy_data_path, gpt2_tokenizer): # create packed data file packed_generator = PackedDataGenerator( - src_path=indexed_dummy_image_data_path.raw_data_path, - idx_path=indexed_dummy_image_data_path.index_path, + src_path=indexed_multimodal_dummy_data_path.raw_data_path, + idx_path=indexed_multimodal_dummy_data_path.index_path, codecs={ ".img_path": PillowImageCodec(), ".text": HfTokenizerCodec(tokenizer=gpt2_tokenizer, add_eos_token=False), + ".audio_path": TorchaudioAudioCodec(), }, ) # get destination path @@ -166,13 +167,20 @@ def test_packed_multimodal_dataset(indexed_dummy_image_data_path, gpt2_tokenizer # read dataset ds = PackedMemMapDataset( default_packed_dataset_path, - sample_keys=["img", "input_ids"], + sample_keys=["img", "input_ids", "audio_feat"], ) + audio_codec = TorchaudioAudioCodec() + # read the jsonl to get the source values - with indexed_dummy_image_data_path.raw_data_path.open("r") as f: + with indexed_multimodal_dummy_data_path.raw_data_path.open("r") as f: src_data = list(map(json.loads, f.read().strip().split("\n"))) # compare source with dataset content for src, item in zip(src_data, ds): with Image.open(src["img_path"]) as src_img: numpy.testing.assert_allclose(src_img, item["img"]) assert gpt2_tokenizer(src["text"])["input_ids"] == item["input_ids"] + + audio, sample_rate = audio_codec.load_audio(src["audio_path"]) + audio = audio_codec.resample(audio, sample_rate) + log_mel_spec = audio_codec.extract_log_mel_spectrogram(audio) + numpy.testing.assert_allclose(log_mel_spec, item["audio_feat"]) From 7191ba11c79a1274a355625449ed9f8ca44574d7 Mon Sep 17 00:00:00 2001 From: Niclas Doll Date: Mon, 19 Feb 2024 13:57:46 +0100 Subject: [PATCH 09/16] refactor: run pre-commit hooks --- src/modalities/__main__.py | 8 +-- src/modalities/batch.py | 5 +- src/modalities/config/config.py | 12 ++-- src/modalities/config/lookup_types.py | 6 +- src/modalities/dataloader/codecs.py | 33 +++------- .../dataloader/create_packed_data.py | 48 +++----------- src/modalities/dataloader/dataset.py | 65 ++++++------------- src/modalities/exceptions.py | 2 +- .../logging_broker/message_broker.py | 5 +- src/modalities/logging_broker/publisher.py | 3 +- src/modalities/logging_broker/subscriber.py | 2 +- .../subscriber_impl/results_subscriber.py | 20 ++++-- .../models/gpt2/preprocess_dataset.py | 18 +++-- src/modalities/models/model.py | 4 +- src/modalities/resolver_register.py | 60 +++++------------ src/modalities/test.py | 3 +- tests/conftest.py | 27 +++----- tests/dataloader/test_packed_dataset.py | 64 ++++++------------ 18 files changed, 135 insertions(+), 250 deletions(-) diff --git a/src/modalities/__main__.py b/src/modalities/__main__.py index c90e1568..5ebc4eb2 100644 --- a/src/modalities/__main__.py +++ b/src/modalities/__main__.py @@ -15,7 +15,7 @@ from modalities.batch import EvaluationResultBatch from modalities.checkpointing.checkpointing import Checkpointing, CheckpointingIF from modalities.checkpointing.checkpointing_factory import CheckpointingFactory -from modalities.config.config import AppConfig, ModalitiesSetupConfig, RunMode, PreparationAppConfig +from modalities.config.config import AppConfig, ModalitiesSetupConfig, PreparationAppConfig, RunMode from modalities.config.lookup_types import TokenizerTypes from modalities.dataloader.create_index import IndexGenerator from modalities.dataloader.create_packed_data import PackedDataGenerator @@ -118,15 +118,11 @@ def entry_point_create_memmap_index(src_path, index_path): help="input path for index. will search in parent directory of src_path if none.", ) def entry_point_create_packed_data(src_path, config_file_path, dst_path, idx_path): - config_dict = load_app_config_dict(config_file_path) config = PreparationAppConfig.model_validate(config_dict) # build codec components resolvers = ResolverRegister() - codecs = { - f.jq_pattern: resolvers.build_component_by_config(f.codec) - for f in config.features - } + codecs = {f.jq_pattern: resolvers.build_component_by_config(f.codec) for f in config.features} # generate packed data generator = PackedDataGenerator( codecs, diff --git a/src/modalities/batch.py b/src/modalities/batch.py index bc6c62c0..7cf3f34e 100644 --- a/src/modalities/batch.py +++ b/src/modalities/batch.py @@ -103,12 +103,15 @@ class EvaluationResultBatch(Batch): losses: Dict[str, torch.Tensor] = field(default_factory=lambda: dict()) metrics: Dict[str, torch.Tensor] = field(default_factory=lambda: dict()) throughput_metrics: Dict[str, torch.Tensor] = field(default_factory=lambda: dict()) + def __str__(self) -> str: eval_str = ( f"Evaluation result on dataset tag {self.dataloader_tag} after {self.global_train_sample_id + 1} samples:" ) eval_str += "\n\nlosses: " + "\n\t".join([f"{k}: {v.mean().item()}" for k, v in self.losses.items()]) eval_str += "\n\nmetrics: " + "\n\t".join([f"{k}: {v.mean().item()}" for k, v in self.metrics.items()]) - eval_str += "\n\nthroughput metrics: " + "\n\t".join([f"{k}: {v.mean().item()}" for k, v in self.throughput_metrics.items()]) + eval_str += "\n\nthroughput metrics: " + "\n\t".join( + [f"{k}: {v.mean().item()}" for k, v in self.throughput_metrics.items()] + ) eval_str += "\n===============================================" return eval_str diff --git a/src/modalities/config/config.py b/src/modalities/config/config.py index 946f5cc5..543e47f4 100644 --- a/src/modalities/config/config.py +++ b/src/modalities/config/config.py @@ -11,6 +11,7 @@ BatchSamplerTypes, CheckpointingExectionTypes, CheckpointingStrategyTypes, + CodecTypes, CollatorTypes, DataloaderTypes, DatasetTypes, @@ -20,7 +21,6 @@ SamplerTypes, SchedulerTypes, TokenizerTypes, - CodecTypes ) from modalities.config.types import ProcessGroupBackendType from modalities.models.gpt2.gpt2_model import GPT2Config @@ -53,7 +53,6 @@ class GPT2TokenizerFastConfig(BaseModel): class CodecConfig(BaseModel): - class HfTokenizerCodecConfig(BaseModel): tokenizer: TokenizerConfig max_length: Optional[int] = None @@ -63,19 +62,15 @@ class PillowImageCodecConfig(BaseModel): save_format: str = "png" type_hint: CodecTypes - config: Union[ - HfTokenizerCodecConfig, - PillowImageCodecConfig - ] = Field(union_mode="left_to_right") + config: Union[HfTokenizerCodecConfig, PillowImageCodecConfig] = Field(union_mode="left_to_right") class FeatureConfig(BaseModel): - codec: CodecConfig jq_pattern: str -class PreparationAppConfig(BaseModel): +class PreparationAppConfig(BaseModel): features: List[FeatureConfig] @@ -312,6 +307,7 @@ class RunMode(Enum): FROM_SCRATCH = "FROM_SCRATCH" WARM_START = "WARM_START" + class ModalitiesSetupConfig(BaseModel): class WarmStartSettings(BaseModel): checkpoint_model_path: Path diff --git a/src/modalities/config/lookup_types.py b/src/modalities/config/lookup_types.py index 04b5535a..faa4910a 100644 --- a/src/modalities/config/lookup_types.py +++ b/src/modalities/config/lookup_types.py @@ -9,6 +9,7 @@ SaveEveryKStepsCheckpointingStrategy, SaveKMostRecentCheckpointsStrategy, ) +from modalities.dataloader.codecs import HfTokenizerCodec, PillowImageCodec from modalities.dataloader.dataloader import LLMDataLoader, RepeatingDataLoader from modalities.dataloader.dataset import MemMapDataset, PackedMemMapDatasetContinuous, PackedMemMapDatasetMegatron from modalities.dataloader.open_gptx_dataset.mmap_dataset import MMapIndexedDatasetBuilder @@ -17,11 +18,6 @@ from modalities.models.gpt2.collator import GPT2LLMCollator from modalities.models.gpt2.gpt2_model import GPT2LLM -from modalities.dataloader.codecs import ( - HfTokenizerCodec, - PillowImageCodec -) - class LookupEnum(Enum): @classmethod diff --git a/src/modalities/dataloader/codecs.py b/src/modalities/dataloader/codecs.py index dcf679d3..6a204cd4 100644 --- a/src/modalities/dataloader/codecs.py +++ b/src/modalities/dataloader/codecs.py @@ -1,12 +1,13 @@ from abc import ABC, abstractmethod -from typing import TypeVar, Generic, Optional, Dict, Any - from io import BytesIO +from typing import Generic, Optional, TypeVar + from PIL import Image from transformers import PreTrainedTokenizer T = TypeVar("T") + class Codec(ABC, Generic[T]): @abstractmethod def encode(self, obj: T) -> bytes: @@ -33,20 +34,15 @@ def num_bytes_per_value(cls) -> int: class HfTokenizerCodec(FixSizedCodec[str]): - TOKEN_SIZE_IN_BYTES = 4 - + @classmethod def num_bytes_per_value(cls) -> int: return cls.TOKEN_SIZE_IN_BYTES def __init__( - self, - tokenizer: PreTrainedTokenizer, - max_length: Optional[int] = None, - add_eos_token: bool = True + self, tokenizer: PreTrainedTokenizer, max_length: Optional[int] = None, add_eos_token: bool = True ) -> None: - # instantiate self.tokenizer = tokenizer self.add_eos_token = add_eos_token @@ -56,9 +52,8 @@ def __init__( eos_token = self.tokenizer.convert_tokens_to_ids(self.tokenizer.eos_token) self.eos_token = eos_token.to_bytes(type(self).TOKEN_SIZE_IN_BYTES, byteorder="big") - self.tokenizer_kwargs = {} if max_length is None else dict( - max_length=max_length - int(add_eos_token), - truncation=True + self.tokenizer_kwargs = ( + {} if max_length is None else dict(max_length=max_length - int(add_eos_token), truncation=True) ) def encode(self, text: str) -> bytes: @@ -67,7 +62,7 @@ def encode(self, text: str) -> bytes: t.to_bytes(type(self).TOKEN_SIZE_IN_BYTES, byteorder="big") for t in self.tokenizer(text, **self.tokenizer_kwargs)["input_ids"] ] - # + # if len(tokens) == 0: raise ValueError("Received empty sample") # add special eos token @@ -80,20 +75,13 @@ def encode(self, text: str) -> bytes: @classmethod def decode(cls, serialized_tokens: bytes) -> str: return [ - int.from_bytes( - serialized_tokens[i:i+cls.TOKEN_SIZE_IN_BYTES], - byteorder="big" - ) + int.from_bytes(serialized_tokens[i : i + cls.TOKEN_SIZE_IN_BYTES], byteorder="big") for i in range(0, len(serialized_tokens), cls.TOKEN_SIZE_IN_BYTES) ] class PillowImageCodec(Codec[str]): - - def __init__( - self, - save_format: str = "png" - ) -> None: + def __init__(self, save_format: str = "png") -> None: self._format = save_format def encode(self, img_file_path: str) -> bytes: @@ -108,4 +96,3 @@ def encode(self, img_file_path: str) -> bytes: @staticmethod def decode(serialized_img: bytes) -> str: return Image.open(BytesIO(serialized_img)) - diff --git a/src/modalities/dataloader/create_packed_data.py b/src/modalities/dataloader/create_packed_data.py index 4b294d1f..c7913350 100644 --- a/src/modalities/dataloader/create_packed_data.py +++ b/src/modalities/dataloader/create_packed_data.py @@ -6,7 +6,6 @@ import jq import numpy as np from tqdm import tqdm -from transformers import PreTrainedTokenizer from modalities.dataloader.codecs import Codec from modalities.dataloader.large_file_lines_reader import LargeFileLinesReader @@ -18,19 +17,14 @@ class PackedDataGenerator: Format: HEAD DATA CODECS INDEX HEAD: DATA_HEAD CODECS_HEAD """ + # amount of bytes to represent number of all tokens in dataset. # If the amount exceeds 2^(8*`header_size_in_bytes`), this requires adaptation. # Decided to keep this constant, since a size of 8 bytes requires more data than the internet currently provides DATA_HEAD_SIZE_IN_BYTES = 8 CODECS_HEAD_SIZE_IN_BYTES = 8 - def __init__( - self, - codecs: Dict[str, Codec], - src_path: Path, - idx_path: Path = None, - max_num_of_bytes: int = None - ): + def __init__(self, codecs: Dict[str, Codec], src_path: Path, idx_path: Path = None, max_num_of_bytes: int = None): """ Reads in a jsonl file and the corresponding index file and packs dataset file for LLM training. :param codec: Codec object, which is used to encode the objects into bytes @@ -49,7 +43,7 @@ def __init__( self.src_path = src_path self._reader = LargeFileLinesReader(src_path, index_path=idx_path) - + # keep track of file size self._total_data_bytes = 0 self._max_data_bytes = max_num_of_bytes @@ -58,11 +52,7 @@ def __init__( @property def _current_offset(self) -> int: - return ( - self._total_data_bytes - + type(self).DATA_HEAD_SIZE_IN_BYTES - + type(self).CODECS_HEAD_SIZE_IN_BYTES - ) + return self._total_data_bytes + type(self).DATA_HEAD_SIZE_IN_BYTES + type(self).CODECS_HEAD_SIZE_IN_BYTES def _default_destination_path(self, destination_path: Path = None) -> Path: if destination_path is None: @@ -82,21 +72,14 @@ def run(self, dst_path: Path = None): raise ValueError(f"file already exists at destination path '{dst_path}'.") with dst_path.open("wb") as f: - # store the type-hints to the codec types # TODO: get the type hints from the enum in case they # don't match the class name exactly - codecs_bytes = pickle.dumps( - [type(codec).__name__ for codec in self.codecs] - ) + codecs_bytes = pickle.dumps([type(codec).__name__ for codec in self.codecs]) # allocate bytes for data header and write codecs header f.write((0).to_bytes(type(self).DATA_HEAD_SIZE_IN_BYTES, byteorder="big")) - f.write( - len(codecs_bytes).to_bytes( - type(self).DATA_HEAD_SIZE_IN_BYTES, byteorder="big" - ) - ) + f.write(len(codecs_bytes).to_bytes(type(self).DATA_HEAD_SIZE_IN_BYTES, byteorder="big")) # write data section for idx, line in tqdm(enumerate(self._reader)): @@ -116,30 +99,22 @@ def run(self, dst_path: Path = None): self._update_data_length_in_pre_allocated_header(dst_path) def _update_data_length_in_pre_allocated_header(self, dst_path: Path): - header_content = self._total_data_bytes.to_bytes(type(self).DATA_HEAD_SIZE_IN_BYTES, byteorder="big") header_content = np.frombuffer(header_content, dtype="uint8") # write the header content to the packed dataset file - m = np.memmap( - dst_path, - mode="r+", - offset=0, - shape=(type(self).DATA_HEAD_SIZE_IN_BYTES,) - ) + m = np.memmap(dst_path, mode="r+", offset=0, shape=(type(self).DATA_HEAD_SIZE_IN_BYTES,)) m[:] = header_content[:] def _process_line(self, f: IO, line: str): - sizes = [None] * len(self.codecs) for i, (codec, jq_filter) in enumerate( zip(self.codecs, self.jq_filters), ): - # get object to encode and encode using codec jq_retrieved_text = jq_filter.input_text(line).first() bytestring = codec.encode(jq_retrieved_text) - num_bytes = len(bytestring) + num_bytes = len(bytestring) if num_bytes == 0: raise ValueError("Detected Empty sample") @@ -147,14 +122,11 @@ def _process_line(self, f: IO, line: str): # write bytestring to file and update size array f.write(bytestring) sizes[i] = num_bytes - + # update index and total number of bytes written self._index_list.append([self._current_offset] + sizes) self._total_data_bytes += sum(sizes) # exceeds size limit - if ( - (self._max_data_bytes is not None) and - (self._total_data_bytes >= self._max_data_bytes) - ): + if (self._max_data_bytes is not None) and (self._total_data_bytes >= self._max_data_bytes): raise StopIteration diff --git a/src/modalities/dataloader/dataset.py b/src/modalities/dataloader/dataset.py index e8db9f18..55ac80c1 100644 --- a/src/modalities/dataloader/dataset.py +++ b/src/modalities/dataloader/dataset.py @@ -15,9 +15,11 @@ from .create_packed_data import PackedDataGenerator from .large_file_lines_reader import LargeFileLinesReader + class SampleKeysMismatchException(Exception): pass + class Dataset(TorchdataSet): def __init__(self, raw_data_path: Path, sample_keys: list[str]): self.raw_data_path = raw_data_path @@ -83,15 +85,16 @@ def __getitem__(self, idx: int) -> BatchEncoding: class PackedMemMapDatasetBase(Dataset): - def _read_bytes(self, offset: int, size: int) -> bytes: return np.memmap( self.raw_data_path, mode="r", offset=offset, shape=(size,), - ).view(f"S{size}")[0] - + ).view( + f"S{size}" + )[0] + @property def num_elements_per_item(self) -> int: return len(self._codec_types) @@ -109,7 +112,7 @@ def __init__(self, raw_data_path: Path, sample_keys: list[str]): (codec_1, codec_2, ...) The index stores byte positions of the dataset items in the following format: - + (offset, size_1, size_2, ...) The start and end tuple of the j-th value are computed by: @@ -140,57 +143,37 @@ def __init__(self, raw_data_path: Path, sample_keys: list[str]): def read_header(offset: int, size: int) -> int: # read bytes from file - return int.from_bytes( - self._read_bytes(offset, size), - byteorder="big" - ) + return int.from_bytes(self._read_bytes(offset, size), byteorder="big") # read headers - self.data_len = read_header( - offset=0, - size=PackedDataGenerator.DATA_HEAD_SIZE_IN_BYTES - ) + self.data_len = read_header(offset=0, size=PackedDataGenerator.DATA_HEAD_SIZE_IN_BYTES) self.codecs_len = read_header( - offset=PackedDataGenerator.DATA_HEAD_SIZE_IN_BYTES, - size=PackedDataGenerator.CODECS_HEAD_SIZE_IN_BYTES + offset=PackedDataGenerator.DATA_HEAD_SIZE_IN_BYTES, size=PackedDataGenerator.CODECS_HEAD_SIZE_IN_BYTES ) # compute offsets to index raw data file - self.data_offset = ( - PackedDataGenerator.DATA_HEAD_SIZE_IN_BYTES + - PackedDataGenerator.CODECS_HEAD_SIZE_IN_BYTES - ) + self.data_offset = PackedDataGenerator.DATA_HEAD_SIZE_IN_BYTES + PackedDataGenerator.CODECS_HEAD_SIZE_IN_BYTES self.codecs_offset = self.data_offset + self.data_len self.index_offset = self.codecs_offset + self.codecs_len # read codecs - self._codec_type_hints = self._read_bytes( - offset=self.codecs_offset, - size=self.codecs_len - ) + self._codec_type_hints = self._read_bytes(offset=self.codecs_offset, size=self.codecs_len) self._codec_type_hints = pickle.loads(self._codec_type_hints) # needs to be here to avoid circular import # TODO: find a better way to avoid the circular import from ..config.lookup_types import CodecTypes + # resolve codec types - self._codec_types = [ - getattr(CodecTypes, codec_type_hint).value - for codec_type_hint in self._codec_type_hints - ] + self._codec_types = [getattr(CodecTypes, codec_type_hint).value for codec_type_hint in self._codec_type_hints] # get index - self._index_base = self._read_bytes( - offset=self.index_offset, - size=self.total_bytes - self.index_offset - ) + self._index_base = self._read_bytes(offset=self.index_offset, size=self.total_bytes - self.index_offset) self._index_base = pickle.loads(self._index_base) assert all(len(idx) == len(self._codec_types) + 1 for idx in self._index_base) - + # initialize after codec types are defined because # num_elements_per_item depends on it - super().__init__( - raw_data_path=raw_data_path, sample_keys=sample_keys - ) + super().__init__(raw_data_path=raw_data_path, sample_keys=sample_keys) class PackedMemMapDataset(PackedMemMapDatasetBase): @@ -206,9 +189,7 @@ def __getitem__(self, idx: int) -> BatchEncoding: enc = {} offset = idx[0] - for key, size, codec_type in zip( - self.sample_keys, idx[1:], self._codec_types - ): + for key, size, codec_type in zip(self.sample_keys, idx[1:], self._codec_types): # decode item bytestring = self._read_bytes(offset, size) enc[key] = codec_type.decode(bytestring) @@ -219,7 +200,6 @@ def __getitem__(self, idx: int) -> BatchEncoding: class PackedMemMapDatasetContinuous(PackedMemMapDatasetBase): - def __init__(self, raw_data_path: Path, sample_key: str, block_size: int): """ PackedMemMapDatasetContinuous iterates through the data in block_size sized chunks, @@ -242,16 +222,13 @@ def __init__(self, raw_data_path: Path, sample_key: str, block_size: int): # check if codec is supported if not issubclass(self.codec_type, FixSizedCodec): - raise ValueError( - "Can only read continuously from fix-sized codecs, got %s." - % self.codec_type - ) + raise ValueError("Can only read continuously from fix-sized codecs, got %s." % self.codec_type) self.block_size = block_size # get number of total tokens in file total_values = self.data_len // self._num_bytes_per_value self._num_samples = total_values // self.block_size - + @property def sample_key(self) -> str: return self.sample_keys[0] @@ -272,7 +249,7 @@ def __getitem__(self, idx: int) -> BatchEncoding: # read block-sized chunk of bytes byte_string = self._read_bytes( offset=self.data_offset + idx * self.block_size * self._num_bytes_per_value, - size=self.block_size * self._num_bytes_per_value + size=self.block_size * self._num_bytes_per_value, ) # decode and pack into batch encoding values = self.codec_type.decode(byte_string) diff --git a/src/modalities/exceptions.py b/src/modalities/exceptions.py index c5e5e3a2..07e344d5 100644 --- a/src/modalities/exceptions.py +++ b/src/modalities/exceptions.py @@ -15,4 +15,4 @@ class RunningEnvError(Exception): class TimeRecorderStateError(Exception): - pass \ No newline at end of file + pass diff --git a/src/modalities/logging_broker/message_broker.py b/src/modalities/logging_broker/message_broker.py index d5f4aec2..7b38e58f 100644 --- a/src/modalities/logging_broker/message_broker.py +++ b/src/modalities/logging_broker/message_broker.py @@ -1,12 +1,14 @@ from abc import ABC, abstractmethod from collections import defaultdict +from typing import Dict, List + from modalities.logging_broker.messages import Message, MessageTypes from modalities.logging_broker.subscriber import MessageSubscriberIF -from typing import Dict, List class MessageBrokerIF(ABC): """Interface for message broker objects.""" + @abstractmethod def add_subscriber(self, subscription: MessageTypes, subscriber: MessageSubscriberIF): raise NotImplementedError @@ -18,6 +20,7 @@ def distribute_message(self, message: Message): class MessageBroker(MessageBrokerIF): """The MessageBroker sends notifications to its subscribers.""" + def __init__(self) -> None: self.subscriptions: Dict[MessageTypes, List[MessageSubscriberIF]] = defaultdict(list) diff --git a/src/modalities/logging_broker/publisher.py b/src/modalities/logging_broker/publisher.py index 34ff834b..28cc27de 100644 --- a/src/modalities/logging_broker/publisher.py +++ b/src/modalities/logging_broker/publisher.py @@ -1,7 +1,7 @@ from abc import ABC, abstractmethod from typing import Generic, TypeVar -from modalities.logging_broker.message_broker import Message, MessageBroker +from modalities.logging_broker.message_broker import Message, MessageBroker from modalities.logging_broker.messages import MessageTypes T = TypeVar("T") @@ -15,6 +15,7 @@ def publish_message(self, payload: T, message_type: MessageTypes): class MessagePublisher(MessagePublisherIF[T]): """The MessagePublisher sends messages through a message broker.""" + def __init__( self, message_broker: MessageBroker, diff --git a/src/modalities/logging_broker/subscriber.py b/src/modalities/logging_broker/subscriber.py index 7e965b75..6b4e5c2d 100644 --- a/src/modalities/logging_broker/subscriber.py +++ b/src/modalities/logging_broker/subscriber.py @@ -1,5 +1,6 @@ from abc import ABC, abstractmethod from typing import Generic, TypeVar + from modalities.logging_broker.messages import Message T = TypeVar("T") @@ -11,4 +12,3 @@ class MessageSubscriberIF(ABC, Generic[T]): @abstractmethod def consume_message(self, message: Message[T]): raise NotImplementedError - diff --git a/src/modalities/logging_broker/subscriber_impl/results_subscriber.py b/src/modalities/logging_broker/subscriber_impl/results_subscriber.py index b2965725..92fe0fc1 100644 --- a/src/modalities/logging_broker/subscriber_impl/results_subscriber.py +++ b/src/modalities/logging_broker/subscriber_impl/results_subscriber.py @@ -2,14 +2,15 @@ from typing import Optional import rich +import wandb from rich.console import Group from rich.panel import Panel -import wandb from modalities.batch import EvaluationResultBatch +from modalities.config.config import AppConfig, WandbConfig from modalities.logging_broker.messages import Message from modalities.logging_broker.subscriber import MessageSubscriberIF -from modalities.config.config import AppConfig, WandbConfig + class DummyResultSubscriber(MessageSubscriberIF[EvaluationResultBatch]): def consume_message(self, message: Message[EvaluationResultBatch]): @@ -49,8 +50,15 @@ def consume_message(self, message: Message[EvaluationResultBatch]): class WandBEvaluationResultSubscriber(MessageSubscriberIF[EvaluationResultBatch]): """A subscriber object for the WandBEvaluationResult observable.""" - def __init__(self, num_ranks: int, project: str, experiment_id: str, mode: WandbConfig.WandbMode, dir: Path, - experiment_config: Optional[AppConfig] = None) -> None: + def __init__( + self, + num_ranks: int, + project: str, + experiment_id: str, + mode: WandbConfig.WandbMode, + dir: Path, + experiment_config: Optional[AppConfig] = None, + ) -> None: super().__init__() self.num_ranks = num_ranks @@ -82,6 +90,4 @@ def consume_message(self, message: Message[EvaluationResultBatch]): f"{eval_result.dataloader_tag} {metric_key}": metric_values for metric_key, metric_values in eval_result.throughput_metrics.items() } - wandb.log( - data=throughput_metrics, step=eval_result.global_train_sample_id + 1 - ) + wandb.log(data=throughput_metrics, step=eval_result.global_train_sample_id + 1) diff --git a/src/modalities/models/gpt2/preprocess_dataset.py b/src/modalities/models/gpt2/preprocess_dataset.py index 99afb069..fe5b223e 100644 --- a/src/modalities/models/gpt2/preprocess_dataset.py +++ b/src/modalities/models/gpt2/preprocess_dataset.py @@ -1,21 +1,25 @@ +import os from itertools import chain -from datasets import load_dataset -from transformers import GPT2TokenizerFast, GPT2LMHeadModel, GPT2Config + from accelerate import Accelerator -import os +from datasets import load_dataset +from transformers import GPT2Config, GPT2LMHeadModel, GPT2TokenizerFast def main(): - def group_texts(examples): # Concatenate all texts. concatenated_examples = {k: list(chain(*examples[k])) for k in examples.keys()} total_length = len(concatenated_examples[list(examples.keys())[0]]) - # We drop the small remainder, and if the total_length < block_size we exclude this batch and return an empty dict. - # We could add padding if the model supported it instead of this drop, you can customize this part to your needs. + # We drop the small remainder, and if the total_length < block_size we exclude + # this batch and return an empty dict. We could add padding if the model + # supported it instead of this drop, you can customize this part to your needs. total_length = (total_length // block_size) * block_size # Split by chunks of max_len. - result = {k: [t[i: i + block_size] for i in range(0, total_length, block_size)] for k, t in concatenated_examples.items()} + result = { + k: [t[i : i + block_size] for i in range(0, total_length, block_size)] + for k, t in concatenated_examples.items() + } result["labels"] = result["input_ids"].copy() return result diff --git a/src/modalities/models/model.py b/src/modalities/models/model.py index d00a8043..511419b9 100644 --- a/src/modalities/models/model.py +++ b/src/modalities/models/model.py @@ -1,9 +1,11 @@ from abc import abstractmethod from typing import Dict -from modalities.batch import DatasetBatch, InferenceResultBatch + import torch import torch.nn as nn +from modalities.batch import DatasetBatch, InferenceResultBatch + class NNModel(nn.Module): def __init__(self, seed: int = None): diff --git a/src/modalities/resolver_register.py b/src/modalities/resolver_register.py index 6a87c356..b2417ef4 100644 --- a/src/modalities/resolver_register.py +++ b/src/modalities/resolver_register.py @@ -8,20 +8,20 @@ from transformers import PreTrainedTokenizer from modalities.checkpointing.checkpointing import CheckpointingExecutionIF, CheckpointingStrategyIF -from modalities.config.config import AppConfig, OptimizerTypes, SchedulerTypes +from modalities.config.config import OptimizerTypes, SchedulerTypes from modalities.config.lookup_types import ( - LookupEnum, BatchSamplerTypes, CheckpointingExectionTypes, CheckpointingStrategyTypes, + CodecTypes, CollatorTypes, DataloaderTypes, DatasetTypes, + LookupEnum, LossTypes, ModelTypes, SamplerTypes, TokenizerTypes, - CodecTypes ) from modalities.dataloader.codecs import Codec from modalities.dataloader.dataloader import LLMDataLoader @@ -31,27 +31,22 @@ from modalities.models.gpt2.gpt2_model import GPT2LLM, NNModel from modalities.running_env.fsdp.fsdp_running_env import FSDPRunningEnv, RunningEnv, RunningEnvTypes + # TODO: this should be a singleton class ResolverRegister: - # TODO: args and kwargs only to be backwards compatible # older versions required the appconfig as argument def __init__(self, *args, **kwargs): self._resolver_register = self._build_resolver_register() - - def build_component_by_key_query( - self, register_key: str, type_hint: str, extra_kwargs: Dict = {} - ) -> Any: + + def build_component_by_key_query(self, register_key: str, type_hint: str, extra_kwargs: Dict = {}) -> Any: raise NotImplementedError - - def build_component_by_config( - self, config: BaseModel, extra_kwargs: Dict[str, Any] = {} - ) -> Any: + def build_component_by_config(self, config: BaseModel, extra_kwargs: Dict[str, Any] = {}) -> Any: assert ( "type_hint" in config.model_fields.keys() ), f"Field 'type_hint' missing but needed for initalisation in {config}" - + assert ( "config" in config.model_fields.keys() ), f"Field 'config' missing but needed for initalisation in {config}" @@ -65,11 +60,7 @@ def build_component_by_config( val = kwargs.get(key, val) # handle nested components - if ( - isinstance(val, BaseModel) and - "type_hint" in val.model_fields and - "config" in val.model_fields - ): + if isinstance(val, BaseModel) and "type_hint" in val.model_fields and "config" in val.model_fields: kwargs[key] = self.build_component_by_config(val) else: @@ -80,13 +71,8 @@ def build_component_by_config( register_query=config.type_hint.name, extra_kwargs=kwargs, ) - - def _build_component( - self, - register_key: LookupEnum, - register_query: str, - extra_kwargs: Dict[str, Any] = {} - ): + + def _build_component(self, register_key: LookupEnum, register_query: str, extra_kwargs: Dict[str, Any] = {}): assert register_key in self._resolver_register return self._resolver_register[register_key].make( query=register_query, @@ -135,25 +121,15 @@ def _build_resolver_register(self) -> List[LookupEnum]: base=DataLoader, default=LLMDataLoader, ), - DatasetTypes: ClassResolver( - [t.value for t in DatasetTypes], base=Dataset - ), - CollatorTypes: ClassResolver( - [t.value for t in CollatorTypes], base=GPT2LLMCollator - ), - TokenizerTypes: ClassResolver( - [t.value for t in TokenizerTypes], base=PreTrainedTokenizer - ), - CodecTypes: ClassResolver( - [t.value for t in CodecTypes], base=Codec - ), + DatasetTypes: ClassResolver([t.value for t in DatasetTypes], base=Dataset), + CollatorTypes: ClassResolver([t.value for t in CollatorTypes], base=GPT2LLMCollator), + TokenizerTypes: ClassResolver([t.value for t in TokenizerTypes], base=PreTrainedTokenizer), + CodecTypes: ClassResolver([t.value for t in CodecTypes], base=Codec), CheckpointingStrategyTypes: ClassResolver( - [t.value for t in CheckpointingStrategyTypes], - base=CheckpointingStrategyIF + [t.value for t in CheckpointingStrategyTypes], base=CheckpointingStrategyIF ), # TODO: fix type in execution CheckpointingExectionTypes: ClassResolver( - [t.value for t in CheckpointingExectionTypes], - base=CheckpointingExecutionIF - ) + [t.value for t in CheckpointingExectionTypes], base=CheckpointingExecutionIF + ), } diff --git a/src/modalities/test.py b/src/modalities/test.py index ea16a091..f81c3630 100644 --- a/src/modalities/test.py +++ b/src/modalities/test.py @@ -3,7 +3,6 @@ from rich.progress import Progress with Progress() as progress: - task1 = progress.add_task("[red]Downloading...", total=1000) task2 = progress.add_task("[green]Processing...", total=1000) task3 = progress.add_task("[cyan]Cooking...", total=1000) @@ -12,4 +11,4 @@ progress.update(task1, advance=0.5) progress.update(task2, advance=0.3) progress.update(task3, advance=0.9) - time.sleep(0.02) \ No newline at end of file + time.sleep(0.02) diff --git a/tests/conftest.py b/tests/conftest.py index 67855191..a8a9df92 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,19 +1,18 @@ import dataclasses -import os import json +import os import pickle from pathlib import Path from unittest.mock import MagicMock -import pytest -from PIL import Image import numpy as np +import pytest import torch +from PIL import Image from torch.optim import Optimizer from torch.utils.data.sampler import BatchSampler, SequentialSampler from transformers import GPT2TokenizerFast - from modalities.__main__ import load_app_config_dict from modalities.checkpointing.checkpointing import CheckpointingIF from modalities.config.config import AppConfig @@ -46,9 +45,7 @@ def dummy_packed_data_path(tmpdir) -> Path: tokens = list(range(20)) codecs_bytes = pickle.dumps(["HfTokenizerCodec"]) # headers - data += ( - len(tokens) * int_size_in_bytes - ).to_bytes(data_header_size_in_bytes, byteorder="big") + data += (len(tokens) * int_size_in_bytes).to_bytes(data_header_size_in_bytes, byteorder="big") data += len(codecs_bytes).to_bytes(codecs_header_size_in_bytes, byteorder="big") # data and codecs data += b"".join([t.to_bytes(int_size_in_bytes, byteorder="big") for t in tokens]) @@ -84,8 +81,7 @@ def dummy_data_path(tmpdir) -> DataPathCollection: @pytest.fixture -def indexed_dummy_image_data_path(tmpdir) -> DataPathCollection: - +def indexed_dummy_image_data_path(tmpdir) -> DataPathCollection: base_path = Path(tmpdir, "image_data") img_base_path = Path(base_path, "images") @@ -94,10 +90,7 @@ def indexed_dummy_image_data_path(tmpdir) -> DataPathCollection: data_path = Path(base_path, "data.jsonl") index_path = Path(base_path, "data.idx") - img_paths = [ - Path(img_base_path, "img_%i.png" % i) - for i in range(15) - ] + img_paths = [Path(img_base_path, "img_%i.png" % i) for i in range(15)] # create random images and save them into the temp directory for img_path in img_paths: im = np.random.rand(100, 100, 3) * 255 @@ -110,12 +103,10 @@ def indexed_dummy_image_data_path(tmpdir) -> DataPathCollection: json.dumps( { "img_path": img_path.absolute().as_posix(), - "text": ( - "This item refers to the image stored at %s" - % str(img_path) - ) + "text": ("This item refers to the image stored at %s" % str(img_path)), } - ) + "\n" + ) + + "\n" ) # create the index file to the jsonl file IndexGenerator(data_path).create_index(index_path) diff --git a/tests/dataloader/test_packed_dataset.py b/tests/dataloader/test_packed_dataset.py index a75f664d..50ab98c9 100644 --- a/tests/dataloader/test_packed_dataset.py +++ b/tests/dataloader/test_packed_dataset.py @@ -1,12 +1,16 @@ import json -import pytest -from PIL import Image import numpy.testing +import pytest +from PIL import Image from modalities.dataloader.codecs import HfTokenizerCodec, PillowImageCodec from modalities.dataloader.create_packed_data import PackedDataGenerator -from modalities.dataloader.dataset import PackedMemMapDataset, PackedMemMapDatasetContinuous, PackedMemMapDatasetMegatron +from modalities.dataloader.dataset import ( + PackedMemMapDataset, + PackedMemMapDatasetContinuous, + PackedMemMapDatasetMegatron, +) @pytest.mark.skip(reason="New packed data format not implemented for megatron dataset") @@ -17,11 +21,7 @@ def test_packed_megatron_dataset_loading(dummy_packed_data_path, block_size, exp def test_packed_dataset_loading(dummy_packed_data_path): - - ds = PackedMemMapDataset( - dummy_packed_data_path, - sample_keys=["input_ids"] - ) + ds = PackedMemMapDataset(dummy_packed_data_path, sample_keys=["input_ids"]) assert len(ds) == 4 assert ds[0]["input_ids"] == [0, 1, 2, 3, 4, 5] @@ -33,7 +33,7 @@ def test_packed_dataset_loading(dummy_packed_data_path): @pytest.mark.parametrize( "block_size, expected_length, expected_output", [ - #(1, 20, [[i] for i in range(20)]), # TODO + # (1, 20, [[i] for i in range(20)]), # TODO (2, 10, [[2 * i, 2 * i + 1] for i in range(10)]), (3, 6, [[3 * i, 3 * i + 1, 3 * i + 2] for i in range(6)]), (10, 2, [list(range(10)), list(range(10, 20))]), @@ -42,19 +42,10 @@ def test_packed_dataset_loading(dummy_packed_data_path): (25, 0, []), ], ) -def test_packed_continuous_dataset_loading( - dummy_packed_data_path, block_size, expected_length, expected_output -): - ds = PackedMemMapDatasetContinuous( - dummy_packed_data_path, - sample_key="input_ids", - block_size=block_size - ) +def test_packed_continuous_dataset_loading(dummy_packed_data_path, block_size, expected_length, expected_output): + ds = PackedMemMapDatasetContinuous(dummy_packed_data_path, sample_key="input_ids", block_size=block_size) assert len(ds) == expected_length - retrieved_input_ids = [ - list(packed_samples["input_ids"]) - for packed_samples in ds - ] + retrieved_input_ids = [list(packed_samples["input_ids"]) for packed_samples in ds] assert retrieved_input_ids == expected_output @@ -64,15 +55,8 @@ def test_packed_continuous_dataset_missing_file(dummy_packed_data_path): PackedMemMapDatasetContinuous(dummy_packed_data_path, block_size=10, sample_key="input_ids") -@pytest.mark.parametrize( - "max_num_of_tokens, expected_index_size", [(None, 12), (10, 1)] -) -def test_create_packed_dataset( - indexed_dummy_data_path, - gpt2_tokenizer, - max_num_of_tokens, - expected_index_size -): +@pytest.mark.parametrize("max_num_of_tokens, expected_index_size", [(None, 12), (10, 1)]) +def test_create_packed_dataset(indexed_dummy_data_path, gpt2_tokenizer, max_num_of_tokens, expected_index_size): block_size = 5 packed_generator = PackedDataGenerator( src_path=indexed_dummy_data_path.raw_data_path, @@ -82,9 +66,8 @@ def test_create_packed_dataset( ) }, max_num_of_bytes=( - (HfTokenizerCodec.TOKEN_SIZE_IN_BYTES * max_num_of_tokens) - if max_num_of_tokens is not None else None - ) + (HfTokenizerCodec.TOKEN_SIZE_IN_BYTES * max_num_of_tokens) if max_num_of_tokens is not None else None + ), ) default_packed_dataset_path = packed_generator._default_destination_path() assert not default_packed_dataset_path.is_file() @@ -112,9 +95,7 @@ def test_packed_image_dataset(indexed_dummy_image_data_path): packed_generator = PackedDataGenerator( src_path=indexed_dummy_image_data_path.raw_data_path, idx_path=indexed_dummy_image_data_path.index_path, - codecs={ - ".img_path": PillowImageCodec() - } + codecs={".img_path": PillowImageCodec()}, ) # get destination path default_packed_dataset_path = packed_generator._default_destination_path() @@ -136,20 +117,15 @@ def test_packed_image_dataset(indexed_dummy_image_data_path): numpy.testing.assert_allclose(src_img, item["img"]) -def test_packed_multimodal_dataset( - indexed_dummy_image_data_path, gpt2_tokenizer -): +def test_packed_multimodal_dataset(indexed_dummy_image_data_path, gpt2_tokenizer): # create packed data file packed_generator = PackedDataGenerator( src_path=indexed_dummy_image_data_path.raw_data_path, idx_path=indexed_dummy_image_data_path.index_path, codecs={ ".img_path": PillowImageCodec(), - ".text": HfTokenizerCodec( - tokenizer=gpt2_tokenizer, - add_eos_token=False - ) - } + ".text": HfTokenizerCodec(tokenizer=gpt2_tokenizer, add_eos_token=False), + }, ) # get destination path default_packed_dataset_path = packed_generator._default_destination_path() From 70747f75e8b262076c8a2506741db1f22f3cb086 Mon Sep 17 00:00:00 2001 From: Niclas Doll Date: Mon, 19 Feb 2024 14:32:11 +0100 Subject: [PATCH 10/16] fix: add pillow to requirements --- pyproject.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index caa789cd..7ae1fc16 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,7 +21,8 @@ dependencies = [ "jq", "xformers", "class_resolver", - "wandb" + "wandb", + "pillow" ] [project.optional-dependencies] From 19e95d9ee7399e2b7100c8f4827969cb73aa73c9 Mon Sep 17 00:00:00 2001 From: Niclas Doll Date: Mon, 19 Feb 2024 14:32:40 +0100 Subject: [PATCH 11/16] refactor: fix type --- src/modalities/dataloader/codecs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/modalities/dataloader/codecs.py b/src/modalities/dataloader/codecs.py index 6a204cd4..03cff338 100644 --- a/src/modalities/dataloader/codecs.py +++ b/src/modalities/dataloader/codecs.py @@ -89,7 +89,7 @@ def encode(self, img_file_path: str) -> bytes: # write image to buffer with Image.open(img_file_path) as img: img.save(buf, format=self._format) - # retuen buffer content + # return buffer content buf.seek(0) return buf.read() From 8945b0c754a1aaad44c36d28663761ddb7e5871e Mon Sep 17 00:00:00 2001 From: Thomas Holz Date: Mon, 19 Feb 2024 16:00:44 +0100 Subject: [PATCH 12/16] chore: remove unused feature --- config_files/data_config.yaml | 3 --- 1 file changed, 3 deletions(-) diff --git a/config_files/data_config.yaml b/config_files/data_config.yaml index e3cd5caa..f6899076 100644 --- a/config_files/data_config.yaml +++ b/config_files/data_config.yaml @@ -13,6 +13,3 @@ features: type_hint: PillowImageCodec config: save_format: png - - jq_pattern: .audio_path - codec: - type_hint: TorchaudioAudioCodec From 16d0fb09f64604882b5ab8eeea68f9954929c303 Mon Sep 17 00:00:00 2001 From: Thomas Holz Date: Mon, 19 Feb 2024 16:09:50 +0100 Subject: [PATCH 13/16] refactor: TorchaudioAudioCodec init --- .../config_example_audio_mem_map_dataset.yaml | 16 ++++++++++++++++ src/modalities/config/config.py | 16 +++++++++------- src/modalities/dataloader/codecs.py | 8 +++++--- 3 files changed, 30 insertions(+), 10 deletions(-) create mode 100644 config_files/config_example_audio_mem_map_dataset.yaml diff --git a/config_files/config_example_audio_mem_map_dataset.yaml b/config_files/config_example_audio_mem_map_dataset.yaml new file mode 100644 index 00000000..95f7e226 --- /dev/null +++ b/config_files/config_example_audio_mem_map_dataset.yaml @@ -0,0 +1,16 @@ +features: + - jq_pattern: .transcript + codec: + type_hint: HfTokenizerCodec + config: + add_eos_token: true + tokenizer: + type_hint: GPT2TokenizerFast + config: + tokenizer_file: ./data/tokenizer/tokenizer.json + - jq_pattern: .audio_path + codec: + type_hint: TorchaudioAudioCodec + config: + target_sample_rate: 16_000 + n_mels: 80 diff --git a/src/modalities/config/config.py b/src/modalities/config/config.py index 6c1281bf..a8525568 100644 --- a/src/modalities/config/config.py +++ b/src/modalities/config/config.py @@ -4,13 +4,11 @@ from pathlib import Path from typing import List, Optional, Union -from pydantic import BaseModel, Field, FilePath, PositiveFloat, PositiveInt, confloat, conint, model_validator -from transformers import PretrainedConfig - from modalities.config.lookup_types import ( BatchSamplerTypes, CheckpointingExectionTypes, CheckpointingStrategyTypes, + CodecTypes, CollatorTypes, DataloaderTypes, DatasetTypes, @@ -20,11 +18,12 @@ SamplerTypes, SchedulerTypes, TokenizerTypes, - CodecTypes ) from modalities.config.types import ProcessGroupBackendType from modalities.models.gpt2.gpt2_model import GPT2Config from modalities.running_env.fsdp.fsdp_running_env import RunningEnvConfig +from pydantic import BaseModel, Field, FilePath, PositiveFloat, PositiveInt, confloat, conint, model_validator +from transformers import PretrainedConfig class WandbConfig(BaseModel): @@ -62,14 +61,15 @@ class HfTokenizerCodecConfig(BaseModel): class PillowImageCodecConfig(BaseModel): save_format: str = "png" - class TorchaudioAudioCodec(BaseModel): - pass + class TorchaudioAudioCodecConfig(BaseModel): + target_sample_rate: int = 16_000 + n_mels: int = 80 type_hint: CodecTypes config: Union[ HfTokenizerCodecConfig, PillowImageCodecConfig, - TorchaudioAudioCodec, + TorchaudioAudioCodecConfig, ] = Field(union_mode="left_to_right") @@ -78,6 +78,7 @@ class FeatureConfig(BaseModel): codec: CodecConfig jq_pattern: str + class PreparationAppConfig(BaseModel): features: List[FeatureConfig] @@ -316,6 +317,7 @@ class RunMode(Enum): FROM_SCRATCH = "FROM_SCRATCH" WARM_START = "WARM_START" + class ModalitiesSetupConfig(BaseModel): class WarmStartSettings(BaseModel): checkpoint_model_path: Path diff --git a/src/modalities/dataloader/codecs.py b/src/modalities/dataloader/codecs.py index bff7a9d7..809cfa88 100644 --- a/src/modalities/dataloader/codecs.py +++ b/src/modalities/dataloader/codecs.py @@ -104,14 +104,16 @@ def decode(serialized_img: bytes) -> str: class TorchaudioAudioCodec(Codec[str]): def __init__( self, + target_sample_rate: int = 16_000, + n_mels: int = 80, ) -> None: self.extract_mel_spec = torchaudio.transforms.MelSpectrogram( - sample_rate=16_000, + sample_rate=target_sample_rate, n_fft=400, - n_mels=80, + n_mels=n_mels, hop_length=160, ) - self.target_sample_rate = 16_000 + self.target_sample_rate = target_sample_rate def load_audio( self, From ccce2bab0090a3018a2d8d25f7c06ff2453962fb Mon Sep 17 00:00:00 2001 From: Niclas Doll Date: Mon, 19 Feb 2024 17:51:27 +0100 Subject: [PATCH 14/16] fix: add soundfile to requirements for audio processing --- pyproject.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 4feecd51..12976396 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,7 +25,8 @@ dependencies = [ "scipy", "torchaudio", "pillow", - "ffmpeg" + "ffmpeg", + "soundfile" ] [project.optional-dependencies] From 340fab56eb7e102be766cb18659dfb1e9e7d7618 Mon Sep 17 00:00:00 2001 From: Niclas Doll Date: Mon, 19 Feb 2024 17:52:33 +0100 Subject: [PATCH 15/16] fix: manually resolve config type for codecs to avoid pydantic parsing errors --- src/modalities/config/config.py | 26 +++++++++++++++++++++----- src/modalities/dataloader/codecs.py | 9 ++++++--- 2 files changed, 27 insertions(+), 8 deletions(-) diff --git a/src/modalities/config/config.py b/src/modalities/config/config.py index a8525568..c1b09663 100644 --- a/src/modalities/config/config.py +++ b/src/modalities/config/config.py @@ -4,6 +4,9 @@ from pathlib import Path from typing import List, Optional, Union +from pydantic import BaseModel, Field, FilePath, PositiveFloat, PositiveInt, confloat, conint, model_validator +from transformers import PretrainedConfig + from modalities.config.lookup_types import ( BatchSamplerTypes, CheckpointingExectionTypes, @@ -22,8 +25,6 @@ from modalities.config.types import ProcessGroupBackendType from modalities.models.gpt2.gpt2_model import GPT2Config from modalities.running_env.fsdp.fsdp_running_env import RunningEnvConfig -from pydantic import BaseModel, Field, FilePath, PositiveFloat, PositiveInt, confloat, conint, model_validator -from transformers import PretrainedConfig class WandbConfig(BaseModel): @@ -52,7 +53,6 @@ class GPT2TokenizerFastConfig(BaseModel): class CodecConfig(BaseModel): - class HfTokenizerCodecConfig(BaseModel): tokenizer: TokenizerConfig max_length: Optional[int] = None @@ -72,15 +72,31 @@ class TorchaudioAudioCodecConfig(BaseModel): TorchaudioAudioCodecConfig, ] = Field(union_mode="left_to_right") + @model_validator(mode="before") + def _resolve_type(cls, data): + if isinstance(data, dict): + # resolve config type from type hint + type_hint = data["type_hint"] + CONFIG_RESOLVER = { + CodecTypes.HfTokenizerCodec.name: cls.HfTokenizerCodecConfig, + CodecTypes.PillowImageCodec.name: cls.PillowImageCodecConfig, + CodecTypes.TorchaudioAudioCodec.name: cls.TorchaudioAudioCodecConfig, + } + # create config object of correct type + config_type = CONFIG_RESOLVER.get(type_hint) + config = config_type(**data["config"]) + # return codec config + return {"type_hint": type_hint, "config": config} + + return data -class FeatureConfig(BaseModel): +class FeatureConfig(BaseModel): codec: CodecConfig jq_pattern: str class PreparationAppConfig(BaseModel): - features: List[FeatureConfig] diff --git a/src/modalities/dataloader/codecs.py b/src/modalities/dataloader/codecs.py index 809cfa88..90a3b457 100644 --- a/src/modalities/dataloader/codecs.py +++ b/src/modalities/dataloader/codecs.py @@ -102,18 +102,21 @@ def decode(serialized_img: bytes) -> str: class TorchaudioAudioCodec(Codec[str]): + N_FFT = 400 + HOP_LENGTH = 160 + def __init__( self, target_sample_rate: int = 16_000, n_mels: int = 80, ) -> None: + self.target_sample_rate = target_sample_rate self.extract_mel_spec = torchaudio.transforms.MelSpectrogram( sample_rate=target_sample_rate, - n_fft=400, n_mels=n_mels, - hop_length=160, + n_fft=type(self).N_FFT, + hop_length=type(self).HOP_LENGTH, ) - self.target_sample_rate = target_sample_rate def load_audio( self, From 3a16d6ee15f56222d9aef0ea0f93dbd9a4a7913b Mon Sep 17 00:00:00 2001 From: Niclas Doll Date: Mon, 26 Feb 2024 15:21:29 +0100 Subject: [PATCH 16/16] fix: fixed issue that discards all-zero bytes at the end of a read operation --- src/modalities/dataloader/dataset.py | 4 +--- tests/dataloader/test_packed_dataset.py | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/modalities/dataloader/dataset.py b/src/modalities/dataloader/dataset.py index 55ac80c1..1633df81 100644 --- a/src/modalities/dataloader/dataset.py +++ b/src/modalities/dataloader/dataset.py @@ -91,9 +91,7 @@ def _read_bytes(self, offset: int, size: int) -> bytes: mode="r", offset=offset, shape=(size,), - ).view( - f"S{size}" - )[0] + ).tobytes() @property def num_elements_per_item(self) -> int: diff --git a/tests/dataloader/test_packed_dataset.py b/tests/dataloader/test_packed_dataset.py index 998bcd96..22cc9494 100644 --- a/tests/dataloader/test_packed_dataset.py +++ b/tests/dataloader/test_packed_dataset.py @@ -33,7 +33,7 @@ def test_packed_dataset_loading(dummy_packed_data_path): @pytest.mark.parametrize( "block_size, expected_length, expected_output", [ - # (1, 20, [[i] for i in range(20)]), # TODO + (1, 20, [[i] for i in range(20)]), (2, 10, [[2 * i, 2 * i + 1] for i in range(10)]), (3, 6, [[3 * i, 3 * i + 1, 3 * i + 2] for i in range(6)]), (10, 2, [list(range(10)), list(range(10, 20))]),