From af7bb2635bd1b8e7667b91b702641ea928cc7e44 Mon Sep 17 00:00:00 2001
From: mieskolainen
Date: Sat, 19 Oct 2024 19:11:38 +0000
Subject: [PATCH] deploy: 2032d84984ab3d868a95a5ad7508e5944fc40d67

---
 _modules/icenet/tools/process.html | 338 +++++++++++++++--------------
 genindex.html                      |  14 +-
 modules/icenet.html                |  39 ++--
 objects.inv                        | Bin 6404 -> 6419 bytes
 searchindex.js                     |   2 +-
 5 files changed, 203 insertions(+), 190 deletions(-)

diff --git a/_modules/icenet/tools/process.html b/_modules/icenet/tools/process.html
index cbf00d51..d818b616 100644
--- a/_modules/icenet/tools/process.html
+++ b/_modules/icenet/tools/process.html
@@ -761,7 +761,7 @@

Source code for icenet.tools.process

     hash_args['maxevents'] = args['maxevents']
     hash_args['inputvars'] = args['inputvars']
     hash_args.update(inputmap)
-
+    
     # Finally create the hash
     args['__hash_genesis__'] = io.make_hash_sha256_object(hash_args)
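The SHA-256 digest computed here keys every cache artifact produced later in the pipeline (the data_<hash> pickle directory, its YAML dump, and the genesis log file). As a rough, self-contained sketch of the idea only, not the actual io.make_hash_sha256_object implementation, whose serialization may differ, a deterministic digest of an argument dictionary can be built like this:

import hashlib
import json

def sha256_of_object(obj) -> str:
    # Sort keys so that dictionary ordering cannot change the digest
    payload = json.dumps(obj, sort_keys=True, default=str).encode('utf-8')
    return hashlib.sha256(payload).hexdigest()

hash_args = {'maxevents': 100000, 'inputvars': 'mvavars'}  # illustrative values
print(f'data_{sha256_of_object(hash_args)}')               # cache-directory style name

Identical genesis settings therefore always map to the same cache location, while any change to them forces a fresh 'genesis' pass.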
     
@@ -950,15 +950,15 @@ 

Source code for icenet.tools.process

             
             icelogger.set_global_log_file(f'{args["datadir"]}/genesis_{args["__hash_genesis__"]}.log')
             print(cli) # for output log
-            read_data(args=args, func_loader=func_loader, runmode=runmode) 
+            process_raw_data(args=args, func_loader=func_loader) 
             
         if runmode == 'train' or runmode == 'eval':
             
             icelogger.set_global_log_file(f'{args["plotdir"]}/{runmode}/execution.log')
             print(cli) # for output log
-            data = read_data_processed(args=args, func_loader=func_loader,
+            data = train_eval_data_processor(args=args,
                     func_factor=func_factor, mvavars=f'configs.{rootname}.mvavars', runmode=runmode)
-        
+            
         if runmode == 'train':
 
             output_file = f'{args["plotdir"]}/train/stats_train.log'
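To make the dispatch above concrete, a minimal steering sketch could look as follows. This is illustrative only: the real entry point lives in the icenet runners, and args, func_loader, func_factor and the mvavars module path ('configs.myproject.mvavars' below is a placeholder) are application specific.

from icenet.tools import process

# 'genesis' materializes the raw-data pickle cache once; 'train' and 'eval'
# then only assemble their datasets on top of that cache.
for runmode in ['genesis', 'train', 'eval']:
    if runmode == 'genesis':
        process.process_raw_data(args=args, func_loader=func_loader)
    else:
        data = process.train_eval_data_processor(
            args=args, func_factor=func_factor,
            mvavars='configs.myproject.mvavars',  # placeholder module path
            runmode=runmode)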
@@ -1042,16 +1042,12 @@ 

Source code for icenet.tools.process

 
 # -------------------------------------------------------------------
 
-
-[docs]
+
+[docs]
 @iceprint.icelog(LOGGER)
-def read_data(args, func_loader, runmode):
+def process_raw_data(args, func_loader):
     """
-    Load input data and return full dataset arrays
-
-    Args:
-        args: main argument dictionary
-        func_loader: application specific root file loader function
+    Load raw input from the disk -- this is executed only by 'genesis'
     """
     
     num_cpus = args['num_cpus']
@@ -1062,170 +1058,181 @@

Source code for icenet.tools.process

 
     cache_directory = aux.makedir(f'{args["datadir"]}/data_{args["__hash_genesis__"]}')
 
-    if args['__use_cache__'] == False or (not os.path.exists(f'{cache_directory}/output_0.pkl')):
+    # Check do we have already computed pickles ready
+    if (os.path.exists(f'{cache_directory}/output_0.pkl') and args['__use_cache__']):
+        print(f'Found existing pickle data under: {cache_directory} and --use_cache 1. [done] ', 'green')
+        return
+    
+    # func_loader does the multifile processing
+    load_args = {'entry_start': 0, 'entry_stop': None, 'maxevents': args['maxevents'], 'args': args}
+    data      = func_loader(root_path=args['root_files'], **load_args)
+    
+    X    = data['X']
+    Y    = data['Y']
+    W    = data['W']
+    ids  = data['ids']
+    info = data['info']
 
-        if runmode != "genesis":
-            if args['__use_cache__'] == False:
-                raise Exception(__name__ + f'.read_data: [--use_cache 0] is to be used only with [--runmode genesis]')
+    C = get_chunk_ind(N=len(X))
+    
+    print(f'Saving {len(C)} pickle files to path: "{cache_directory}"', 'yellow')
 
-            raise Exception(__name__ + f'.read_data: Data "{cache_directory}" not found [execute --runmode genesis and set --maxevents N]')
-        
-        # func_loader does the multifile processing
-        load_args = {'entry_start': 0, 'entry_stop': None, 'maxevents': args['maxevents'], 'args': args}
-        data   = func_loader(root_path=args['root_files'], **load_args)
-        
-        X    = data['X']
-        Y    = data['Y']
-        W    = data['W']
-        ids  = data['ids']
-        info = data['info']
+    ## New code
+    
+    def save_pickle(i):
+        with open(f'{cache_directory}/output_{i}.pkl', 'wb') as handle:
+            pickle.dump([X[C[i][0]:C[i][-1]], Y[C[i][0]:C[i][-1]], W[C[i][0]:C[i][-1]], ids, info, args], 
+                        handle, protocol=pickle.HIGHEST_PROTOCOL)
 
-        C = get_chunk_ind(N=len(X))
-        
-        print(f'Saving {len(C)} pickle files to path: "{cache_directory}"', 'yellow')
+    tic = time.time()
+    
+    # Create a thread pool
+    max_workers = multiprocessing.cpu_count() // 2 if num_cpus == 0 else num_cpus
+    max_workers = min(len(C), max_workers)
+    
+    with ThreadPoolExecutor(max_workers=max_workers) as executor:
+        # Map the save_pickle function to each index in the range
+        list(tqdm(executor.map(save_pickle, range(len(C))), total=len(C)))
 
-        ## New code
-        
-        def save_pickle(i):
-            with open(f'{cache_directory}/output_{i}.pkl', 'wb') as handle:
-                pickle.dump([X[C[i][0]:C[i][-1]], Y[C[i][0]:C[i][-1]], W[C[i][0]:C[i][-1]], ids, info, args], 
-                            handle, protocol=pickle.HIGHEST_PROTOCOL)
+    toc = time.time() - tic
+    print(f'Saving took {toc:0.2f} sec')
+    
+    # Save args
+    aux.yaml_dump(data=args, filename=f'{args["datadir"]}/data_{args["__hash_genesis__"]}.yml')
+    
+    """
+    # OLD code
+    
+    tic = time.time()
+    
+    for i in tqdm(range(len(C))):
+        with open(f'{cache_directory}/output_{i}.pkl', 'wb') as handle:
+            pickle.dump([X[C[i][0]:C[i][-1]], Y[C[i][0]:C[i][-1]], W[C[i][0]:C[i][-1]], ids, info, args], \
+                handle, protocol=pickle.HIGHEST_PROTOCOL)
+    
+    toc = time.time() - tic
+    print(f'Saving took {toc:0.2f} sec')
+    """
+    
+    gc.collect()
+    io.showmem()
+    
+    print('[done]')
+    
+    return
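The chunk index list C returned by get_chunk_ind controls how the event arrays are sliced into separate pickle files. A standalone sketch of that chunking idea, assuming consecutive fixed-size ranges (the real helper may size chunks differently):

def chunk_ranges(N: int, chunk_size: int = 100_000):
    # [start, stop) pairs covering 0..N in consecutive pieces
    edges = list(range(0, N, chunk_size)) + [N]
    return [(edges[i], edges[i + 1]) for i in range(len(edges) - 1)]

print(chunk_ranges(250_000))  # [(0, 100000), (100000, 200000), (200000, 250000)]

Writing one pickle per chunk keeps individual files small and lets the thread pool above save them concurrently.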
-        tic = time.time()
-        
-        # Create a thread pool
-        max_workers = multiprocessing.cpu_count() // 2 if num_cpus == 0 else num_cpus
-        max_workers = min(len(C), max_workers)
-        
-        with ThreadPoolExecutor(max_workers=max_workers) as executor:
-            # Map the save_pickle function to each index in the range
-            list(tqdm(executor.map(save_pickle, range(len(C))), total=len(C)))
-
-        toc = time.time() - tic
-        print(f'Saving took {toc:0.2f} sec')
-        
-        # Save args
-        aux.yaml_dump(data=args, filename=f'{args["datadir"]}/data_{args["__hash_genesis__"]}.yml')
-        
-        """
-        # OLD code
-        
-        tic = time.time()
-        
-        for i in tqdm(range(len(C))):
-            with open(f'{cache_directory}/output_{i}.pkl', 'wb') as handle:
-                pickle.dump([X[C[i][0]:C[i][-1]], Y[C[i][0]:C[i][-1]], W[C[i][0]:C[i][-1]], ids, info, args], \
-                    handle, protocol=pickle.HIGHEST_PROTOCOL)
-        
-        toc = time.time() - tic
-        print(f'Saving took {toc:0.2f} sec')
-        """
-        
-        gc.collect()
-        io.showmem()
-        
-        print('[done]')
-        
-        return data
-
-    else:
-
-        if runmode == "genesis": # Genesis mode does not need this
-            print(f'"genesis" already done and the cache files are ready.', 'green')
-            return
-
-        ## New version
-
-        """
-        Using ThreadPool, not fully parallel because of GIL (Global Interpreter Lock), but
-        should keep memory in control (vs. ProcessPool uses processes, but memory can be a problem)
-        """
-
-        files = os.listdir(cache_directory)
-        sorted_files = sorted(files, key=lambda x: int(os.path.splitext(x)[0].split('_')[1]))
-
-        filepaths = [os.path.join(cache_directory, f) for f in sorted_files]
-        num_files = len(filepaths)
-
-        print(f'Loading {num_files} pickle files from path: "{cache_directory}"')
-        print('')
-        print(sorted_files)
-        print('')
-
-        data = [None] * num_files
-
-        tic = time.time()
-
-        max_workers = multiprocessing.cpu_count() // 2 if num_cpus == 0 else num_cpus
-        max_workers = min(num_files, max_workers)
-
-        with ThreadPoolExecutor(max_workers=max_workers) as executor:
-            future_to_index = {executor.submit(load_file_wrapper, i, fp): i for i, fp in enumerate(filepaths)}
-            for future in tqdm(as_completed(future_to_index), total=num_files):
-                try:
-                    index, (X_, Y_, W_, ids, info, genesis_args) = future.result()
-                    data[index] = (X_, Y_, W_)
-                except Exception as e:
-                    msg = f'Error loading file: {filepaths[future_to_index[future]]} -- {e}'
-                    raise Exception(msg)
-
-                finally:
-                    del future # Ensure the future is deleted to free memory
-
-        toc = time.time() - tic
-        print(f'Loading took {toc:0.2f} sec')
-
-        X, Y, W = concatenate_data(data=data, max_batch_size=args['tech']['concat_max_pickle'])
-        gc.collect() # Call garbage collection once after the loop
-
-        """
-        ## Old version
-
-        num_files = io.count_files_in_dir(cache_directory)
-        print(f'Loading from path: "{cache_directory}"', 'yellow')
+
+[docs]
+@iceprint.icelog(LOGGER)
+def combine_pickle_data(args):
+    """
+    Load splitted pickle data and return full dataset arrays
+    
+    Args:
+        args: main argument dictionary
+    """
+    
+    num_cpus = args['num_cpus']
+    
+    cache_directory = aux.makedir(f'{args["datadir"]}/data_{args["__hash_genesis__"]}')
+    
+    if (not os.path.exists(f'{cache_directory}/output_0.pkl')):
+        raise Exception(__name__ + f'.process_pickle_data: No genesis stage pickle data under "{cache_directory}" [execute --runmode genesis and set --maxevents N]')
+    
+    ## New version
+    
+    """
+    Using ThreadPool, not fully parallel because of GIL (Global Interpreter Lock), but
+    should keep memory in control (vs. ProcessPool uses processes, but memory can be a problem)
+    """
+    
+    files = os.listdir(cache_directory)
+    sorted_files = sorted(files, key=lambda x: int(os.path.splitext(x)[0].split('_')[1]))
+    
+    filepaths = [os.path.join(cache_directory, f) for f in sorted_files]
+    num_files = len(filepaths)
+    
+    print(f'Loading {num_files} pickle files from path: "{cache_directory}"')
+    print('')
+    print(sorted_files)
+    print('')
+    
+    data = [None] * num_files
+    
+    tic = time.time()
+    
+    max_workers = multiprocessing.cpu_count() // 2 if num_cpus == 0 else num_cpus
+    max_workers = min(num_files, max_workers)
+    
+    with ThreadPoolExecutor(max_workers=max_workers) as executor:
+        future_to_index = {executor.submit(load_file_wrapper, i, fp): i for i, fp in enumerate(filepaths)}
+        for future in tqdm(as_completed(future_to_index), total=num_files):
+            try:
+                index, (X_, Y_, W_, ids, info, genesis_args) = future.result()
+                data[index] = (X_, Y_, W_)
+            except Exception as e:
+                msg = f'Error loading file: {filepaths[future_to_index[future]]} -- {e}'
+                raise Exception(msg)
+            
+            finally:
+                del future # Ensure the future is deleted to free memory
+    
+    toc = time.time() - tic
+    print(f'Loading took {toc:0.2f} sec')
+    
+    X, Y, W = concatenate_data(data=data, max_batch_size=args['tech']['concat_max_pickle'])
+    gc.collect() # Call garbage collection once after the loop
+    
+    """
+    ## Old version
+    
+    num_files = io.count_files_in_dir(cache_directory)
+    print(f'Loading from path: "{cache_directory}"', 'yellow')
+    
+    tic = time.time()
+    for i in tqdm(range(num_files)):
-        tic = time.time()
-        for i in tqdm(range(num_files)):
+        with open(f'{cache_directory}/output_{i}.pkl', 'rb') as handle:
+            X_, Y_, W_, ids, info, genesis_args = pickle.load(handle)
+        
+        if i > 0:
+            X = np.concatenate((X, X_), axis=0) # awkward will cast numpy automatically
+            Y = np.concatenate((Y, Y_), axis=0)
+            W = np.concatenate((W, W_), axis=0)
+        else:
+            X,Y,W = copy.deepcopy(X_), copy.deepcopy(Y_), copy.deepcopy(W_)
-            with open(f'{cache_directory}/output_{i}.pkl', 'rb') as handle:
-                X_, Y_, W_, ids, info, genesis_args = pickle.load(handle)
-            
-            if i > 0:
-                X = np.concatenate((X, X_), axis=0) # awkward will cast numpy automatically
-                Y = np.concatenate((Y, Y_), axis=0)
-                W = np.concatenate((W, W_), axis=0)
-            else:
-                X,Y,W = copy.deepcopy(X_), copy.deepcopy(Y_), copy.deepcopy(W_)
-
-            gc.collect() # important!
-        toc = time.time() - tic
-        print(f'Took {toc:0.2f} sec')
-        """
-
-        io.showmem()
-        print('[done]')
-
-        return {'X':X, 'Y':Y, 'W':W, 'ids':ids, 'info':info}
+        gc.collect() # important!
+    toc = time.time() - tic
+    print(f'Took {toc:0.2f} sec')
+    """
+    
+    io.showmem()
+    print('[done]')
+    
+    return {'X':X, 'Y':Y, 'W':W, 'ids':ids, 'info':info}
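The loading side of combine_pickle_data follows the common submit/as_completed pattern: one job per pickle file, results collected as they finish, with the original file order restored through an index map. A generic, self-contained sketch of that pattern (load_one stands in for icenet's load_file_wrapper):

import pickle
from concurrent.futures import ThreadPoolExecutor, as_completed

def load_one(index, filepath):
    with open(filepath, 'rb') as handle:
        return index, pickle.load(handle)

def load_all(filepaths, max_workers=4):
    results = [None] * len(filepaths)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(load_one, i, fp): i for i, fp in enumerate(filepaths)}
        for future in as_completed(futures):
            index, payload = future.result()  # re-raises any worker exception
            results[index] = payload
    return results

As the docstring in the diff notes, threads share one interpreter, so memory stays bounded even though the GIL limits CPU parallelism; for disk-bound pickle loading that trade-off is usually acceptable.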
+
-
-[docs]
+
+[docs]
 @iceprint.icelog(LOGGER)
-def read_data_processed(args, func_loader, func_factor, mvavars, runmode):
+def train_eval_data_processor(args, func_factor, mvavars, runmode):
     """
     Read/write (MVA) data and return full processed dataset
     """
     
     # --------------------------------------------------------------------
-    # 'DATA': Raw input reading and processing
+    # 1. Pickle data combiner step
     
-    cache_filename = f'{args["datadir"]}/data_{runmode}_{args["__hash_genesis__"]}.pkl'
+    cache_filename = f'{args["datadir"]}/data_{args["__hash_genesis__"]}.pkl'
     
     if args['__use_cache__'] == False or (not os.path.exists(cache_filename)):
         
         print(f'File "{cache_filename}" for <DATA> does not exist, creating.', 'yellow')
         
-        data = read_data(args=args, func_loader=func_loader, runmode=runmode)
+        data = combine_pickle_data(args=args)
         
         with open(cache_filename, 'wb') as handle:
             print(f'Saving <DATA> to a pickle file: "{cache_filename}"', 'yellow')
@@ -1258,16 +1265,16 @@

Source code for icenet.tools.process

     print('[done]')
     
     # --------------------------------------------------------------------
-    # 'PROCESSED DATA': Further processing step
+    # 2. High level data step
     
-    cache_filename = f'{args["datadir"]}/processed_data_{runmode}_{args["__hash_post_genesis__"]}.pkl'
+    cache_filename = f'{args["datadir"]}/processed_data_{runmode}_{args["__hash_genesis__"]}__{args["__hash_post_genesis__"]}.pkl'
     
     if args['__use_cache__'] == False or (not os.path.exists(cache_filename)):
         
         print(f'File "{cache_filename}" for <PROCESSED DATA> does not exist, creating.', 'yellow')
         
         # Process it
-        processed_data = process_data(args=args, predata=data, func_factor=func_factor, mvavars=mvavars, runmode=runmode)
+        processed_data = process_data(args=args, data=data, func_factor=func_factor, mvavars=mvavars, runmode=runmode)
         
         with open(cache_filename, 'wb') as handle:
             print(f'Saving <PROCESSED DATA> to a pickle file: "{cache_filename}"', 'yellow')
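The two cache levels introduced here use different keys: step 1 (the combined pickle data) depends only on __hash_genesis__, while step 2 (the processed, runmode-specific data) also folds in __hash_post_genesis__. A naming sketch with purely illustrative values:

datadir      = 'output/mydata'  # hypothetical
hash_genesis = 'a1b2c3'         # stands for args['__hash_genesis__']
hash_post    = 'd4e5f6'         # stands for args['__hash_post_genesis__']

step1_cache = f'{datadir}/data_{hash_genesis}.pkl'
step2_cache = f'{datadir}/processed_data_train_{hash_genesis}__{hash_post}.pkl'

Changing only the high-level (post-genesis) configuration therefore invalidates step 2 without forcing the heavier step 1 to be redone.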
@@ -1303,19 +1310,20 @@ 

Source code for icenet.tools.process

     return processed_data
+
 
 [docs]
 @iceprint.icelog(LOGGER)
-def process_data(args, predata, func_factor, mvavars, runmode):
+def process_data(args, data, func_factor, mvavars, runmode):
     """
-    Process data further
+    Process data to high level representations and split to train/eval/test
     """
-    
-    X    = predata['X']
-    Y    = predata['Y']
-    W    = predata['W']
-    ids  = predata['ids']
-    info = predata['info']
+    
+    X    = data['X']
+    Y    = data['Y']
+    W    = data['W']
+    ids  = data['ids']
+    info = data['info']
     
     # ----------------------------------------------------------
     # Pop out conditional variables if they exist and no conditional training is used

diff --git a/genindex.html b/genindex.html
index ae778390..53cd3933 100644
--- a/genindex.html
+++ b/genindex.html
@@ -733,6 +733,8 @@

C

  • colors() (in module iceplot.iceplot)
  • columnar_mask_efficiency() (in module icefit.statstools)
+  •
+  • combine_pickle_data() (in module icenet.tools.process)
  • compute_edge_attr() (in module icehgcal.graphio)
@@ -2382,6 +2384,8 @@

    P

  • process_memory_use() (in module icenet.tools.io)
+  •
+  • process_raw_data() (in module icenet.tools.process)
  • process_regexp_ids() (in module icenet.tools.aux)
@@ -2454,10 +2458,6 @@

    R

  • read_cli() (in module icenet.tools.process)
  • read_config() (in module icenet.tools.process)
-  •
-  • read_data() (in module icenet.tools.process)
-  •
-  • read_data_processed() (in module icenet.tools.process)
  • read_data_tracklet() (in module icehgcal.common)
@@ -2473,10 +2473,10 @@

    R

  • reduce_mantissa_digits() (in module icefit.mantissa)
  •
-  •
-  •
+  •
  • train_cutset() (in module icenet.deep.train)
+  •
+  • train_eval_data_processor() (in module icenet.tools.process)
  • train_flow() (in module icenet.deep.train)
diff --git a/modules/icenet.html b/modules/icenet.html
index 5152c634..de6593ff 100644
--- a/modules/icenet.html
+++ b/modules/icenet.html
@@ -4731,6 +4731,17 @@

    icenet.tools

    icenet.tools.process¶

+combine_pickle_data(args)[source]¶
+
+    Load splitted pickle data and return full dataset arrays
+
+    Parameters:
+        args – main argument dictionary
+
    concatenate_data(data, max_batch_size: int = 32)[source]¶
    @@ -4811,8 +4822,14 @@

    icenet.tools
-process_data(args, predata, func_factor, mvavars, runmode)[source]¶
-
-    Process data further
+process_data(args, data, func_factor, mvavars, runmode)[source]¶
+
+    Process data to high level representations and split to train/eval/test

+process_raw_data(args, func_loader)[source]¶
+
+    Load raw input from the disk – this is executed only by ‘genesis’

    @@ -4827,22 +4844,8 @@

    icenet.tools

-read_data(args, func_loader, runmode)[source]¶
-
-    Load input data and return full dataset arrays
-
-    Parameters:
-        • args – main argument dictionary
-        • func_loader – application specific root file loader function
-
-read_data_processed(args, func_loader, func_factor, mvavars, runmode)[source]¶
+train_eval_data_processor(args, func_factor, mvavars, runmode)[source]¶

    Read/write (MVA) data and return full processed dataset

diff --git a/objects.inv b/objects.inv
index c37ab3342baed663456972e27d58e0e393e15e74..7e9010860140307492e5e29c801279c5e36ca3b9 100644
GIT binary patch
delta 1060
delta 1045