Dev/manage meta (#518)
* - add insight mining

* meta tags aggregator

* naive reverse grouper

* * resolve the bugs when running insight mining in multiprocessing mode

* * update unittests

* * update unittests

* * update unittests

* tags specified field

* * update readme for analyzer

* doc done

* * use more detailed key

* + add reference

* move mm tags

* move meta key

* done

* test done

* rm nested set

* Update constant.py

minor fix

* rename agg to batch meta

* export in naive reverse grouper

---------

Co-authored-by: null <[email protected]>
Co-authored-by: gece.gc <[email protected]>
Co-authored-by: lielin.hyl <[email protected]>
Co-authored-by: Daoyuan Chen <[email protected]>
5 people authored Jan 3, 2025
1 parent 1fe821f commit fb98c56
Showing 54 changed files with 1,230 additions and 627 deletions.
74 changes: 46 additions & 28 deletions configs/config_all.yaml

Large diffs are not rendered by default.

39 changes: 23 additions & 16 deletions data_juicer/ops/aggregator/entity_attribute_aggregator.py
@@ -6,8 +6,8 @@
 
 from data_juicer.ops.base_op import OPERATORS, Aggregator
 from data_juicer.utils.common_utils import (avg_split_string_list_under_limit,
-                                            is_string_list, nested_access,
-                                            nested_set)
+                                            is_string_list)
+from data_juicer.utils.constant import BatchMetaKeys, Fields, MetaKeys
 from data_juicer.utils.model_utils import get_model, prepare_model
 
 from .nested_aggregator import NestedAggregator
@@ -53,8 +53,8 @@ def __init__(self,
                  api_model: str = 'gpt-4o',
                  entity: str = None,
                  attribute: str = None,
-                 input_key: str = None,
-                 output_key: str = None,
+                 input_key: str = MetaKeys.event_description,
+                 output_key: str = BatchMetaKeys.entity_attribute,
                  word_limit: PositiveInt = 100,
                  max_token_num: Optional[PositiveInt] = None,
                  *,
@@ -73,12 +73,10 @@
         :param api_model: API model name.
         :param entity: The given entity.
         :param attribute: The given attribute.
-        :param input_key: The input field key in the samples. Support for
-            nested keys such as "__dj__stats__.text_len". It is text_key
-            in default.
-        :param output_key: The output field key in the samples. Support for
-            nested keys such as "__dj__stats__.text_len". It is same as the
-            input_key in default.
+        :param input_key: The input key in the meta field of the samples.
+            It is "event_description" in default.
+        :param output_key: The output key in the aggregation field of the
+            samples. It is "entity_attribute" in default.
         :param word_limit: Prompt the output length.
         :param max_token_num: The max token num of the total tokens of the
             sub documents. Without limitation if it is None.
@@ -103,8 +101,8 @@ def __init__(self,
 
         self.entity = entity
         self.attribute = attribute
-        self.input_key = input_key or self.text_key
-        self.output_key = output_key or self.input_key
+        self.input_key = input_key
+        self.output_key = output_key
         self.word_limit = word_limit
         self.max_token_num = max_token_num
 
@@ -131,7 +129,7 @@ def __init__(self,
                                         **model_params)
 
         self.try_num = try_num
-        self.nested_sum = NestedAggregator(model=api_model,
+        self.nested_sum = NestedAggregator(api_model=api_model,
                                            max_token_num=max_token_num,
                                            api_endpoint=api_endpoint,
                                            response_path=response_path,
@@ -185,12 +183,21 @@ def attribute_summary(self, sub_docs, rank=None):
     def process_single(self, sample=None, rank=None):
 
+        if self.output_key in sample[Fields.batch_meta]:
+            return sample
+
+        if Fields.meta not in sample or self.input_key not in sample[
+                Fields.meta][0]:
+            logger.warning('The input key does not exist in the sample!')
+            return sample
+
+        sub_docs = [d[self.input_key] for d in sample[Fields.meta]]
+
         # if not batched sample
-        sub_docs = nested_access(sample, self.input_key)
         if not is_string_list(sub_docs):
             logger.warning('Require string meta as input!')
             return sample
 
-        sample = nested_set(sample, self.output_key,
-                            self.attribute_summary(sub_docs, rank=rank))
+        sample[Fields.batch_meta][self.output_key] = self.attribute_summary(
+            sub_docs, rank=rank)
 
         return sample
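
For context, a minimal sketch of the batched-sample layout this reworked process_single reads from and writes to. META and BATCH_META are assumed stand-ins for Fields.meta and Fields.batch_meta; the literal column names and the join stand-in are not taken from the diff:

META = '__dj__meta__'          # assumed literal for Fields.meta
BATCH_META = '__dj__batch_meta__'  # assumed literal for Fields.batch_meta

sample = {
    META: [  # one meta dict per grouped sub-sample
        {'event_description': 'Alice repairs the engine.'},
        {'event_description': 'Alice argues with the captain.'},
    ],
    BATCH_META: {},  # shared output slot for the whole batch
}

# mirrors the new input path: one string per sub-sample's meta
sub_docs = [d['event_description'] for d in sample[META]]
# the real op calls an API model for the summary; join is a trivial stand-in
sample[BATCH_META]['entity_attribute'] = ' '.join(sub_docs)
print(sample[BATCH_META])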
39 changes: 23 additions & 16 deletions data_juicer/ops/aggregator/most_relavant_entities_aggregator.py
@@ -5,8 +5,8 @@
 from pydantic import PositiveInt
 
 from data_juicer.ops.base_op import OPERATORS, Aggregator
-from data_juicer.utils.common_utils import (is_string_list, nested_access,
-                                            nested_set)
+from data_juicer.utils.common_utils import is_string_list
+from data_juicer.utils.constant import BatchMetaKeys, Fields, MetaKeys
 from data_juicer.utils.model_utils import get_model, prepare_model
 
 from ..common import split_text_by_punctuation
@@ -44,8 +44,8 @@ def __init__(self,
                  api_model: str = 'gpt-4o',
                  entity: str = None,
                  query_entity_type: str = None,
-                 input_key: str = None,
-                 output_key: str = None,
+                 input_key: str = MetaKeys.event_description,
+                 output_key: str = BatchMetaKeys.most_relavant_entities,
                  max_token_num: Optional[PositiveInt] = None,
                  *,
                  api_endpoint: Optional[str] = None,
@@ -62,12 +62,10 @@
         :param api_model: API model name.
         :param entity: The given entity.
         :param query_entity_type: The type of queried relavant entities.
-        :param input_key: The input field key in the samples. Support for
-            nested keys such as "__dj__stats__.text_len". It is text_key
-            in default.
-        :param output_key: The output field key in the samples. Support for
-            nested keys such as "__dj__stats__.text_len". It is same as the
-            input_key in default.
+        :param input_key: The input key in the meta field of the samples.
+            It is "event_description" in default.
+        :param output_key: The output key in the aggregation field of the
+            samples. It is "most_relavant_entities" in default.
         :param max_token_num: The max token num of the total tokens of the
             sub documents. Without limitation if it is None.
         :param api_endpoint: URL endpoint for the API.
@@ -91,8 +89,8 @@
 
         self.entity = entity
         self.query_entity_type = query_entity_type
-        self.input_key = input_key or self.text_key
-        self.output_key = output_key or self.input_key
+        self.input_key = input_key
+        self.output_key = output_key
         self.max_token_num = max_token_num
 
         system_prompt_template = system_prompt_template or \
@@ -167,13 +165,22 @@ def query_most_relavant_entities(self, sub_docs, rank=None):
 
     def process_single(self, sample=None, rank=None):
 
+        if self.output_key in sample[Fields.batch_meta]:
+            return sample
+
+        if Fields.meta not in sample or self.input_key not in sample[
+                Fields.meta][0]:
+            logger.warning('The input key does not exist in the sample!')
+            return sample
+
+        sub_docs = [d[self.input_key] for d in sample[Fields.meta]]
+
         # if not batched sample
-        sub_docs = nested_access(sample, self.input_key)
         if not is_string_list(sub_docs):
             return sample
 
-        sample = nested_set(
-            sample, self.output_key,
-            self.query_most_relavant_entities(sub_docs, rank=rank))
+        sample[Fields.batch_meta][
+            self.output_key] = self.query_most_relavant_entities(sub_docs,
+                                                                 rank=rank)
 
         return sample
29 changes: 19 additions & 10 deletions data_juicer/ops/aggregator/nested_aggregator.py
@@ -5,7 +5,8 @@
 
 from data_juicer.ops.base_op import OPERATORS, Aggregator
 from data_juicer.utils.common_utils import (avg_split_string_list_under_limit,
-                                            is_string_list, nested_access)
+                                            is_string_list)
+from data_juicer.utils.constant import Fields, MetaKeys
 from data_juicer.utils.model_utils import get_model, prepare_model
 
 OP_NAME = 'nested_aggregator'
@@ -47,7 +48,7 @@ class NestedAggregator(Aggregator):
 
     def __init__(self,
                  api_model: str = 'gpt-4o',
-                 input_key: str = None,
+                 input_key: str = MetaKeys.event_description,
                  output_key: str = None,
                  max_token_num: Optional[PositiveInt] = None,
                  *,
@@ -63,12 +64,10 @@
         """
         Initialization method.
         :param api_model: API model name.
-        :param input_key: The input field key in the samples. Support for
-            nested keys such as "__dj__stats__.text_len". It is text_key
-            in default.
-        :param output_key: The output field key in the samples. Support for
-            nested keys such as "__dj__stats__.text_len". It is same as the
-            input_key in default.
+        :param input_key: The input key in the meta field of the samples.
+            It is "event_description" in default.
+        :param output_key: The output key in the aggregation field in the
+            samples. It is same as the input_key in default.
         :param max_token_num: The max token num of the total tokens of the
             sub documents. Without limitation if it is None.
         :param api_endpoint: URL endpoint for the API.
@@ -165,11 +164,21 @@ def recursive_summary(self, sub_docs, rank=None):
 
     def process_single(self, sample=None, rank=None):
 
+        if self.output_key in sample[Fields.batch_meta]:
+            return sample
+
+        if Fields.meta not in sample or self.input_key not in sample[
+                Fields.meta][0]:
+            logger.warning('The input key does not exist in the sample!')
+            return sample
+
+        sub_docs = [d[self.input_key] for d in sample[Fields.meta]]
+
         # if not batched sample
-        sub_docs = nested_access(sample, self.input_key)
         if not is_string_list(sub_docs):
             return sample
 
-        sample[self.output_key] = self.recursive_summary(sub_docs, rank=rank)
+        sample[Fields.batch_meta][self.output_key] = self.recursive_summary(
+            sub_docs, rank=rank)
 
         return sample
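
All three aggregators in this commit now share the same two early exits. A self-contained sketch of that guard logic, with a hypothetical helper name and assumed column literals (should_skip is illustrative and does not appear in the diff):

def should_skip(sample, input_key, output_key,
                meta_field='__dj__meta__', batch_meta='__dj__batch_meta__'):
    # exit 1: the aggregation was already computed for this batch
    if output_key in sample.get(batch_meta, {}):
        return True
    # exit 2: there is nothing to aggregate under the input key
    if meta_field not in sample or input_key not in sample[meta_field][0]:
        return True
    return False

sample = {'__dj__meta__': [{'event_description': 'x'}],
          '__dj__batch_meta__': {}}
print(should_skip(sample, 'event_description', 'nested_aggregator'))  # False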
11 changes: 11 additions & 0 deletions data_juicer/ops/base_op.py
@@ -633,6 +633,17 @@ def process_single(self, sample):
 
     def run(self, dataset, *, exporter=None, tracer=None):
         dataset = super(Aggregator, self).run(dataset)
+        # add batched meta field for OPs that produce aggregations
+        if Fields.batch_meta not in dataset.features:
+            from data_juicer.core.data import add_same_content_to_new_column
+            dataset = dataset.map(add_same_content_to_new_column,
+                                  fn_kwargs={
+                                      'new_column_name': Fields.batch_meta,
+                                      'initial_value': {}
+                                  },
+                                  num_proc=self.runtime_np(),
+                                  batch_size=self.batch_size,
+                                  desc='Adding new column for aggregation')
        new_dataset = dataset.map(
            self.process,
            num_proc=self.runtime_np(),
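
The body of the mapped helper is not shown in this diff; a hedged sketch of what it is assumed to do, namely seed every sample with the same initial value under the new column (the real add_same_content_to_new_column lives in data_juicer.core.data and may differ in detail):

import copy

def add_same_content_to_new_column(sample, new_column_name, initial_value=None):
    # deep-copy so a mutable default like {} is not shared across samples;
    # whether the real helper copies is an assumption
    sample[new_column_name] = copy.deepcopy(initial_value)
    return sample

rows = [{'text': 'a'}, {'text': 'b'}]
rows = [add_same_content_to_new_column(r, '__dj__batch_meta__', {})
        for r in rows]
print(rows[0])  # {'text': 'a', '__dj__batch_meta__': {}}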
8 changes: 4 additions & 4 deletions data_juicer/ops/filter/video_tagging_from_frames_filter.py
@@ -3,7 +3,7 @@
 import numpy as np
 from pydantic import PositiveInt
 
-from data_juicer.utils.constant import Fields
+from data_juicer.utils.constant import Fields, MetaKeys
 
 from ..base_op import (NON_STATS_FILTERS, OPERATORS, TAGGING_OPS, UNFORKABLE,
                        Filter)
@@ -30,7 +30,7 @@ def __init__(self,
                  contain: str = 'any',
                  frame_sampling_method: str = 'all_keyframes',
                  frame_num: PositiveInt = 3,
-                 tag_field_name: str = Fields.video_frame_tags,
+                 tag_field_name: str = MetaKeys.video_frame_tags,
                  any_or_all: str = 'any',
                  *args,
                  **kwargs):
@@ -55,8 +55,8 @@
             the first and the last frames will be extracted. If it's larger
             than 2, in addition to the first and the last frames, other frames
             will be extracted uniformly within the video duration.
-        :param tag_field_name: the field name to store the tags. It's
-            "__dj__video_frame_tags__" in default.
+        :param tag_field_name: the key name to store the tags in the meta
+            field. It's "video_frame_tags" in default.
         :param any_or_all: keep this sample with 'any' or 'all' strategy of
             all videos. 'any': keep this sample if any videos meet the
             condition. 'all': keep this sample only if all videos meet the
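
The net effect on downstream access, sketched with an assumed meta column literal: the frame tags move from a dedicated top-level field into a plain key inside the meta field.

META = '__dj__meta__'  # assumed literal for Fields.meta

# before this change (illustrative): tags in a dedicated top-level field
old_sample = {'__dj__video_frame_tags__': [['cat', 'sofa']]}
# after: tags under a key in the shared meta field
new_sample = {META: {'video_frame_tags': [['cat', 'sofa']]}}

print(old_sample['__dj__video_frame_tags__'] ==
      new_sample[META]['video_frame_tags'])  # True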
24 changes: 23 additions & 1 deletion data_juicer/ops/grouper/naive_reverse_grouper.py
@@ -1,26 +1,48 @@
+import json
+import os
+
+from data_juicer.utils.constant import Fields
+from data_juicer.utils.file_utils import create_directory_if_not_exists
+
 from ..base_op import OPERATORS, Grouper, convert_dict_list_to_list_dict
 
 
 @OPERATORS.register_module('naive_reverse_grouper')
 class NaiveReverseGrouper(Grouper):
     """Split batched samples to samples. """
 
-    def __init__(self, *args, **kwargs):
+    def __init__(self, batch_meta_export_path=None, *args, **kwargs):
         """
         Initialization method.
+        :param batch_meta_export_path: the path to export the batch meta.
+            Just drop the batch meta if it is None.
         :param args: extra args
        :param kwargs: extra args
         """
         super().__init__(*args, **kwargs)
+        self.batch_meta_export_path = batch_meta_export_path
 
     def process(self, dataset):
 
         if len(dataset) == 0:
             return dataset
 
         samples = []
+        batch_metas = []
         for sample in dataset:
+            if Fields.batch_meta in sample:
+                batch_metas.append(sample[Fields.batch_meta])
+                sample = {
+                    k: sample[k]
+                    for k in sample if k != Fields.batch_meta
+                }
             samples.extend(convert_dict_list_to_list_dict(sample))
+        if self.batch_meta_export_path is not None:
+            create_directory_if_not_exists(
+                os.path.dirname(self.batch_meta_export_path))
+            with open(self.batch_meta_export_path, 'w') as f:
+                for batch_meta in batch_metas:
+                    f.write(json.dumps(batch_meta, ensure_ascii=False) + '\n')
 
         return samples
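
A sketch of the new export path on one batched sample. The converter below is a stand-in for convert_dict_list_to_list_dict, reimplemented under the assumption that it turns a dict of equal-length lists back into per-sample dicts; the batch-meta key literal is also an assumption:

import json

def convert_dict_list_to_list_dict(batched):
    # assumed behavior of the helper imported from base_op
    keys = list(batched.keys())
    n = len(batched[keys[0]]) if keys else 0
    return [{k: batched[k][i] for k in keys} for i in range(n)]

batched = {
    'text': ['doc one', 'doc two'],
    '__dj__batch_meta__': {'entity_attribute': 'shared summary'},
}

# the grouper strips the batch meta before splitting, then exports it
batch_meta = batched.pop('__dj__batch_meta__')
samples = convert_dict_list_to_list_dict(batched)
print(samples)                                     # two per-sample dicts
print(json.dumps(batch_meta, ensure_ascii=False))  # one JSONL line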
28 changes: 19 additions & 9 deletions data_juicer/ops/mapper/dialog_intent_detection_mapper.py
@@ -4,23 +4,21 @@
 from loguru import logger
 from pydantic import NonNegativeInt, PositiveInt
 
-from data_juicer.ops.base_op import OPERATORS, Mapper
-from data_juicer.utils.common_utils import nested_set
+from data_juicer.ops.base_op import OPERATORS, TAGGING_OPS, Mapper
 from data_juicer.utils.constant import Fields, MetaKeys
 from data_juicer.utils.model_utils import get_model, prepare_model
 
 OP_NAME = 'dialog_intent_detection_mapper'
 
 
-# TODO: LLM-based inference.
+@TAGGING_OPS.register_module(OP_NAME)
 @OPERATORS.register_module(OP_NAME)
 class DialogIntentDetectionMapper(Mapper):
     """
     Mapper to generate user's intent labels in dialog. Input from
     history_key, query_key and response_key. Output lists of
-    labels and analysis for queries in the dialog, which is
-    store in 'dialog_intent_labels' and
-    'dialog_intent_labels_analysis' in Data-Juicer meta field.
+    labels and analysis for queries in the dialog.
     """
 
     DEFAULT_SYSTEM_PROMPT = (
@@ -60,6 +58,8 @@ def __init__(self,
                  intent_candidates: Optional[List[str]] = None,
                  max_round: NonNegativeInt = 10,
                  *,
+                 labels_key: str = MetaKeys.dialog_intent_labels,
+                 analysis_key: str = MetaKeys.dialog_intent_labels_analysis,
                  api_endpoint: Optional[str] = None,
                  response_path: Optional[str] = None,
                  system_prompt: Optional[str] = None,
@@ -82,6 +82,11 @@
             intent labels of the open domain if it is None.
         :param max_round: The max num of round in the dialog to build the
             prompt.
+        :param labels_key: The key name in the meta field to store the
+            output labels. It is 'dialog_intent_labels' in default.
+        :param analysis_key: The key name in the meta field to store the
+            corresponding analysis. It is 'dialog_intent_labels_analysis'
+            in default.
         :param api_endpoint: URL endpoint for the API.
         :param response_path: Path to extract content from the API response.
             Defaults to 'choices.0.message.content'.
@@ -111,6 +116,8 @@ def __init__(self,
 
         self.intent_candidates = intent_candidates
         self.max_round = max_round
+        self.labels_key = labels_key
+        self.analysis_key = analysis_key
 
         self.system_prompt = system_prompt or self.DEFAULT_SYSTEM_PROMPT
         self.query_template = query_template or self.DEFAULT_QUERY_TEMPLATE
@@ -167,6 +174,11 @@ def parse_output(self, response):
         return analysis, labels
 
     def process_single(self, sample, rank=None):
+
+        meta = sample[Fields.meta]
+        if self.labels_key in meta and self.analysis_key in meta:
+            return sample
+
         client = get_model(self.model_key, rank=rank)
 
         analysis_list = []
@@ -208,9 +220,7 @@ def process_single(self, sample, rank=None):
             history.append(self.labels_template.format(labels=labels))
             history.append(self.response_template.format(response=qa[1]))
 
-        analysis_key = f'{Fields.meta}.{MetaKeys.dialog_intent_labels_analysis}'  # noqa: E501
-        sample = nested_set(sample, analysis_key, analysis_list)
-        labels_key = f'{Fields.meta}.{MetaKeys.dialog_intent_labels}'
-        sample = nested_set(sample, labels_key, labels_list)
+        meta[self.labels_key] = labels_list
+        meta[self.analysis_key] = analysis_list
 
         return sample
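
The resulting per-sample layout, sketched with an assumed meta column literal: parallel lists of labels and analyses stored directly under the two configurable keys, which also makes the new early exit fire on a rerun.

META = '__dj__meta__'  # assumed literal for Fields.meta

sample = {META: {}}
meta = sample[META]

# one analysis string and one label list per user query in the dialog
meta['dialog_intent_labels_analysis'] = ['Greets the assistant.',
                                         'Asks to book a flight.']
meta['dialog_intent_labels'] = [['greeting'], ['travel booking']]

# a rerun would now skip this sample, since both keys already exist
print('dialog_intent_labels' in meta and
      'dialog_intent_labels_analysis' in meta)  # True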