diff --git a/neurons/validator.py b/neurons/validator.py
index d0f59b6..439a525 100644
--- a/neurons/validator.py
+++ b/neurons/validator.py
@@ -483,7 +483,7 @@ async def run(self):
                     eval_start = tplr.T()
                     self.model.zero_grad()
                     total_loss = 0.0
-                    loss_after = 0.0
+                    step_loss_after = 0.0
                     full_steps = 0
                     total_steps = 0
                     exhausted_window = False
@@ -514,7 +514,8 @@
                         # Perform forward pass with updated model (no gradients needed)
                         with torch.no_grad(), torch.amp.autocast(device_type=self.model.device.type, dtype=torch.bfloat16):
-                            outputs2 = self.model(input_ids=input_ids, labels=labels)
+                            outputs_after = self.model(input_ids=input_ids, labels=labels)
+                            step_loss_after += outputs_after.loss.item()

                         # Restore original parameters
                         for name_i, param_i in self.model.named_parameters():
@@ -522,36 +523,30 @@

                         # Perform forward pass and compute loss with gradients
                         with torch.enable_grad(), torch.amp.autocast(device_type=self.model.device.type, dtype=torch.bfloat16):
-                            outputs = self.model(input_ids=input_ids, labels=labels)
-                            loss = outputs.loss
+                            outputs_before = self.model(input_ids=input_ids, labels=labels)
+                            loss = outputs_before.loss
                             loss.backward()
-
-                        total_loss += outputs.loss.item()
-                        loss_after += outputs2.loss.item()
+                        total_loss += loss.item()

                         if self.current_window - offset != window:
                             exhausted_window = True
                             continue
+
                     self.optimizer.step()
                     self.scheduler.step()

-                    step_loss = total_loss/(full_steps+1)
-                    step_loss_after = loss_after/(full_steps+1)
-
-                    if loss_after <= step_loss:
-                        # Reward for loss reduction
-                        loss_score = 1 - (step_loss_after / step_loss)
-                    else:
-                        # Penalize for loss increase, capped at -1
-                        loss_score = -min(1, (step_loss_after - step_loss) / step_loss)
+                    step_loss = total_loss / (full_steps + 1)
+                    step_loss_after = step_loss_after / (full_steps + 1)

                     eval_duration = tplr.T() - eval_start
                     tokens_per_step = self.hparams.sequence_length * self.config.actual_batch_size * (full_steps + 1)
-
                     tokens_per_second = tokens_per_step / eval_duration
+
                     tplr.logger.info(f"{tplr.P(window, eval_duration)}: Accumulated gradients:")
                     tplr.logger.info(f"{tplr.P(window, eval_duration)}: \tTotal steps: [tan]{full_steps}/{total_steps}[/tan], Rate: [tan]{(full_steps/total_steps):.2f}[/tan], Target: [tan]{self.sample_rate:.2f}[/tan]")
                     tplr.logger.info(f"{tplr.P(window, eval_duration)}: \tTotal tokens: [tan]{tokens_per_step}[/tan], Tokens per second: [tan]{tokens_per_second:.2f}[/tan]")
-                    tplr.logger.info(f"{tplr.P(window, eval_duration)}: \tLoss: [tan]{step_loss}[tan]")
+                    tplr.logger.info(f"{tplr.P(window, eval_duration)}: \tLoss before applying delta: [tan]{step_loss:.4f}[/tan]")
+                    tplr.logger.info(f"{tplr.P(window, eval_duration)}: \tLoss after applying delta: [tan]{step_loss_after:.4f}[/tan]")
+
                     if exhausted_window:
                         self.sample_rate = max(0.0001, self.sample_rate * 0.95)
                     else:
@@ -559,51 +554,41 @@

                     # Compute the score for this slice.
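For reference, the evaluation loop above reduces to the following per-window bookkeeping. This is a minimal sketch rather than the validator itself: `model`, `batches`, `apply_delta`, and `restore` are illustrative stand-ins for the real slice-application and parameter-restore logic in validator.py, and the model is assumed to return a HuggingFace-style output with a `.loss` field.

import torch

def evaluate_delta(model, batches, apply_delta, restore):
    """Sketch: mean loss with and without a miner's delta applied."""
    total_loss, step_loss_after, steps = 0.0, 0.0, 0
    for input_ids, labels in batches:
        apply_delta(model)  # temporarily add the miner's slice to the parameters
        with torch.no_grad():
            step_loss_after += model(input_ids=input_ids, labels=labels).loss.item()
        restore(model)  # back to the pre-delta parameters
        with torch.enable_grad():
            loss = model(input_ids=input_ids, labels=labels).loss
            loss.backward()  # gradients are reused later for the cosine-similarity check
        total_loss += loss.item()
        steps += 1
    # (step_loss, step_loss_after) in the diff's terms
    return total_loss / steps, step_loss_after / steps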
st = tplr.T() - score = 0.0 - - # Check if we have any gradients - has_grads = any(param.grad is not None for name, param in self.model.named_parameters()) - - if not has_grads: - tplr.logger.warning("No gradients found - setting score to 0.0") - score = 0.0 - else: - # Collect all delta_i and grad_i into larger vectors - all_delta = [] - all_grad = [] - for i, (name_i, param_i) in enumerate(self.model.named_parameters()): - if param_i.grad is None: - continue - - if name_i not in indices or name_i not in eval_slice_data: - continue + # Compute cosine similarity between miner's delta and validator's gradients + cosine_similarity = torch.nn.functional.cosine_similarity(all_delta, all_grad, dim=0).item() - idxs_i = indices[name_i].to(self.model.device) - grad_i = param_i.grad.view(-1).clone()[idxs_i].to(self.model.device) - slice_i = eval_slice_data[name_i].view(-1).to(self.model.device) - theta_i = param_i.data.view(-1)[idxs_i] - delta_i = theta_i - slice_i + # Set initial score to 0.0 + score = 0.0 - all_delta.append(delta_i) - all_grad.append(grad_i) + # Check if cosine similarity is greater than zero + if cosine_similarity > 0.0: + # Base score from cosine similarity + base_score = 0.1 - if len(all_delta) > 0: - #Concatenate all parts - all_delta = torch.cat(all_delta) - all_grad = torch.cat(all_grad) + # Compute the loss difference (percentage) + loss_difference = step_loss_after - step_loss # Positive if miner's loss is worse + percentage_loss_difference = loss_difference / step_loss # Fractional change - # Compute global cosine similarity - score = torch.nn.functional.cosine_similarity(all_delta, all_grad, dim=0).item() + if percentage_loss_difference < 0: # Miner improved the loss + # Miner improved the loss, add to base score + score = base_score + (-percentage_loss_difference) # Negative because loss decreased + elif percentage_loss_difference <= 0.25: + # Loss did not improve but is not worse by more than 25% + score = base_score # Only base score else: - tplr.logger.warning("No valid parameter tensors found - setting score to 0.0") + # Loss is worse by more than 25%, zero out their moving average score + self.scores[eval_uid] = 0.0 score = 0.0 + else: + tplr.logger.info(f"Cosine similarity ({cosine_similarity:.4f}) not positive. Setting score to 0.0") + score = 0.0 - tplr.logger.info(f"{tplr.P(window, tplr.T() - st)}: Computed score: [bold dark_sea_green]{score:.4f}[/bold dark_sea_green]") - self.optimizer.zero_grad() - + tplr.logger.info(f"{tplr.P(window, tplr.T() - st)}: Computed score for miner {eval_uid}: [bold dark_sea_green]{score:.4f}[/bold dark_sea_green]") + self.optimizer.zero_grad() # Assign and log scores. 
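The scoring rule introduced above can be read as a pure function of the cosine similarity between the miner's delta and the validator's gradient, plus the before/after losses. Below is a sketch under the same constants as the diff (0.1 base score, 25% tolerated regression); the boolean return value stands in for the "zero out the moving average" case the diff handles via `self.scores[eval_uid] = 0.0`.

import torch

def compute_score(all_delta, all_grad, step_loss, step_loss_after):
    """Sketch of the cosine-gated score; returns (score, reset_moving_average)."""
    cos = torch.nn.functional.cosine_similarity(all_delta, all_grad, dim=0).item()
    if cos <= 0.0:
        return 0.0, False  # delta points away from the validator's gradient
    pct = (step_loss_after - step_loss) / step_loss  # fractional loss change
    if pct < 0:
        return 0.1 + (-pct), False  # base score plus the relative improvement
    if pct <= 0.25:
        return 0.1, False  # tolerated regression keeps only the base score
    return 0.0, True  # more than 25% worse: score 0 and reset the moving average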
+ # Apply decay to miners who did not submit slices all_uids = set(self.metagraph.uids.tolist()) non_submitted_uids = all_uids - submitted_uids @@ -613,46 +598,39 @@ async def run(self): self.scores[uid] *= decay_factor # Update the score for the evaluated miner - self.step_scores[eval_uid] = score + loss_score - self.step_loss_scores[eval_uid] = loss_score + self.step_scores[eval_uid] = score self.scores[eval_uid] = ( - (1 - self.hparams.validator_moving_alpha) * self.step_scores[eval_uid] + + (1 - self.hparams.validator_moving_alpha) * self.step_scores[eval_uid] + self.hparams.validator_moving_alpha * self.scores[eval_uid] ) - # Only consider positive scores for weights - positive_scores_indices = self.scores > 0 - positive_scores = self.scores[positive_scores_indices] + # Prepare scores for softmax + scores_tensor = self.scores.clone() - total_positive_score = positive_scores.sum().item() + # Set scores <= 0 to a very negative value for softmax stability + scores_tensor[scores_tensor <= 0] = -float('inf') - if total_positive_score == 0.0: - tplr.logger.warning("Total positive score is zero; setting all weights to zero.") - self.weights = torch.zeros_like(self.scores) - else: - # Normalize positive scores to get weights - self.weights = torch.zeros_like(self.scores) - self.weights[positive_scores_indices] = positive_scores / total_positive_score + # Compute softmax over scores + self.weights = torch.nn.functional.softmax(scores_tensor, dim=0) # Log updated scores and weights - valid_score_indices = torch.nonzero(self.scores != 0).squeeze().view(-1) + valid_score_indices = torch.nonzero(self.scores > 0).squeeze().view(-1) for uid_i in valid_score_indices: uid = uid_i.item() moving_score = self.scores[uid].item() weight = self.weights[uid].item() step_score = self.step_scores[uid].item() - loss_score = self.step_loss_scores[uid].item() tplr.logger.info( f"\tuid: [dark_sea_green]{uid}[/dark_sea_green], " f"step_score: [dark_sea_green]{step_score:.3f}[/dark_sea_green], " f"moving_score: [dark_sea_green]{moving_score:.3f}[/dark_sea_green], " - f"weight: [dark_sea_green]{weight:.3f}[/dark_sea_green], " - f"loss_score: [dark_sea_green]{loss_score:.3f}[/dark_sea_green]" + f"weight: [dark_sea_green]{weight:.3f}[/dark_sea_green]" ) + # Apply all deltas to the model state. 
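Weights are now a softmax over the moving scores rather than a plain normalization of positive scores; masking non-positive entries with -inf pins their weight to exactly zero. A minimal sketch follows (the `nan_to_num` guard covers the all-non-positive edge case and is an addition of this sketch, not part of the diff):

import torch

def scores_to_weights(scores: torch.Tensor) -> torch.Tensor:
    """Softmax over positive moving scores; non-positive scores get zero weight."""
    masked = scores.clone()
    masked[masked <= 0] = float("-inf")
    weights = torch.nn.functional.softmax(masked, dim=0)
    return torch.nan_to_num(weights)  # not in the diff: an all--inf input would yield NaNs

# Example: scores_to_weights(torch.tensor([0.0, 0.2, -0.1, 0.4]))
# -> tensor([0.0000, 0.4502, 0.0000, 0.5498])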
st = tplr.T() - max_global_step, window_metric = await tplr.apply_slices_to_model( - model=self.model, + max_global_step, window_metric = await tplr.apply_slices_to_model( + model=self.model, window=window, seed=window, compression=self.hparams.compression, @@ -667,8 +645,8 @@ async def run(self): st = tplr.T() await tplr.delete_files_before_window(window_max=window - self.hparams.max_history, save_location=self.save_location, key='state') await tplr.delete_files_before_window(window_max=window - self.hparams.max_history, save_location=self.save_location, key='delta') - await tplr.delete_files_from_bucket_before_window( bucket = tplr.config.BUCKET_SECRETS["bucket_name"], window_max = window - self.hparams.max_history, key = 'state' ) - await tplr.delete_files_from_bucket_before_window( bucket = tplr.config.BUCKET_SECRETS["bucket_name"], window_max = window - self.hparams.max_history, key = 'delta' ) + await tplr.delete_files_from_bucket_before_window(bucket=tplr.config.BUCKET_SECRETS["bucket_name"], window_max=window - self.hparams.max_history, key='state') + await tplr.delete_files_from_bucket_before_window(bucket=tplr.config.BUCKET_SECRETS["bucket_name"], window_max=window - self.hparams.max_history, key='delta') tplr.logger.info(f"{tplr.P(window, tplr.T() - st)}: Cleaned file history.") # Finish step. @@ -678,14 +656,15 @@ async def run(self): window_time_delta = self.window_time - gs_end window_delta_str = f"[red]{window_time_delta:.2f}[/red]" if window_time_delta < 0 else f"[green]+{window_time_delta:.2f}[/green]" tplr.logger.info(f"{tplr.P(window, gs_end - gs_start)}[{window_delta_str}]: Finished step.") + # Log main metrics wandb.log({ "validator/loss": step_loss, - "validator/tokens_per_step": sum([slice_metric['tokens_per_step'] for _, slice_metric in window_metric.items()]), - "validator/tokens_per_second": sum([slice_metric['tokens_per_second'] for _, slice_metric in window_metric.items()]), + "validator/tokens_per_step": sum(slice_metric['tokens_per_step'] for _, slice_metric in window_metric.items()), + "validator/tokens_per_second": sum(slice_metric['tokens_per_second'] for _, slice_metric in window_metric.items()), "validator/sample_rate": self.sample_rate, "validator/utilization": eval_duration / (gs_end - gs_start), - "validator/global_batch_size": sum([slice_metric['batch_size'] for _, slice_metric in window_metric.items()]), + "validator/global_batch_size": sum(slice_metric['batch_size'] for _, slice_metric in window_metric.items()), }, step=self.global_step) for hotkey, slice_metric in window_metric.items(): diff --git a/scripts/dataset_backup.py b/scripts/dataset_backup.py new file mode 100644 index 0000000..8f02f20 --- /dev/null +++ b/scripts/dataset_backup.py @@ -0,0 +1,511 @@ +# The MIT License (MIT) +# © 2024 templar.tech + +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated +# documentation files (the “Software”), to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, +# and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in all copies or substantial portions of +# the Software. 
+ +# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO +# THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +# DEALINGS IN THE SOFTWARE. + +# Global imports +import asyncio +import aiohttp +import numpy as np +import random +import typing +from torch.utils.data import IterableDataset +from transformers import AutoTokenizer + + +class SubsetLoader(IterableDataset): + """ + Base class for data-specific subset loader classes. + + # TODO: Make this class abstract + """ + + def __init__( + self, + batch_size=None, + sequence_length=None, + num_pages=None, + tokenizer: AutoTokenizer = None, + pack_samples: bool = False, + ): + self.batch_size = batch_size + self.sequence_length = sequence_length + self.num_pages = num_pages + self.tokenizer = tokenizer + self.pack_samples = pack_samples + + self.num_rows_per_page = 100 + + # Buffer to hold pages loaded from the api + self.buffer = [] + + # Buffer to hold pages already loaded into a batch + self.used_buffer = [] + + # Buffer to hold padded pages + self.padded_buffer = [] + + self.lock = asyncio.Lock() # For thread-safe operations + + async def fetch_data_for_pages(self, pages): + """ + Set the pages to be used to fill the buffer. Then fetch the page data + to the buffer. + """ + + self.pages = pages + + # Empty the buffer if it is not. + self.buffer = [] + + async with aiohttp.ClientSession() as session: + tasks = [self._fetch_data_for_page(page, session) for page in self.pages] + await asyncio.gather(*tasks) + + async def _fetch_data_for_page(self, page, session): + retry_limit = 10 + attempt = 0 + while attempt < retry_limit: + config_name, page_number, split = page + + # Create the request parameters + params = dict( + dataset=self.name, + config=config_name, + split=split, + offset=page_number, + limit=self.num_rows_per_page, + ) + + try: + async with session.get(self.rows_base_url, params=params) as response: + response.raise_for_status() + data = await response.json() + + # Prepare the data to append + buffer_to_append = [] + for row in data["rows"]: + content = row["row"]["text"] + input_ids = self.tokenizer(content, truncation=True)[ + "input_ids" + ] + buffer_to_append.extend(input_ids) + buffer_to_append.append(self.tokenizer.eos_token_id) + + async with self.lock: + self.buffer.extend(buffer_to_append) + self.pages.append((config_name, page_number, split)) + break # Success, exit retry loop + + except aiohttp.ClientResponseError: + attempt += 1 + if attempt < retry_limit: + await asyncio.sleep(5) + else: + raise + + def _get_pad_size(self, input_ids): + """ + Get the number of tokens to be padded to the sample to match + the max allowed sequence length. + If sample packing is activated, then return 1 + """ + + if self.pack_samples: + return 1 + + sample_size = len(input_ids) + + remainder = sample_size % self.sequence_length + pad_size = self.sequence_length - remainder + + # Apply modulo again to guarantee a pad size of 0 if remainder is 0 + pad_size = pad_size % self.sequence_length + + return pad_size + + def _refill_padded_buffer(self): + """ + This methods pulls one page from `self.buffer`, pads it and pushs + it to the `self.padded_buffer`. 
+ """ + + while self.buffer and len(self.padded_buffer) < self.sequence_length: + input_ids = [] + + # search for EOS token index and cut the buffer at it. + EOS_index = self.buffer.index(self.tokenizer.eos_token_id) + input_ids = self.buffer[: EOS_index + 1] + self.buffer = self.buffer[EOS_index + 1 :] + + self.used_buffer += input_ids + + # Add to padded buffer without the EOS token. + self.padded_buffer += input_ids[:-1] + + # Pad + self.padded_buffer += [self.tokenizer.eos_token_id] * self._get_pad_size( + input_ids=input_ids[:-1] + ) + + def __iter__(self): + self.buffer = self.used_buffer + self.buffer + self.padded_buffer = [] + + # Pad and prepare one page for batching + self._refill_padded_buffer() + + return self + + def __next__(self): + batch = [] + + while len(self.padded_buffer) >= self.sequence_length: + batch.append(self.padded_buffer[: self.sequence_length]) + self.padded_buffer = self.padded_buffer[self.sequence_length :] + self._refill_padded_buffer() + + if len(batch) == self.batch_size: + return np.stack(batch) + + raise StopIteration + + +class DatasetLoader(SubsetLoader): + name: str = "HuggingFaceFW/fineweb-edu-score-2" + rows_base_url: str = "https://datasets-server.huggingface.co/rows" + size_base_url: str = "https://datasets-server.huggingface.co/size" + + retry_limit: int = 10 # Number of retries + retry_delay: int = 5 # Seconds to wait between retries + num_rows_per_page: int = 100 + + @staticmethod + async def next_pages( + offset: int, n_pages: int, seed: str, num_rows_per_page: int = 100 + ): + configs_data = await DatasetLoader.fetch_dataset_configs() + rng = np.random.default_rng( + hash(seed) & 0xFFFFFFFF + ) # Create a generator with a seed + rng.bit_generator.advance(offset) # Efficiently skip ahead `n` steps + result = [] + for _ in range(n_pages): + config = rng.choice(list(configs_data.keys())) + choice = rng.integers( + 0, configs_data[config]["num_rows"] - 1 - num_rows_per_page + ) + result.append((str(config), int(choice), configs_data[config]["split"])) + return result + + def __init__( + self, + batch_size=None, + sequence_length=None, + num_pages=None, + pages_info=None, + tokenizer: AutoTokenizer = None, + pack_samples: bool = False, + ): + super().__init__( + batch_size, sequence_length, num_pages, tokenizer, pack_samples + ) + + # Initialize properties + self.configs_data = None + self.pages = [] + self.buffer = [] + self.lock = asyncio.Lock() # For thread-safe operations + + @classmethod + async def create( + cls, + batch_size=None, + sequence_length=None, + num_pages=None, + pages_info=None, + tokenizer: AutoTokenizer = None, + pack_samples: bool = False, + ): + self = cls( + batch_size=batch_size, + sequence_length=sequence_length, + num_pages=num_pages, + tokenizer=tokenizer, + pack_samples=pack_samples, + ) + + # Fetch dataset configs asynchronously + self.configs_data = await cls.fetch_dataset_configs() + + if pages_info is not None: + await self._fetch(pages_info) + elif self.num_pages: + await self._fetch_data_to_buffer(self.num_pages) + + return self + + async def _fetch(self, page_info: typing.Tuple[str, int, str]): + self.pages = list(page_info) + async with aiohttp.ClientSession() as session: + tasks = [ + self._fetch_data_for_page((config_name, page, split), session) + for (config_name, page, split) in self.pages + ] + await asyncio.gather(*tasks) + + async def _fetch_data_to_buffer(self, num_pages): + """ + Randomly sample pages and add their data to the buffer. + If a page is inaccessible, another one is sampled. 
+ This method sets the `pages` property. + """ + self.pages = [] + pages_to_fetch = self.get_random_pages(num_pages) + + async with aiohttp.ClientSession() as session: + tasks = [ + self._fetch_data_for_page(page, session) for page in pages_to_fetch + ] + await asyncio.gather(*tasks) + + async def fetch_data_to_rows(self, num_pages): + rows = [] + pages_to_fetch = self.get_random_pages(num_pages) + + async with aiohttp.ClientSession() as session: + tasks = [ + self._fetch_rows_for_page(page, session) for page in pages_to_fetch + ] + results = await asyncio.gather(*tasks) + for page_rows in results: + rows.extend(page_rows) + + return rows + + async def _fetch_data_for_page(self, page, session): + """ + Fetches data asynchronously for a single page, processes it without blocking the event loop, + and appends the tokenized data to the buffer. + + Args: + page: A tuple containing the config name, page number, and split. + session: The HTTP session used for making requests. + + Raises: + Exception: If the maximum number of retry attempts is exceeded. + """ + retry_limit = self.retry_limit + attempt = 0 + while attempt < retry_limit: + config_name, page_number, split = page + + # Create the request parameters + params = { + "dataset": self.name, + "config": config_name, + "split": split, + "offset": page_number, + "limit": self.num_rows_per_page, + } + + try: + # Make an asynchronous HTTP GET request to fetch the data + async with session.get(self.rows_base_url, params=params) as response: + response.raise_for_status() # Raise an exception for HTTP errors + data = await response.json() + + # Prepare the data to append + buffer_to_append = [] + + # Asynchronously process each row without blocking the event loop + tasks = [ + self._tokenize_content(row["row"]["text"]) + for row in data["rows"] + ] + + # Gather the tokenized results concurrently + row_input_ids = await asyncio.gather(*tasks) + + # Flatten the list of input IDs and append them to the buffer + for input_ids in row_input_ids: + buffer_to_append.extend(input_ids) + + # Safely append the processed data to the shared buffer + async with self.lock: + self.buffer.extend(buffer_to_append) + self.pages.append((config_name, page_number, split)) + break # Success, exit retry loop + + except aiohttp.ClientResponseError as e: + # Handle HTTP client errors with a retry mechanism + attempt += 1 + if attempt < retry_limit: + await asyncio.sleep(self.retry_delay) # Wait before retrying + else: + raise Exception( + f"Maximum retry attempts exceeded for page {page}" + ) from e + + async def _tokenize_content(self, content): + """ + Asynchronously tokenizes a string of content using the tokenizer in a separate thread. + + Args: + content: The text content to be tokenized. + + Returns: + The list of token IDs for the content, including the EOS token. 
+ """ + # Offload the CPU-bound tokenization to a thread executor to prevent blocking the event loop + input_ids = await asyncio.to_thread( + self.tokenizer.encode, + content, + truncation=True, + max_length=self.sequence_length, + ) + input_ids.append(self.tokenizer.eos_token_id) + return input_ids + + async def _fetch_rows_for_page(self, page, session): + retry_limit = self.retry_limit + attempt = 0 + while attempt < retry_limit: + config_name, page_number, split = page + + # Create the request parameters + params = dict( + dataset=self.name, + config=config_name, + split=split, + offset=page_number, + limit=self.num_rows_per_page, + ) + + try: + async with session.get(self.rows_base_url, params=params) as response: + response.raise_for_status() + data = await response.json() + + # Collect the rows + return [row["row"]["text"] for row in data["rows"]] + + except aiohttp.ClientResponseError: + attempt += 1 + if attempt < retry_limit: + await asyncio.sleep(self.retry_delay) + else: + raise + + def get_random_pages(self, num_pages): + """ + Randomly sample pages. + A page is a row number of a given split of a given dataset dump. + """ + pages = [] + + for _ in range(num_pages): + # Choose a random config + config_name = random.choice(list(self.configs_data.keys())) + + # Choose a random page (row) + page = random.randint( + 0, + self.configs_data[config_name]["num_rows"] - 1 - self.num_rows_per_page, + ) + + split = self.configs_data[config_name]["split"] + + pages.append((config_name, page, split)) + + return pages + + def get_page_names(self): + """ + This is a utility function that returns the page names that were used. + Each page as a single string instead of a tuple. + """ + page_names = [] + + if hasattr(self, "pages"): + page_names = [ + f"{cfg_name}_{num_rows}_{split}" + for cfg_name, num_rows, split in self.pages + ] + + return page_names + + @staticmethod + async def fetch_dataset_configs() -> typing.Dict[str, typing.Dict]: + """ + Fetch the different dump names, aka configs, aka samples, of the + dataset. + The returned value is a dictionary with dump names as keys and + a dict of the number of rows and the split as values. 
+ """ + # Request parameters + params = dict(dataset=DatasetLoader.name) + + attempt = 0 + while attempt < DatasetLoader.retry_limit: + try: + async with aiohttp.ClientSession() as session: + async with session.get( + DatasetLoader.size_base_url, params=params + ) as response: + response.raise_for_status() + + data = await response.json() + + # Extract the configs dict + configs_dict = data["size"]["splits"] + + # Now create a dict with config names (except 'default') as + # keys, and the number of rows as values + configs_data = { + entry["config"]: { + "num_rows": entry["num_rows"], + "split": entry["split"], + } + for entry in configs_dict + if entry["config"] != "default" + } + + return configs_data + + except aiohttp.ClientResponseError: + attempt += 1 + if attempt < DatasetLoader.retry_limit: + await asyncio.sleep(DatasetLoader.retry_delay) + else: + raise + + @staticmethod + async def next_pages_async( + offset: int, n_pages: int, seed: str, num_rows_per_page: int = 100 + ): + configs_data = await DatasetLoader.fetch_dataset_configs() + rng = np.random.default_rng( + hash(seed) & 0xFFFFFFFF + ) # Create a generator with a seed + rng.bit_generator.advance(offset) # Efficiently skip ahead `n` steps + result = [] + for _ in range(n_pages): + config = rng.choice(list(configs_data.keys())) + choice = rng.integers( + 0, configs_data[config]["num_rows"] - 1 - num_rows_per_page + ) + result.append((str(config), int(choice), configs_data[config]["split"])) + return result diff --git a/src/templar/__init__.py b/src/templar/__init__.py index 8cb2445..ee18473 100644 --- a/src/templar/__init__.py +++ b/src/templar/__init__.py @@ -20,8 +20,8 @@ # mypy: ignore-errors # type: ignore -__version__ = "0.1.29" -version_key = 3000 +__version__ = "0.1.30" +version_key = 4000 # Import package. 
 from .autoupdate import *
diff --git a/src/templar/dataset.py b/src/templar/dataset.py
index 8f02f20..85b25d8 100644
--- a/src/templar/dataset.py
+++ b/src/templar/dataset.py
@@ -184,7 +184,7 @@ def __next__(self):


 class DatasetLoader(SubsetLoader):
-    name: str = "HuggingFaceFW/fineweb-edu-score-2"
+    name: str = "HuggingFaceFW/fineweb-2"
     rows_base_url: str = "https://datasets-server.huggingface.co/rows"
     size_base_url: str = "https://datasets-server.huggingface.co/size"

@@ -197,16 +197,21 @@ async def next_pages(
         offset: int, n_pages: int, seed: str, num_rows_per_page: int = 100
     ):
         configs_data = await DatasetLoader.fetch_dataset_configs()
-        rng = np.random.default_rng(
-            hash(seed) & 0xFFFFFFFF
-        )  # Create a generator with a seed
-        rng.bit_generator.advance(offset)  # Efficiently skip ahead `n` steps
+        available_configs = [
+            config
+            for config in configs_data
+            if configs_data[config]["num_rows"] - 1 - num_rows_per_page > 0
+        ]
+
+        rng = np.random.default_rng(hash(seed) & 0xFFFFFFFF)
+        rng.bit_generator.advance(offset)
         result = []
         for _ in range(n_pages):
-            config = rng.choice(list(configs_data.keys()))
-            choice = rng.integers(
-                0, configs_data[config]["num_rows"] - 1 - num_rows_per_page
-            )
+            if not available_configs:
+                raise ValueError("No available configs with sufficient num_rows.")
+            config = rng.choice(available_configs)
+            max_row = configs_data[config]["num_rows"] - 1 - num_rows_per_page
+            choice = rng.integers(0, max_row)
             result.append((str(config), int(choice), configs_data[config]["split"]))
         return result
@@ -455,7 +460,6 @@ async def fetch_dataset_configs() -> typing.Dict[str, typing.Dict]:
         The returned value is a dictionary with dump names as keys and
         a dict of the number of rows and the split as values.
         """
-        # Request parameters
         params = dict(dataset=DatasetLoader.name)

         attempt = 0
@@ -463,25 +467,35 @@ async def fetch_dataset_configs() -> typing.Dict[str, typing.Dict]:
             try:
                 async with aiohttp.ClientSession() as session:
                     async with session.get(
-                        DatasetLoader.size_base_url, params=params
+                        f"{DatasetLoader.size_base_url}", params=params
                     ) as response:
                         response.raise_for_status()
-                        data = await response.json()
-                        # Extract the configs dict
-                        configs_dict = data["size"]["splits"]
-
-                        # Now create a dict with config names (except 'default') as
-                        # keys, and the number of rows as values
-                        configs_data = {
-                            entry["config"]: {
-                                "num_rows": entry["num_rows"],
-                                "split": entry["split"],
+                        data = await response.json()
+                        # Adjust parsing based on the new data structure
+                        configs_data = {}
+                        if "size" in data and "splits" in data["size"]:
+                            configs_dict = data["size"]["splits"]
+                            configs_data = {
+                                entry["config"]: {
+                                    "num_rows": entry["num_rows"],
+                                    "split": entry["split"],
+                                }
+                                for entry in configs_dict
+                                if entry["config"] != "default"
+                            }
+                        elif "splits" in data:
+                            # New data structure
+                            configs_dict = data["splits"]
+                            configs_data = {
+                                entry["config"]: {
+                                    "num_rows": entry.get("numExamples", 0),
+                                    "split": entry["split"],
+                                }
+                                for entry in configs_dict
                             }
-                            for entry in configs_dict
-                            if entry["config"] != "default"
-                        }
+                        else:
+                            raise ValueError("Unexpected data format from size API.")

                         return configs_data
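The fetch_dataset_configs change above accepts either response shape from the datasets-server size endpoint. Here is a standalone sketch of just the parsing step, using only field names that appear in the diff (anything else is illustrative); note that the second branch intentionally does not filter out the 'default' config, matching the new code.

import typing

def parse_size_response(data: dict) -> typing.Dict[str, typing.Dict]:
    """Normalize either size-API response shape into {config: {num_rows, split}}."""
    if "size" in data and "splits" in data["size"]:
        # Original structure: data["size"]["splits"] with explicit num_rows
        return {
            entry["config"]: {"num_rows": entry["num_rows"], "split": entry["split"]}
            for entry in data["size"]["splits"]
            if entry["config"] != "default"
        }
    if "splits" in data:
        # New structure: top-level "splits" with numExamples
        return {
            entry["config"]: {"num_rows": entry.get("numExamples", 0), "split": entry["split"]}
            for entry in data["splits"]
        }
    raise ValueError("Unexpected data format from size API.")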