diff --git a/_modules/data_juicer/core/analyzer.html b/_modules/data_juicer/core/analyzer.html
index d4c984cd6..c20baa1ba 100644
--- a/_modules/data_juicer/core/analyzer.html
+++ b/_modules/data_juicer/core/analyzer.html
@@ -115,7 +115,6 @@ Source code for data_juicer.core.analyzer
         self.cfg = init_configs() if cfg is None else cfg
 
         self.work_dir = self.cfg.work_dir
-        self.ops = None
 
         if self.cfg.use_cache:
             logger.info(f'Using cache compression method: '
@@ -163,13 +162,12 @@ Source code for data_juicer.core.analyzer
 
         # extract processes
         logger.info('Preparing process operators...')
-        self.cfg.process, self.ops = load_ops(self.cfg.process,
-                                              self.cfg.op_fusion)
+        ops = load_ops(self.cfg.process, self.cfg.op_fusion)
 
         # 2. stats precompute only for filter ops
         logger.info('Computing the stats of dataset...')
         stats_collected = False
-        for op in self.ops:
+        for op in ops:
             if isinstance(op, Filter):
                 original_process = op.process
                 op.process = None
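
The analyzer hunks above are part of a broader API change: load_ops now returns only the operator list rather than a (process_list, ops) pair, and the Analyzer stops caching the ops on self. A minimal sketch of the new call pattern, assuming data-juicer is importable; the one-op process config is hypothetical:

    from data_juicer.ops import Filter, load_ops

    # hypothetical process list; any configured filter op works the same way
    process = [{'language_id_score_filter': {'lang': 'en', 'min_score': 0.8}}]
    ops = load_ops(process, False)  # op_fusion off; only the op list comes back
    filter_ops = [op for op in ops if isinstance(op, Filter)]  # stats precompute targets
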
diff --git a/_modules/data_juicer/core/data.html b/_modules/data_juicer/core/data.html
index 39c748ebf..70f40fc29 100644
--- a/_modules/data_juicer/core/data.html
+++ b/_modules/data_juicer/core/data.html
@@ -86,6 +86,7 @@ Source code for data_juicer.core.data
 
 import copy
 import inspect
+import traceback
 from abc import ABC, abstractmethod
 from functools import wraps
 from time import time
@@ -258,24 +259,32 @@ Source code for data_juicer.core.data
         unforkable_operators = set(UNFORKABLE.modules.keys())
 
         dataset = self
-        for op in operators:
-            mp_context = ['forkserver', 'spawn'] if (
-                op.use_cuda() or op._name in unforkable_operators) else None
-            setup_mp(mp_context)
-
-            start = time()
-            # run single op
-            dataset = op(dataset,
-                         exporter=exporter,
-                         checkpointer=checkpointer,
-                         tracer=tracer)
-            # record processed ops
-            if checkpointer is not None:
-                checkpointer.record(op._name,
-                                    list(op._process_kwargs.values())[0])
-            end = time()
-            logger.info(f'OP [{op._name}] Done in {end - start:.3f}s. '
-                        f'Left {len(dataset)} samples.')
+        try:
+            for op in operators:
+                mp_context = ['forkserver', 'spawn'] if (
+                    op.use_cuda()
+                    or op._name in unforkable_operators) else None
+                setup_mp(mp_context)
+
+                start = time()
+                # run single op
+                dataset = op.run(dataset, exporter=exporter, tracer=tracer)
+                # record processed ops
+                if checkpointer is not None:
+                    checkpointer.record(op._op_cfg)
+                end = time()
+                logger.info(f'OP [{op._name}] Done in {end - start:.3f}s. '
+                            f'Left {len(dataset)} samples.')
+        except:  # noqa: E722
+            logger.error(f'An error occurred during Op [{op._name}].')
+            traceback.print_exc()
+            exit(1)
+        finally:
+            if checkpointer:
+                logger.info('Writing checkpoint of dataset processed by '
+                            'last op...')
+                dataset.cleanup_cache_files()
+                checkpointer.save_ckpt(dataset)
         return dataset
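
The rewritten loop above centralizes per-op error handling in NestedDataset.process (it previously lived in BaseOP.__call__, removed later in this diff): any failure is logged once with a traceback, and the finally block writes a checkpoint of the dataset produced by the last successful op. A sketch of the resulting call site, with argument names taken from the executor hunk below; exporter, checkpointer, and tracer may each be None:

    dataset = dataset.process(ops,
                              exporter=self.exporter,
                              checkpointer=self.ckpt_manager,  # record() now takes op._op_cfg
                              tracer=self.tracer)
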
diff --git a/_modules/data_juicer/core/executor.html b/_modules/data_juicer/core/executor.html
index b20696fc4..e4092fef5 100644
--- a/_modules/data_juicer/core/executor.html
+++ b/_modules/data_juicer/core/executor.html
@@ -83,7 +83,6 @@ Source code for data_juicer.core.executor
 import os
-import traceback
 from time import time
 
 from loguru import logger
@@ -122,7 +121,6 @@ Source code for data_juicer.core.executor
 
         self.work_dir = self.cfg.work_dir
 
-        self.ops = None
         self.tracer = None
         self.ckpt_manager = None
 
@@ -142,17 +140,15 @@ Source code for data_juicer.core.executor
         # check if there are existing checkpoints first and try to load the
         # checkpoints. If the checkpoints are loaded successfully, ops that
         # have been processed will be skipped.
-        self.process_list = self.cfg.process
         if self.cfg.use_checkpoint:
             logger.info('Preparing checkpoint manager...')
             self.ckpt_dir = os.path.join(self.work_dir, 'ckpt')
             self.ckpt_manager = CheckpointManager(self.ckpt_dir,
-                                                  self.process_list,
+                                                  self.cfg.process,
                                                   self.cfg.np)
             if self.ckpt_manager.ckpt_available:
                 logger.info('Found existed dataset checkpoint.')
-                self.process_list = self.ckpt_manager.get_left_process_list()
-        self.cfg.process = self.process_list
+                self.cfg.process = self.ckpt_manager.get_left_process_list()
 
         # prepare exporter and check export path suffix
         logger.info('Preparing exporter...')
@@ -239,15 +235,14 @@ Source code for data_juicer.core.executor
 
         # 2. extract processes
         logger.info('Preparing process operators...')
-        self.process_list, self.ops = load_ops(self.cfg.process,
-                                               self.cfg.op_fusion)
+        ops = load_ops(self.cfg.process, self.cfg.op_fusion)
 
         # 3. data process
         # - If tracer is open, trace each op after it's processed
         # - If checkpoint is open, clean the cache files after each process
         logger.info('Processing data...')
         tstart = time()
-        dataset = dataset.process(self.ops,
+        dataset = dataset.process(ops,
                                   exporter=self.exporter,
                                   checkpointer=self.ckpt_manager,
                                   tracer=self.tracer)
@@ -256,17 +251,7 @@ Source code for data_juicer.core.executor
 
         # 4. data export
         logger.info('Exporting dataset to disk...')
-        try:
-            self.exporter.export(dataset)
-        except:  # noqa: E722
-            logger.error('An error occurred during exporting the processed '
-                         'dataset.')
-            traceback.print_exc()
-            if self.cfg.use_checkpoint:
-                logger.info('Writing checkpoint of dataset processed by '
-                            'last op...')
-                dataset.cleanup_cache_files()
-                self.ckpt_manager.save_ckpt(dataset)
+        self.exporter.export(dataset)
         # compress the last dataset after exporting
         if self.cfg.use_cache and self.cfg.cache_compress:
             from data_juicer.utils.compress import compress
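
Two executor simplifications fall out of the data.html change: the separate process_list mirror is gone (checkpoint resumption trims self.cfg.process in place), and export no longer needs its own try/except because a failed run already checkpoints and exits inside dataset.process. A condensed sketch of the remaining resume logic, using only names from the hunks above:

    if self.cfg.use_checkpoint and self.ckpt_manager.ckpt_available:
        # skip ops already covered by the existing checkpoint
        self.cfg.process = self.ckpt_manager.get_left_process_list()
    ops = load_ops(self.cfg.process, self.cfg.op_fusion)
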
diff --git a/_modules/data_juicer/ops/base_op.html b/_modules/data_juicer/ops/base_op.html
index 86b88ec8c..dfeb8346a 100644
--- a/_modules/data_juicer/ops/base_op.html
+++ b/_modules/data_juicer/ops/base_op.html
@@ -241,27 +241,6 @@ Source code for data_juicer.ops.base_op
                 method = wrap_func_with_nested_access(method)
                 setattr(self, name, method)
 
-    def __call__(self,
-                 dataset,
-                 *,
-                 exporter=None,
-                 checkpointer=None,
-                 tracer=None):
-        try:
-            dataset = self.run(dataset, exporter=exporter, tracer=tracer)
-            if checkpointer:
-                checkpointer.record(self._name, self._process_kwargs)
-            return dataset
-        except:  # noqa: E722
-            logger.error(f'An error occurred during Op [{self._name}].')
-            traceback.print_exc()
-            if checkpointer:
-                logger.info('Writing checkpoint of dataset processed by '
-                            'last op...')
-                dataset.cleanup_cache_files()
-                checkpointer.save_ckpt(dataset)
-            exit(1)
-
     @classmethod
     def is_batched_op(cls):
         return cls._batched_op
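
With __call__ deleted, operators are driven directly through run, and checkpoint recording moves into the dataset-level loop; the per-op pattern now reads (as in the data.html hunk above):

    dataset = op.run(dataset, exporter=exporter, tracer=tracer)
    if checkpointer is not None:
        checkpointer.record(op._op_cfg)  # per-op config attached by load_ops
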
diff --git a/_modules/data_juicer/ops/load.html b/_modules/data_juicer/ops/load.html
index 02ff5d156..1688ba1c6 100644
--- a/_modules/data_juicer/ops/load.html
+++ b/_modules/data_juicer/ops/load.html
@@ -116,10 +116,10 @@ Source code for data_juicer.ops.load
     if op_fusion:
         new_process_list, ops = fuse_operators(new_process_list, ops)
 
-    for process, op in zip(new_process_list, ops):
-        op._process_kwargs = process
+    for op_cfg, op in zip(new_process_list, ops):
+        op._op_cfg = op_cfg
 
-    return new_process_list, ops
+    return ops
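
In short, the new load_ops contract: each op carries its own (possibly fused) config as op._op_cfg for later checkpoint recording, and only the op list is returned. A hypothetical sanity check of that invariant:

    ops = load_ops(process_list, op_fusion)
    assert all(hasattr(op, '_op_cfg') for op in ops)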