diff --git a/src/pie_datasets/document/processing/__init__.py b/src/pie_datasets/document/processing/__init__.py
new file mode 100644
index 00000000..6bf9b0c1
--- /dev/null
+++ b/src/pie_datasets/document/processing/__init__.py
@@ -0,0 +1 @@
+from .regex_partitioner import RegexPartitioner
diff --git a/src/pie_datasets/document/processing/regex_partitioner.py b/src/pie_datasets/document/processing/regex_partitioner.py
new file mode 100644
index 00000000..49df118e
--- /dev/null
+++ b/src/pie_datasets/document/processing/regex_partitioner.py
@@ -0,0 +1,272 @@
+from __future__ import annotations
+
+import json
+import logging
+import re
+import statistics
+from typing import Any, Callable, Iterable, Iterator, Match, TypeVar
+
+from pytorch_ie import Dataset, IterableDataset
+from pytorch_ie.annotations import LabeledSpan
+from pytorch_ie.data.common import EnterDatasetMixin, ExitDatasetMixin
+from pytorch_ie.documents import TextBasedDocument
+
+logger = logging.getLogger(__name__)
+
+
+D = TypeVar("D", bound=TextBasedDocument)
+
+
+def create_regex_matcher(pattern: str) -> Callable[[str], Iterator[Match]]:
+    """Compile the pattern and return the finditer method of the compiled pattern.
+
+    :param pattern: A regular expression.
+    :return: A callable that yields the Match objects of the pattern for a given text.
+    """
+    return re.compile(pattern).finditer
+
+
+def strip_span(start: int, end: int, text: str) -> tuple[int, int]:
+    """This method strips the leading and trailing whitespaces from the span.
+
+    :param start: An integer value that represents the start index of the span.
+    :param end: An integer value that represents the end index of the span.
+    :param text: A string value that represents the text from which the span is extracted.
+    :return: The start and end index of the stripped span. If only whitespace would be left, an
+        empty span at the original start index is returned.
+    """
+    span_text = text[start:end]
+    new_start = start + len(span_text) - len(span_text.lstrip())
+    new_end = end - len(span_text) + len(span_text.rstrip())
+    # if the span is empty, then create a span of length 0 at the start index
+    if new_start >= new_end:
+        new_start = start
+        new_end = start
+    return new_start, new_end
+
+
+def _create_partition(
+    text: str, start: int, end: int, label: str, strip_whitespace: bool, verbose: bool
+) -> LabeledSpan | None:
+    """Create the partition for text[start:end] or return None if that span is empty.
+
+    :param text: The text that gets partitioned.
+    :param start: The start index of the partition.
+    :param end: The end index (exclusive) of the partition.
+    :param label: The label of the partition.
+    :param strip_whitespace: If True, leading and trailing whitespace is excluded from the span.
+    :param verbose: If True, a warning is logged when the partition is empty.
+    :return: The partition or None if it is empty.
+    """
+    span_start, span_end = start, end
+    if strip_whitespace:
+        span_start, span_end = strip_span(start=start, end=end, text=text)
+    if span_end - span_start == 0:
+        if verbose:
+            logger.warning(
+                f"Found empty partition in text at [{start}:{end}] "
+                f"with potential label: '{label}'. It will be skipped."
+            )
+        return None
+    return LabeledSpan(start=span_start, end=span_end, label=label)
+
+
+def _get_partitions_with_matcher(
+    text: str,
+    matcher_or_pattern: Callable[[str], Iterable[Match]] | str,
+    label_group_id: int | None = None,  # = 1,
+    label_whitelist: list[str] | None = None,
+    skip_initial_partition: bool = False,  # = True
+    default_partition_label: str = "partition",
+    initial_partition_label: str | None = None,
+    strip_whitespace: bool = False,
+    verbose: bool = True,
+) -> Iterator[LabeledSpan]:
+    """This method yields LabeledSpans as partitions of the given text. matcher is used to search
+    for a pattern in the text. If the pattern is found, it returns a Match object that contains
+    matched groups. A partition is then created using a span in the matched groups. The span of a
+    partition starts from the first match (inclusive) and ends at the next match (exclusive) or at
+    the end of the text. A partition is labeled either using the default_partition_label or using
+    the list of labels available in label_whitelist. It should be noted that none of the partitions
+    overlap.
+
+    :param text: A text that is to be partitioned
+    :param matcher_or_pattern: A method or a string. In the former case, that method is used to
+        find a pattern in the text and return an iterator yielding the Match objects, e.g.
+        re.compile(PATTERN).finditer. In the latter, the string is used as a pattern to find the
+        matches in the text.
+    :param label_group_id: An integer value (default:None) to select the desired match group from
+        the Match object. This match group is then used to create a label for the partition.
+    :param label_whitelist: An optional list of labels (default:None) which are allowed to form a
+        partition. It is applied to the labels created using label_group_id as well as to
+        default_partition_label and initial_partition_label. If label_whitelist is None, then all
+        labels will form a partition.
+    :param skip_initial_partition: A boolean value (default:False) that prevents the initial
+        partition to be saved.
+    :param default_partition_label: A string value (default:partition) to be used as the default
+        label for the parts if no label_group_id for the match object is provided.
+    :param initial_partition_label: A string value (default:None) to be used as a label for the
+        initial partition. This is only used when skip_initial_partition is False. If it is None
+        then default_partition_label is used as initial_partition_label.
+    :param strip_whitespace: A boolean value (default:False) that excludes leading and trailing
+        whitespace from each partition.
+    :param verbose: A boolean value (default:True) that enables the warning about empty
+        partitions. Empty partitions are skipped in any case.
+    """
+    if isinstance(matcher_or_pattern, str):
+        matcher = create_regex_matcher(matcher_or_pattern)
+    else:
+        matcher = matcher_or_pattern
+    if initial_partition_label is None:
+        initial_partition_label = default_partition_label
+    previous_start = previous_label = None
+    if not skip_initial_partition:
+        if label_whitelist is None or initial_partition_label in label_whitelist:
+            previous_start = 0
+            previous_label = initial_partition_label
+    for match in matcher(text):
+        if label_group_id is not None:
+            label = text[match.start(label_group_id) : match.end(label_group_id)]
+        else:
+            label = default_partition_label
+        if label_whitelist is None or label in label_whitelist:
+            if previous_start is not None and previous_label is not None:
+                partition = _create_partition(
+                    text=text,
+                    start=previous_start,
+                    end=match.start(),
+                    label=previous_label,
+                    strip_whitespace=strip_whitespace,
+                    verbose=verbose,
+                )
+                if partition is not None:
+                    yield partition
+
+            previous_start = match.start()
+            previous_label = label
+
+    if previous_start is not None and previous_label is not None:
+        partition = _create_partition(
+            text=text,
+            start=previous_start,
+            end=len(text),
+            label=previous_label,
+            strip_whitespace=strip_whitespace,
+            verbose=verbose,
+        )
+        if partition is not None:
+            yield partition
+
+
+class RegexPartitioner(EnterDatasetMixin, ExitDatasetMixin):
+    """RegexPartitioner partitions a document into multiple partitions using a regular expression.
+    For more information, refer to _get_partitions_with_matcher() method.
+
+    :param pattern: A regular expression to search for in the text. It is also included at the
+        beginning of each partition.
+    :param collect_statistics: A boolean value (default:False) that allows to collect relevant
+        statistics of the document after partitioning. When this parameter is enabled, following
+        stats are collected:
+            1. partition_lengths: list of lengths of all partitions
+            2. num_partitions: list of number of partitions in each document
+            3. document_lengths: list of document lengths
+        show_statistics can be used to get statistical insight over these lists.
+    :param partition_layer_name: The name of the annotation layer (default:partitions) to which
+        the partitions are appended.
+    :param text_field_name: The name of the document attribute (default:text) that holds the text
+        to partition.
+    :param partitioner_kwargs: keyword arguments for _get_partitions_with_matcher() method
+    """
+
+    def __init__(
+        self,
+        pattern: str,
+        collect_statistics: bool = False,
+        partition_layer_name: str = "partitions",
+        text_field_name: str = "text",
+        **partitioner_kwargs,
+    ):
+        self.matcher = create_regex_matcher(pattern)
+        self.partition_layer_name = partition_layer_name
+        self.text_field_name = text_field_name
+        self.collect_statistics = collect_statistics
+        self.reset_statistics()
+        self.partitioner_kwargs = partitioner_kwargs
+
+    def reset_statistics(self):
+        """Discard all statistics collected so far."""
+        self._statistics: dict[str, Any] = {
+            "partition_lengths": [],
+            "num_partitions": [],
+            "document_lengths": [],
+        }
+
+    def show_statistics(self, description: str | None = None):
+        """Log min, max, mean and stddev of each collected statistic as JSON on level INFO.
+
+        :param description: The prefix of the log message (default:Statistics).
+        """
+        description = description or "Statistics"
+        statistics_show = {}
+        for key, values in self._statistics.items():
+            if values:
+                statistics_show[key] = {
+                    "min": min(values),
+                    "max": max(values),
+                    "mean": statistics.mean(values),
+                    "stddev": statistics.pstdev(values),
+                }
+            else:
+                # min(), max() and the statistics functions raise for empty data, e.g. for
+                # partition_lengths if no partition was created at all
+                statistics_show[key] = {"min": None, "max": None, "mean": None, "stddev": None}
+
+        logger.info(f"{description}: \n{json.dumps(statistics_show, indent=2)}")
+
+    def update_statistics(self, key: str, value: int | str | list):
+        """Add the value(s) to the statistic stored under key if collect_statistics is enabled.
+
+        :param key: The name of the statistic (see reset_statistics for the available keys).
+        :param value: A single value to append or a list of values to extend the statistic with.
+        :raises TypeError: If value is neither a list nor a str nor an int.
+        """
+        if self.collect_statistics:
+            if isinstance(value, list):
+                self._statistics[key] += value
+            elif isinstance(value, (str, int)):
+                self._statistics[key].append(value)
+            else:
+                raise TypeError(
+                    f"type of given key [{type(key)}] or value [{type(value)}] is incorrect."
+                )
+
+    def __call__(self, document: D) -> D:
+        """Append the partitions found in the text of the document to its partition layer.
+
+        :param document: The document to partition. It is modified in place.
+        :return: The given document.
+        """
+        partition_lengths = []
+        text: str = getattr(document, self.text_field_name)
+        for partition in _get_partitions_with_matcher(
+            text=text, matcher_or_pattern=self.matcher, **self.partitioner_kwargs
+        ):
+            document[self.partition_layer_name].append(partition)
+            partition_lengths.append(partition.end - partition.start)
+
+        if self.collect_statistics:
+            self.update_statistics("num_partitions", len(document[self.partition_layer_name]))
+            self.update_statistics("partition_lengths", partition_lengths)
+            self.update_statistics("document_lengths", len(text))
+
+        return document
+
+    def enter_dataset(self, dataset: Dataset | IterableDataset, name: str | None = None) -> None:
+        """Reset the statistics if collect_statistics is enabled."""
+        if self.collect_statistics:
+            self.reset_statistics()
+
+    def exit_dataset(self, dataset: Dataset | IterableDataset, name: str | None = None) -> None:
+        """Show the statistics, described by name, if collect_statistics is enabled."""
+        if self.collect_statistics:
+            self.show_statistics(description=name)
diff --git a/tests/unit/document/processing/test_regex_partitioner.py b/tests/unit/document/processing/test_regex_partitioner.py
new file mode 100644
index 00000000..23f79779
--- /dev/null
+++ b/tests/unit/document/processing/test_regex_partitioner.py
@@ -0,0 +1,409 @@
+import dataclasses
+import json
+import logging
+from typing import Tuple
+
+import pytest
+from pytorch_ie.annotations import LabeledSpan
+from pytorch_ie.core import AnnotationList, annotation_field
+from pytorch_ie.documents import TextBasedDocument
+
+from pie_datasets.document.processing import RegexPartitioner
+from pie_datasets.document.processing.regex_partitioner import (
+    _get_partitions_with_matcher,
+)
+
+
+@dataclasses.dataclass
+class TextDocumentWithPartitions(TextBasedDocument):
+    partitions: AnnotationList[LabeledSpan] = annotation_field(target="text")
+
+
+def have_overlap(start_end: Tuple[int, int], other_start_end: Tuple[int, int]) -> bool:
+    other_start_overlaps = start_end[0] <= other_start_end[0] < start_end[1]
+    other_end_overlaps = start_end[0] < other_start_end[1] <= start_end[1]
+    start_overlaps_other = other_start_end[0] <= start_end[0] < other_start_end[1]
+    end_overlaps_other = other_start_end[0] < start_end[1] <= other_start_end[1]
+    return other_start_overlaps or other_end_overlaps or start_overlaps_other or end_overlaps_other
+
+
+def test_regex_partitioner():
+    TEXT1 = (
+        "This is initial text.<start>Jane lives in Berlin. this is no sentence about Karl."
+        "<middle>Seattle is a rainy city. Jenny Durkan is the city's mayor."
+        "<end>Karl enjoys sunny days in Berlin."
+    )
+    regex_partitioner = RegexPartitioner(
+        pattern="(<start>|<middle>|<end>)",
+    )
+    # The document contains a text separated by some markers like <start>, <middle> and <end>. RegexPartitioner
+    # partitions the text based on the given pattern. After partitioning, there are four partitions with same label.
+    document = TextDocumentWithPartitions(text=TEXT1)
+    new_document = regex_partitioner(document)
+
+    partitions = new_document.partitions
+    labels = [partition.label for partition in partitions]
+    assert len(partitions) == 4
+    assert labels == ["partition"] * len(partitions)
+    assert str(partitions[0]) == "This is initial text."
+    assert str(partitions[1]) == "<start>Jane lives in Berlin. this is no sentence about Karl."
+    assert (
+        str(partitions[2]) == "<middle>Seattle is a rainy city. Jenny Durkan is the city's mayor."
+    )
+    assert str(partitions[3]) == "<end>Karl enjoys sunny days in Berlin."
+
+
+def test_regex_partitioner_with_statistics(caplog):
+    TEXT1 = (
+        "This is initial text.<start>Jane lives in Berlin. this is no sentence about Karl."
+        "<middle>Seattle is a rainy city. Jenny Durkan is the city's mayor."
+        "<end>Karl enjoys sunny days in Berlin."
+    )
+    TEXT2 = "This is initial text.<start>Lily is mother of Harry.<end>Beth greets Emma."
+
+    regex_partitioner = RegexPartitioner(
+        pattern="(<start>|<middle>|<end>)",
+        label_group_id=0,
+        label_whitelist=["<start>", "<middle>", "<end>"],
+        skip_initial_partition=True,
+        collect_statistics=True,
+    )
+
+    # The document contains a text separated by some markers like <start>, <middle> and <end>. After partitioning, there
+    # are three partitions excluding initial part. Therefore, document length is not equal to sum of partitions.
+    document = TextDocumentWithPartitions(text=TEXT1)
+    caplog.set_level(logging.INFO)
+    caplog.clear()
+    regex_partitioner.enter_dataset(None)
+    new_document = regex_partitioner(document)
+    regex_partitioner.exit_dataset(None)
+    partitions = new_document.partitions
+    assert len(partitions) == 3
+
+    assert len(caplog.records) == 1
+    log_description, log_json = caplog.records[0].message.split("\n", maxsplit=1)
+    assert log_description.strip() == "Statistics:"
+    assert json.loads(log_json) == {
+        "partition_lengths": {
+            "min": 38,
+            "max": 66,
+            "mean": 54.666666666666664,
+            "stddev": 12.036980056845191,
+        },
+        "num_partitions": {"min": 3, "max": 3, "mean": 3, "stddev": 0.0},
+        "document_lengths": {"min": 185, "max": 185, "mean": 185, "stddev": 0.0},
+    }
+
+    # The document contains a text separated by some markers like <start> and <end>. Since enter_dataset() resets the
+    # statistics, they do not contain information from the previous document. After partitioning, there are two
+    # partitions excluding initial part. Therefore, the sum of document lengths is not equal to the sum of partition
+    # lengths.
+    document = TextDocumentWithPartitions(text=TEXT2)
+    caplog.set_level(logging.INFO)
+    caplog.clear()
+    regex_partitioner.enter_dataset(None)
+    new_document = regex_partitioner(document)
+    regex_partitioner.exit_dataset(None)
+    partitions = new_document.partitions
+    assert len(partitions) == 2
+
+    assert len(caplog.records) == 1
+    log_description, log_json = caplog.records[0].message.split("\n", maxsplit=1)
+    assert log_description.strip() == "Statistics:"
+    assert json.loads(log_json) == {
+        "partition_lengths": {"min": 22, "max": 31, "mean": 26.5, "stddev": 4.5},
+        "num_partitions": {"min": 2, "max": 2, "mean": 2, "stddev": 0.0},
+        "document_lengths": {"min": 74, "max": 74, "mean": 74, "stddev": 0.0},
+    }
+
+    with pytest.raises(
+        TypeError,
+        match=r"type of given key \[<class 'str'>\] or value \[<class 'float'>\] is incorrect.",
+    ):
+        regex_partitioner.update_statistics("num_partitions", 1.0)
+
+    regex_partitioner.show_statistics()
+
+
+@pytest.mark.parametrize("label_whitelist", [["<start>", "<middle>", "<end>"], [], None])
+@pytest.mark.parametrize("skip_initial_partition", [True, False])
+def test_regex_partitioner_without_label_group_id(label_whitelist, skip_initial_partition):
+    TEXT1 = (
+        "This is initial text.<start>Jane lives in Berlin. this is no sentence about Karl."
+        "<middle>Seattle is a rainy city. Jenny Durkan is the city's mayor."
+        "<end>Karl enjoys sunny days in Berlin."
+    )
+    regex_partitioner = RegexPartitioner(
+        pattern="(<start>|<middle>|<end>)",
+        label_whitelist=label_whitelist,
+        skip_initial_partition=skip_initial_partition,
+    )
+    # The document contains a text separated by some markers like <start>, <middle> and <end>. Since label_group_id is
+    # None, the partitions (if any) will have same label.
+    document = TextDocumentWithPartitions(text=TEXT1)
+    new_document = regex_partitioner(document)
+    partitions = new_document.partitions
+    assert [partition.label for partition in partitions] == ["partition"] * len(partitions)
+    if skip_initial_partition:
+        if label_whitelist == ["<start>", "<middle>", "<end>"] or label_whitelist == []:
+            # Since label_group_id is None, no label will be created using the matched pattern. Therefore, the default
+            # partition label is used but since it is not in label_whitelist, no partition is created.
+            assert len(partitions) == 0
+        else:  # label_whitelist is None
+            # since label_whitelist and label_group_id is None and skip_initial_partition is True, three partitions are
+            # created with the same label
+            assert len(partitions) == 3
+            assert (
+                str(partitions[0])
+                == "<start>Jane lives in Berlin. this is no sentence about Karl."
+            )
+            assert (
+                str(partitions[1])
+                == "<middle>Seattle is a rainy city. Jenny Durkan is the city's mayor."
+            )
+            assert str(partitions[2]) == "<end>Karl enjoys sunny days in Berlin."
+    else:  # skip_initial_partition is False
+        if label_whitelist == ["<start>", "<middle>", "<end>"] or label_whitelist == []:
+            # Since label_group_id is None, no label will be created using the matched pattern. Therefore, the default
+            # partition label is used but since it is not in label_whitelist, no partition is created.
+            assert len(partitions) == 0
+        else:  # label_whitelist is None
+            # since label_whitelist and label_group_id is None and skip_initial_partition is False, four partitions are
+            # created with the same label.
+            assert len(partitions) == 4
+            assert str(partitions[0]) == "This is initial text."
+            assert (
+                str(partitions[1])
+                == "<start>Jane lives in Berlin. this is no sentence about Karl."
+            )
+            assert (
+                str(partitions[2])
+                == "<middle>Seattle is a rainy city. Jenny Durkan is the city's mayor."
+            )
+            assert str(partitions[3]) == "<end>Karl enjoys sunny days in Berlin."
+
+
+@pytest.mark.parametrize(
+    "label_whitelist", [["partition", "<start>", "<end>"], ["<start>", "<end>"], [], None]
+)
+@pytest.mark.parametrize("skip_initial_partition", [True, False])
+def test_regex_partitioner_with_label_group_id(label_whitelist, skip_initial_partition):
+    TEXT1 = (
+        "This is initial text.<start>Jane lives in Berlin. this is no sentence about Karl."
+        "<middle>Seattle is a rainy city. Jenny Durkan is the city's mayor."
+        "<end>Karl enjoys sunny days in Berlin."
+    )
+    regex_partitioner = RegexPartitioner(
+        pattern="(<start>|<middle>|<end>)",
+        label_group_id=0,
+        label_whitelist=label_whitelist,
+        skip_initial_partition=skip_initial_partition,
+    )
+    # The document contains a text separated by some markers like <start>, <middle> and <end>. Possible partitions can
+    # be four including the initial partition.
+    document = TextDocumentWithPartitions(text=TEXT1)
+    new_document = regex_partitioner(document)
+    partitions = new_document.partitions
+    labels = [partition.label for partition in partitions]
+    if skip_initial_partition:
+        if label_whitelist == ["<start>", "<end>"] or label_whitelist == [
+            "partition",
+            "<start>",
+            "<end>",
+        ]:
+            # Since skip_initial_partition is True, therefore even if initial_partition_label is in label_whitelist, it
+            # will not be added as a partition.
+            assert len(partitions) == 2
+            assert labels == ["<start>", "<end>"]
+            assert (
+                str(partitions[0])
+                == "<start>Jane lives in Berlin. this is no sentence about Karl.<middle>Seattle is a rainy city. Jenny Durkan is the city's mayor."
+            )
+            assert str(partitions[1]) == "<end>Karl enjoys sunny days in Berlin."
+        elif label_whitelist == []:
+            # Even though labels are created using label_group_id, label_whitelist is empty. Therefore, no partition will
+            # be created.
+            assert len(partitions) == 0
+        else:  # label_whitelist is None
+            # Since label_whitelist is None, all the labels formed using label_group_id will create a partition.
+            assert len(partitions) == 3
+            assert labels == ["<start>", "<middle>", "<end>"]
+            assert (
+                str(partitions[0])
+                == "<start>Jane lives in Berlin. this is no sentence about Karl."
+            )
+            assert (
+                str(partitions[1])
+                == "<middle>Seattle is a rainy city. Jenny Durkan is the city's mayor."
+            )
+            assert str(partitions[2]) == "<end>Karl enjoys sunny days in Berlin."
+    else:  # skip_initial_partition is False
+        if label_whitelist == ["<start>", "<end>"]:
+            # Though skip_initial_partition is False it is not in label_whitelist, therefore not added as a partition.
+            assert len(partitions) == 2
+            assert labels == ["<start>", "<end>"]
+            assert (
+                str(partitions[0])
+                == "<start>Jane lives in Berlin. this is no sentence about Karl.<middle>Seattle is a rainy city. Jenny Durkan is the city's mayor."
+            )
+            assert str(partitions[1]) == "<end>Karl enjoys sunny days in Berlin."
+        elif label_whitelist == ["partition", "<start>", "<end>"]:
+            # Since initial partition label is in label_whitelist, therefore it will form a partition in the document.
+            assert len(partitions) == 3
+            assert labels == ["partition", "<start>", "<end>"]
+            assert str(partitions[0]) == "This is initial text."
+            assert (
+                str(partitions[1])
+                == "<start>Jane lives in Berlin. this is no sentence about Karl.<middle>Seattle is a rainy city. Jenny Durkan is the city's mayor."
+            )
+            assert str(partitions[2]) == "<end>Karl enjoys sunny days in Berlin."
+        elif label_whitelist == []:
+            # Even though labels are created using label_group_id, label_whitelist is empty. Therefore, no partition will
+            # be created.
+            assert len(partitions) == 0
+        else:  # label_whitelist is None
+            # Since label_whitelist is None, all the labels formed using label_group_id will create a partition. In
+            # addition to that the initial partition will also be added to the document.
+            assert len(partitions) == 4
+            assert labels == ["partition", "<start>", "<middle>", "<end>"]
+            assert str(partitions[0]) == "This is initial text."
+            assert (
+                str(partitions[1])
+                == "<start>Jane lives in Berlin. this is no sentence about Karl."
+            )
+            assert (
+                str(partitions[2])
+                == "<middle>Seattle is a rainy city. Jenny Durkan is the city's mayor."
+            )
+            assert str(partitions[3]) == "<end>Karl enjoys sunny days in Berlin."
+
+
+@pytest.mark.parametrize("label_whitelist", [["partition"], [], None])
+@pytest.mark.parametrize("skip_initial_partition", [True, False])
+def test_regex_partitioner_with_no_match_found(skip_initial_partition, label_whitelist):
+    TEXT2 = "This is initial text.<start>Lily is mother of Harry.<end>Beth greets Emma."
+    regex_partitioner = RegexPartitioner(
+        pattern="(<middle>)",
+        label_group_id=0,
+        label_whitelist=label_whitelist,
+        skip_initial_partition=skip_initial_partition,
+    )
+    # The document contains a text separated by some markers like <start> and <end>. Only possible partition in the
+    # document based on the given pattern is the initial partition.
+    document = TextDocumentWithPartitions(text=TEXT2)
+    new_document = regex_partitioner(document)
+
+    partitions = new_document.partitions
+    if skip_initial_partition:
+        # No matter what the value of label_whitelist is, there will be no partition created, since the given pattern
+        # is not in the document and skip_initial_partition is True.
+        if label_whitelist == ["partition"]:
+            assert len(partitions) == 0
+        elif label_whitelist == []:
+            assert len(partitions) == 0
+        else:  # label_whitelist is None
+            assert len(partitions) == 0
+    else:
+        if label_whitelist == ["partition"]:
+            # Since initial_partition_label is contained in label_whitelist, the initial partition will be added to the
+            # document.
+            assert len(partitions) == 1
+            assert str(partitions[0]) == TEXT2
+            assert partitions[0].label == "partition"
+        elif label_whitelist == []:
+            # Even though skip_initial_partition is False, initial_partition_label is not contained in label_whitelist.
+            # Therefore, the initial partition will not be added to the document.
+            assert len(partitions) == 0
+        else:  # label_whitelist is None
+            # Since label_whitelist is None and skip_initial_partition is False, the initial partition will be added to
+            # the document.
+            assert len(partitions) == 1
+            assert str(partitions[0]) == TEXT2
+            assert partitions[0].label == "partition"
+
+
+def test_get_partitions_with_matcher():
+    TEXT1 = (
+        "This is initial text.<start>Jane lives in Berlin. this is no sentence about Karl."
+        "<middle>Seattle is a rainy city. Jenny Durkan is the city's mayor."
+        "<end>Karl enjoys sunny days in Berlin."
+    )
+    # The document contains a text separated by some markers like <start>, <middle> and <end>. finditer method is used
+    # which returns non overlapping match from the text. Therefore, none of the partition created should have overlapped
+    # span and all of them should be instances of LabeledSpan.
+    document = TextDocumentWithPartitions(text=TEXT1)
+    partitions = []
+    for partition in _get_partitions_with_matcher(
+        text=document.text,
+        matcher_or_pattern="(<start>|<middle>|<end>)",
+        label_group_id=0,
+        label_whitelist=["<start>", "<middle>", "<end>"],
+    ):
+        assert isinstance(partition, LabeledSpan)
+        for p in partitions:
+            assert not have_overlap((p.start, p.end), (partition.start, partition.end))
+        partitions.append(partition)
+
+
+@pytest.mark.parametrize(
+    "strip_whitespace, verbose",
+    [
+        (False, False),
+        (False, True),
+        (True, False),
+        (True, True),
+    ],
+)
+def test_regex_partitioner_with_strip_whitespace(strip_whitespace, verbose, caplog):
+    TEXT1 = (
+        "\nThis is initial text. Jane lives in Berlin. this is no sentence about Karl.\n"
+        "Seattle is a rainy city. Jenny Durkan is the city's mayor.\n\n"
+        "Karl enjoys sunny days in Berlin.\n"
+    )
+    regex_partitioner = RegexPartitioner(
+        pattern="\n",
+        strip_whitespace=strip_whitespace,
+        verbose=verbose,
+    )
+    document = TextDocumentWithPartitions(text=TEXT1)
+    new_document = regex_partitioner(document)
+
+    partitions = new_document.partitions
+    labels = [partition.label for partition in partitions]
+    if strip_whitespace:
+        assert len(partitions) == 3
+        assert labels == ["partition"] * len(partitions)
+        assert (
+            str(partitions[0])
+            == "This is initial text. Jane lives in Berlin. this is no sentence about Karl."
+        )
+        assert str(partitions[1]) == "Seattle is a rainy city. Jenny Durkan is the city's mayor."
+        assert str(partitions[2]) == "Karl enjoys sunny days in Berlin."
+        if verbose:
+            assert len(caplog.messages) == 3
+            assert caplog.messages[0] == (
+                "Found empty partition in text at [0:0] with potential label: 'partition'. It will be skipped."
+            )
+            assert caplog.messages[1] == (
+                "Found empty partition in text at [135:136] with potential label: 'partition'. It will be skipped."
+            )
+            assert caplog.messages[2] == (
+                "Found empty partition in text at [170:171] with potential label: 'partition'. It will be skipped."
+            )
+    else:
+        assert len(partitions) == 5
+        assert labels == ["partition"] * len(partitions)
+        assert (
+            str(partitions[0])
+            == "\nThis is initial text. Jane lives in Berlin. this is no sentence about Karl."
+        )
+        assert str(partitions[1]) == "\nSeattle is a rainy city. Jenny Durkan is the city's mayor."
+        assert str(partitions[2]) == "\n"
+        assert str(partitions[3]) == "\nKarl enjoys sunny days in Berlin."
+        assert str(partitions[4]) == "\n"
+        if verbose:
+            assert len(caplog.messages) == 1
+            assert (
+                caplog.messages[0]
+                == "Found empty partition in text at [0:0] with potential label: 'partition'. It will be skipped."
+            )