
Commit

add processing
veya2ztn committed Oct 9, 2024
1 parent 7f510b0 commit 9a89ff5
Showing 14 changed files with 268 additions and 16 deletions.
9 changes: 6 additions & 3 deletions batch_running_task/batch_run.sh
@@ -1,5 +1,5 @@

-TOTALNUM=30
+TOTALNUM=10
CPU_NUM=$1 # Automatically get the number of CPUs
if [ -z "$CPU_NUM" ]; then
CPU_NUM=$TOTALNUM
@@ -17,7 +17,10 @@ else
PARA="-p vip_gpu_ailab_low -N1 -c8 --gres=gpu:1"
fi
SCRIPT="batch_running_task/task_rec/run_rec.sh"
#SCRIPT="batch_running_task/task_layout/run_layout_for_missing_page.sh"

FILELIST="physics_collection/wait_for_ocr.filelist"
#FILELIST="physics_collection/analysis/not_complete_pdf_page_id.pairlist.filelist"


START=0
@@ -26,8 +29,8 @@ do

#sbatch --quotatype=spot -p AI4Chem -N1 -c8 --gres=gpu:1 run.sh sci_index_files.addon.filelist $(($CPU+$START)) $TOTALNUM
#sbatch --quotatype=spot -p AI4Chem -N1 -c8 --gres=gpu:1 run_mfr.sh physics_collection/sci_index_files.remain.filelist 0 1
-sbatch $PARA $SCRIPT $FILELIST $(($CPU+$START)) $TOTALNUM
-#sbatch --quotatype=spot -p AI4Chem -N1 -c8 --gres=gpu:1 physics_collection/sci_index_files.finished.filelist $(($CPU+$START)) $TOTALNUM
+sbatch -x SH-IDC1-10-140-24-139 $PARA $SCRIPT $FILELIST $(($CPU+$START)) $TOTALNUM
+#sbatch --quotatype=spot -p AI4Chem -N1 -c8 --gres=gpu:1 physics_collection/sci_index_files.finished.filelist $(($CPU+$START)) $TOTALNUM
+#sbatch --quotatype=spot -p AI4Chem -N1 -c8 --gres=gpu:1 batch_running_task/task_layout/run_layout_for_missing_page.sh physics_collection/analysis/not_complete_pdf_page_id.pairlist.remain.filelist $(($CPU+$START)) $TOTALNUM
## lets sleep 20s every 10 job start
if [ $(($CPU % 10)) -eq 9 ]; then
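For context, each sbatch invocation above hands the worker script three positional arguments: the file list, a shard index ($CPU+$START), and the shard count ($TOTALNUM). A minimal sketch of the sharding a downstream Python worker is assumed to perform (function and variable names here are illustrative, not the repository's actual code):

```python
# Illustrative sketch only: how a worker might select its shard of the file list.
def select_shard(filelist_path: str, index_part: int, num_parts: int) -> list:
    with open(filelist_path) as f:
        files = [line.strip() for line in f if line.strip()]
    # Round-robin sharding: worker i handles every num_parts-th entry starting at i.
    return files[index_part::num_parts]

# Example: worker 3 of 10 over the OCR wait list.
# shard = select_shard("physics_collection/wait_for_ocr.filelist", 3, 10)
```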
66 changes: 66 additions & 0 deletions batch_running_task/get_data_utils.py
@@ -526,6 +526,9 @@ def read_data_with_version(result_path, client):
#tqdm.write("reading result")
result = read_json_from_path(result_path,client)
result_dict = build_dict(result)
patch_version1_dict = {}
patch_version2_dict = {}
patch_version3_dict = {}
patch_version1_dict = build_dict(read_json_from_path(version1,client)) if check_path_exists(version1,client) else {}
patch_version2_dict = build_dict(read_json_from_path(version2,client)) if check_path_exists(version2,client) else {}
patch_version3_dict = build_dict(read_json_from_path(version3,client)) if check_path_exists(version3,client) else {}
@@ -576,6 +579,69 @@ def read_data_with_version(result_path, client):

return result

def read_data_with_missing(result_path, client):
if result_path.startswith("s3:"):
result_path = "opendata:"+result_path
#assert "layoutV" in result_path
filename = os.path.basename(result_path)
rootpath = os.path.dirname(os.path.dirname(result_path))
version1 = os.path.join(rootpath,"fix_missing_page_version3",filename)

assert check_path_exists(result_path,client)
#tqdm.write("reading result")
result = read_json_from_path(result_path,client)
result_dict = build_dict(result)
patch_version1_dict = {}
patch_version2_dict = {}
patch_version3_dict = {}
patch_version1_dict = build_dict(read_json_from_path(version1,client)) if check_path_exists(version1,client) else {}

#tqdm.write("reading done")
for track_id, pdf_metadata in result_dict.items():
for patch_dict in [patch_version1_dict, patch_version2_dict, patch_version3_dict]:
if track_id in patch_dict:
patch_pdf_metadata = patch_dict[track_id]
for page_id, pdf_page_metadata in patch_pdf_metadata.items():
if page_id in pdf_metadata:
assert len(pdf_page_metadata["layout_dets"]) == len(pdf_metadata[page_id]["layout_dets"]), f"pdf={track_id} page={page_id} => bbox count {len(pdf_metadata[page_id]['layout_dets'])} not equal to patch count {len(pdf_page_metadata['layout_dets'])}"
for box1_dict, box2_dict in zip(pdf_metadata[page_id]["layout_dets"], pdf_page_metadata["layout_dets"]):
assert box1_dict['category_id'] == box2_dict['category_id'], f"pdf={track_id} page={page_id} => category_id {box1_dict['category_id']} not equal to patch category_id {box2_dict['category_id']}"
assert box1_dict['poly'] == box2_dict['poly'], f"pdf={track_id} page={page_id} => poly {box1_dict['poly']} not equal to patch poly {box2_dict['poly']}"
if box1_dict['category_id'] == 15:
if box2_dict.get('text',"") == "":continue
if box1_dict.get('text',"") == "":
box1_dict['text'] = box2_dict.get('text',"")

else:
assert box1_dict['text'] == box2_dict['text'], f"pdf={track_id} page={page_id} => text {box1_dict['text']} not equal to patch text {box2_dict['text']}"

if box1_dict['category_id'] in {13, 14}:
if box2_dict.get('latex',"") == "":continue
if box1_dict.get('latex',"") == "":
box1_dict['latex'] = box2_dict['latex']
else:
assert box1_dict['latex'] == box2_dict['latex'], f"pdf={track_id} page={page_id} => latex {box1_dict['latex']} not equal to patch latex {box2_dict['latex']}"
box1_dict.update(box2_dict)
else:
pdf_metadata[page_id] = pdf_page_metadata

for pdf_metadata in result:
track_id = pdf_metadata['track_id']
pdf_metadata['height'] = output_height
pdf_metadata['width'] = output_width
doc_layout_result = []
for page_id, pdf_page_metadata in result_dict[track_id].items():
doc_layout_result.append(pdf_page_metadata)
pdf_metadata['doc_layout_result'] = doc_layout_result

#print(len(result))
# mfr_patch_dict = build_dict(read_json_from_path(mfr_patchpath,client)) if check_path_exists(mfr_patchpath,client) else {}
# mfr_patch_bf16_dict = build_dict(read_json_from_path(mfr_patch_bf16path,client)) if check_path_exists(mfr_patch_bf16path,client) else {}
# rec_patch_dict = build_dict(read_json_from_path(rec_patchpath,client)) if check_path_exists(rec_patchpath,client) else {}

return result


class PackStatus:
whole_layout_complete = 'whole_layout_complete'
whole_ocr_complete = 'whole_ocr_complete'
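The new read_data_with_missing above leans on build_dict (defined earlier in get_data_utils.py) returning a track_id -> page_id -> page-metadata mapping. A hedged sketch of that assumed structure, for readers without the rest of the file in view; the repository's actual build_dict may differ in detail:

```python
# Assumed shape only: group a list of per-PDF records by track_id, then by page_id.
def build_dict_sketch(result):
    result_dict = {}
    for pdf_metadata in result:                       # one record per PDF
        pages = {}
        for page in pdf_metadata["doc_layout_result"]:
            pages[page["page_id"]] = page             # keyed by page_id
        result_dict[pdf_metadata["track_id"]] = pages
    return result_dict
```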
26 changes: 26 additions & 0 deletions batch_running_task/job_dispatch.sh
@@ -0,0 +1,26 @@
#!/bin/bash
#SBATCH -J analysis
#SBATCH -o .log/analysis/%j-analysis.out
#SBATCH -e .log/analysis/%j-analysis.out
echo `hostname`
# dirpath=$(tr -dc A-Za-z0-9 </dev/urandom | head -c 6) # 6 random characters
# mkdir "/dev/shm/$dirpath"
# ~/s3mount uparxive /dev/shm/$dirpath --profile hansen --max-threads 16 --maximum-throughput-gbps 25 --endpoint-url http://10.140.31.254:80 --prefix json/
declare -a pids

START=$2
SUBCHUNKSIZE=$3
CHUNKSIZE=$4
scipt_path=$5
for ((CPU=0; CPU<SUBCHUNKSIZE; CPU++));
do
TRUEINDICES=$(($CPU+$START))
nohup python $scipt_path --root $1 --index_part $TRUEINDICES --num_parts $CHUNKSIZE $6 > .log/convert/thread.$TRUEINDICES.log 2>&1 &
pids[$CPU]=$!
done

for pid in "${pids[@]}"; do
wait $pid
done

echo "All processes have completed."
2 changes: 1 addition & 1 deletion batch_running_task/scihub_pdf_dataset.py
@@ -251,7 +251,7 @@ def get_cropped_image_list_via_remove_mfd_part(self,pdf_metadata,client):
raise
except:
traceback.print_exc()
-raise

tqdm.write(f"[Error]: {pdf_path}")
return (pdf_path,{})

@@ -26,17 +26,18 @@
dpi = model_configs['model_args']['pdf_dpi']


version = "fix_missing_page_version2"
version = "fix_missing_page_version3"
layout_model = None
mfd_model = None
client = None
ocrmodel = None
page_num_map_whole = None #get_page_num_map_whole()
for inputs_line in tqdm(all_file_list, leave=False, position=1):

-splited_line = inputs_line.split()
+splited_line = inputs_line.strip().split()
inputs_path = splited_line[0]
json_str = " ".join(splited_line[1:])

page_num_for_name = json.loads(json_str)

if os.path.exists(CURRENT_END_SIGN):
1 change: 1 addition & 0 deletions batch_running_task/task_layout/get_batch_yolo.py
@@ -83,6 +83,7 @@ def get_batch_YOLO_model(model_configs, batch_size,use_tensorRT=True)->YOLO:
conf_thres= model_configs['model_args']['conf_thres']
iou_thres = model_configs['model_args']['iou_thres']
build_mfd_predictor(mfd_model , imgsz=img_size, conf=conf_thres, iou=iou_thres, verbose=False)
mfd_model.is_tensorRT=use_tensorRT
return mfd_model


4 changes: 3 additions & 1 deletion batch_running_task/task_layout/rough_layout.py
@@ -49,7 +49,7 @@ def inference_layout(layout_pair,layout_model,inner_batch_size):
def inference_mfd(mfd_images,mfd_model,inner_batch_size):
origin_length = len(mfd_images)

-if len(mfd_images)<inner_batch_size:
+if len(mfd_images)<inner_batch_size and mfd_model.is_tensorRT:
mfd_images = torch.nn.functional.pad(mfd_images, (0,0,0,0,0,0,0, inner_batch_size-len(mfd_images)))
mfd_res = mfd_model.predict(mfd_images, imgsz=(1888,1472), conf=0.3, iou=0.5, verbose=False)
mfd_res = mfd_res[:origin_length]
@@ -376,6 +376,8 @@ def fast_dealwith_one_dataset(dataset,layout_model, mfd_model, ocrmodel,
pdf_paths = [dataset.metadata[pdf_index]['path'] for pdf_index in pdf_index]
with timer('get_layout'):
layout_res = inference_layout((layout_images,heights, widths),layout_model,inner_batch_size)
print(layout_res)
raise
with timer('get_mfd'):
mfd_res = inference_mfd(mfd_images,mfd_model,inner_batch_size)
with timer('combine_layout_mfd_result'):
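The new is_tensorRT flag (set in get_batch_yolo.py above) gates the batch padding in inference_mfd: a TensorRT engine is usually compiled for a fixed batch size, so a short final batch has to be padded and the surplus predictions dropped, whereas an eager PyTorch model accepts the short batch as-is. A minimal sketch of that pad-then-truncate idea, with illustrative shapes:

```python
import torch

def pad_to_fixed_batch(images: torch.Tensor, batch_size: int) -> torch.Tensor:
    """Pad an (N, C, H, W) batch with zero images up to batch_size."""
    n = images.shape[0]
    if n >= batch_size:
        return images
    # F.pad pads trailing dims first; the last pair pads the batch dimension.
    return torch.nn.functional.pad(images, (0, 0, 0, 0, 0, 0, 0, batch_size - n))

# Usage sketch: predictions for the padded tail are simply discarded.
# padded  = pad_to_fixed_batch(mfd_images, inner_batch_size)
# results = mfd_model.predict(padded, ...)[: mfd_images.shape[0]]
```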
@@ -18,4 +18,4 @@ else
echo "[`hostname`] GCC version is $REQUIRED_VERSION."
fi

-python batch_running_task/task_layout/batch_deal_with_layout_fixmissing_page.py --root_path $1 --index_part $2 --num_parts $3 --inner_batch_size 16 --batch_size 16 --num_workers 8 --accelerated_mfd --shuffle # --accelerated_layout
+python batch_running_task/task_layout/batch_deal_with_layout_fixmissing_page.py --root_path $1 --index_part $2 --num_parts $3 --inner_batch_size 16 --batch_size 16 --num_workers 8 --shuffle # --accelerated_layout --accelerated_mfd
2 changes: 1 addition & 1 deletion batch_running_task/task_rec/batch_deal_with_rec.py
@@ -122,7 +122,7 @@ class BatchRECConfig(BatchModeConfig):
date_format = "%Y-%m-%d %H:%M:%S"
date = datetime.strptime(date_string, date_format)
deltatime = datetime.now() - date
-if deltatime < timedelta(hours=20):
+if deltatime < timedelta(hours=0.5):
tqdm.write(f"[Skip]: {filename_with_partion} is locked by {date_string} created at {last_start_time} [now is {deltatime}]")
continue

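For context, the touched check treats a lock file younger than the threshold as still held and skips that work unit; dropping the window from 20 hours to 0.5 hours lets stale locks be reclaimed far sooner. A hedged sketch of the check, with names simplified from the surrounding code:

```python
from datetime import datetime, timedelta

def lock_is_fresh(date_string: str, max_age_hours: float = 0.5) -> bool:
    """True if the lock timestamp is recent enough that the item should be skipped."""
    date_format = "%Y-%m-%d %H:%M:%S"
    locked_at = datetime.strptime(date_string, date_format)
    return datetime.now() - locked_at < timedelta(hours=max_age_hours)

# lock_is_fresh("2024-10-09 08:00:00")  -> skip if locked within the last 30 minutes
```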
2 changes: 1 addition & 1 deletion batch_running_task/task_rec/run_rec.sh
@@ -29,4 +29,4 @@ else
fi


-python batch_running_task/task_rec/batch_deal_with_rec.py --image_batch_size $IMAGE_BATCH_SIZE --pdf_batch_size $PDF_BATCH_SIZE --root_path $1 --index_part $2 --num_parts $3 --num_workers 8 --update_origin --replace --shuffle #--compile
+python batch_running_task/task_rec/batch_deal_with_rec.py --image_batch_size $IMAGE_BATCH_SIZE --pdf_batch_size $PDF_BATCH_SIZE --root_path $1 --index_part $2 --num_parts $3 --num_workers 8 --update_origin --replace #--shuffle #--compile
7 changes: 5 additions & 2 deletions batch_running_task/task_schedule.sh
@@ -1,13 +1,16 @@

#!/bin/bash
-TASKLIMIT=30
-PENDINGLIMIT=2

# Function to get the count of pending tasks
user=`whoami`
if [[ $(hostname) == SH* ]]; then
partition='AI4Chem'
+TASKLIMIT=80
+PENDINGLIMIT=2
else
partition='vip_gpu_ailab_low'
+TASKLIMIT=30
+PENDINGLIMIT=2
fi
# jobscript="batch_running_task/task_layout/run_layout_for_missing_page.sh"
# filelist='scihub_collection/analysis/not_complete_pdf_page_id.pairlist.filelist'
150 changes: 150 additions & 0 deletions script/format_into_markdown/format_into_markdown.py
@@ -0,0 +1,150 @@
from magic_pdf.pipe.UNIPipe import UNIPipe
import json
from magic_pdf.rw.S3ReaderWriter import S3ReaderWriter
import sys,os
sys.path.append(os.path.abspath(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))))
from batch_running_task.get_data_utils import *
from batch_running_task.batch_run_utils import obtain_processed_filelist, process_files,save_analysis, BatchModeConfig,dataclass
import json
from tqdm.auto import tqdm
from simple_parsing import ArgumentParser
import time
import subprocess
client = build_client()
from batch_running_task.utils import convert_boxes
# set logging level to Error
from loguru import logger

# Remove the default logger
logger.remove()

# Add a new logger with the desired level
logger.add(sys.stderr, level="ERROR")

@dataclass
class FormatIntoMarkdownConfig(BatchModeConfig):
savepath: str = "opendata:s3://llm-pdf-text/pdf_gpu_output/scihub_shared/physics_part/markdown"
saveimageQ: bool = False


def reformat_to_minerU_input(pool):
"""
Our directly formatted input is a little different from the format that MinerU requires,
so we need to convert it.
MinerU input format:
{
....
"doc_layout_result": [
{
"layout_dets": [...],
"page_info": {}
}
]
}
"""
height = pool["height"]
width = pool["width"]
old_doc_layout_result = pool['doc_layout_result']
new_doc_layout_result = []
for page_res_information in old_doc_layout_result:
page_num = page_res_information['page_id']

old_page_layout_dets = page_res_information['layout_dets']
new_page_layout_dets = []

for bbox_information in old_page_layout_dets:
if bbox_information.get('category_id',"")!=15:
new_page_layout_dets.append(bbox_information)
elif "sub_boxes" not in bbox_information :
if "text" not in bbox_information:
bbox_information['text'] = "[Missing]"
bbox_information['score']= 1
new_page_layout_dets.append(bbox_information)
elif len(bbox_information['sub_boxes']) == 0:
new_bbox_information = {
"category_id": bbox_information['category_id'],
"poly": bbox_information['poly'],
"text": bbox_information['text'],
"score": bbox_information['score']
}
new_page_layout_dets.append(new_bbox_information)
else:
current_bbox_catagory = bbox_information['category_id']
for sub_bbox_information in bbox_information['sub_boxes']:
new_bbox_information = {
"category_id": current_bbox_catagory,
"poly": sub_bbox_information['poly'],
"text": sub_bbox_information['text'],
"score": sub_bbox_information['score']
}
new_page_layout_dets.append(new_bbox_information)

new_page = {
"layout_dets": new_page_layout_dets,
"page_info": {"page_no": page_num, "height": height, "width": width}
}
new_doc_layout_result.append(new_page)
return new_doc_layout_result

def process_file(jsonl_path, args):
filename = os.path.basename(jsonl_path)
saveroot = os.path.dirname(os.path.dirname(jsonl_path))
targetpath= os.path.join(saveroot,"markdown",filename)
image_save_root = os.path.join(saveroot,"images_per_pdf",filename[:-len(".jsonl")])
if not args.redo and check_path_exists(targetpath,client):
tqdm.write(f"skip {targetpath}")
return
jsonl_data = read_json_from_path(jsonl_path,client)
markdown_for_this_bulk = []
for pdf_info in tqdm(jsonl_data,desc=f"process {filename}",leave=False,position=1):
try:
model_list = reformat_to_minerU_input(pdf_info) #pdf_info['doc_layout_result']
track_id = pdf_info['track_id']
img_save_dir = os.path.join(image_save_root, track_id)
if img_save_dir.startswith("opendata:"):
img_save_dir = img_save_dir[len("opendata:"):]
# if img_save_dir.startswith("s3://"):
# img_save_dir = img_save_dir[len("s3://"):]
image_writer = S3ReaderWriter(ak="ZSLIM2EYKENEX5B4AYBI",
sk="2ke199F35V9Orwcu8XJyGUcaJzeDz4LzvMP5yEFD",
endpoint_url='http://p-ceph-norm-outside.pjlab.org.cn',
parent_path=img_save_dir)
pdf_path = pdf_info["path"]
if pdf_path.startswith("s3:"):
pdf_path = "opendata:"+pdf_path
pdf_bytes = client.get(pdf_path)#read_pdf_from_path(pdf_path,client)
pip = UNIPipe(pdf_bytes, {"_pdf_type":"", "model_list":model_list}, image_writer=image_writer)
pip.pdf_type = pip.PIP_OCR
pip.pipe_parse()
md = pip.pipe_mk_markdown(img_save_dir,drop_mode="none")
except:
md = "Fail to Parser"

markdown_for_this_bulk.append({"track_id":track_id, "path":pdf_path, "markdown":md})
write_jsonl_to_path(markdown_for_this_bulk,targetpath,client)


def process_one_file_wrapper(args):
arxiv_path, args = args
return process_file(arxiv_path,args)

if __name__ == '__main__':


parser = ArgumentParser()
parser.add_arguments(BatchModeConfig, dest="config")
args = parser.parse_args()
args = args.config
args.task_name = "scan"
alread_processing_file_list = obtain_processed_filelist(args)
results = process_files(process_one_file_wrapper, alread_processing_file_list, args)
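To make the sub_boxes flattening in reformat_to_minerU_input concrete, here is a hedged example of what the function is expected to do with a single OCR text box (category_id 15) that carries line-level sub_boxes; the values are invented purely for illustration:

```python
# Invented example data; only the shape of the transformation matters.
box_with_sub_boxes = {
    "category_id": 15,
    "poly": [10, 10, 200, 10, 200, 40, 10, 40],
    "text": "full paragraph text",
    "score": 0.98,
    "sub_boxes": [
        {"poly": [10, 10, 200, 10, 200, 24, 10, 24], "text": "line one", "score": 0.99},
        {"poly": [10, 26, 200, 26, 200, 40, 10, 40], "text": "line two", "score": 0.97},
    ],
}

# After flattening, each sub box becomes its own category-15 detection for MinerU:
expected_layout_dets = [
    {"category_id": 15, "poly": [10, 10, 200, 10, 200, 24, 10, 24], "text": "line one", "score": 0.99},
    {"category_id": 15, "poly": [10, 26, 200, 26, 200, 40, 10, 40], "text": "line two", "score": 0.97},
]
```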









2 changes: 1 addition & 1 deletion script/merge_all_patch_back/merge_all_patch_back.py
@@ -25,7 +25,7 @@ def process_file(result_path, args):
tqdm.write(f"skip {target_file_path}")
return
#target_file_path = "test.jsonl"
-result = read_data_with_version(result_path,client)
+result = read_data_with_missing(result_path,client)
tqdm.write(f"read {result_path} to {target_file_path}")

write_jsonl_to_path(result,target_file_path ,client)