-
Notifications
You must be signed in to change notification settings - Fork 0
/
30.clean
50 lines (45 loc) · 1.45 KB
/
30.clean
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#!/bin/bash
# Filters the merged collection of one language into a cleaned copy.
#
# Inputs:
#   HQ_ENTRY   space-separated entry; its first field names the language
#              sub-directory to process.
#   WORKSPACE  root directory holding collections_merged/ and cleaned/
#              (presumably defined by .env -- confirm).
# `source` resolves .env and .checks via PATH first, then the current directory.
source .env
source .checks
# Strict mode is enabled only after the sourced files have been loaded.
set -euo pipefail
module load parallel

# Fail with a readable message instead of an "unbound variable" on args[0].
: "${HQ_ENTRY:?HQ_ENTRY must be set and non-empty}"
IFS=" " read -ra args <<< "$HQ_ENTRY"
if (( ${#args[@]} == 0 )); then
  printf 'HQ_ENTRY does not contain a language field\n' >&2
  exit 1
fi

L=${args[0]}
INPUT_DIR="$WORKSPACE/collections_merged/$L"
OUTPUT_DIR="$WORKSPACE/cleaned/$L"
mkdir -p -- "$OUTPUT_DIR"
#######################################
# Compress one chunk produced by `split --filter` and publish it atomically.
# The chunk is read from stdin, compressed into "<name>.jsonl.zst.tmp" and
# renamed to "<name>.jsonl.zst" only when compression succeeded.
# Arguments: $1 - chunk path chosen by split (e.g. .../batch_00000012)
# Returns:   0 on success, the compressor's / mv's status otherwise
#######################################
compress-batch() {
  local file=$1
  local dir='' base name tmp rc

  # Split the path so that only the file name is rewritten: directory
  # components such as "run_007" must be left untouched.
  base=${file##*/}
  if [[ "$file" == */* ]]; then
    dir=${file%/*}/
  fi

  # Drop the zero padding of the numeric suffix: batch_00000012 -> batch_12
  base=$(printf '%s\n' "$base" | sed -E 's/_0+/_/g') || return
  name=$dir$base
  tmp=$name.jsonl.zst.tmp

  # Compress stdin into the temporary file; never publish a partial result.
  zstdmt -10 >"$tmp" || {
    rc=$?
    rm -f -- "$tmp"
    return "$rc"
  }

  # Remove the temporary suffix.
  mv -- "$tmp" "$name.jsonl.zst"
}
# Stream filter: reads JSON documents on stdin and writes, one per line, only
# those whose .filter is "keep", whose .robotstxt is "allowed" and whose first
# .doc_scores entry is at least 5.
clean() {
  local keep_expr
  keep_expr='. | select(.filter == "keep" and .robotstxt == "allowed" and .doc_scores[0] >= 5)'
  jq -c "$keep_expr"
}
# Make both helpers visible to the child bash processes that run them.
export -f compress-batch clean
#######################################
# Filter a single compressed batch and publish the result atomically.
# The batch is decompressed, piped through `clean`, recompressed into
# "<output_dir>/<batch>.tmp" and renamed to "<output_dir>/<batch>" only if
# every stage of the pipeline succeeded.
# Arguments: $1 - directory containing the input batch
#            $2 - directory receiving the filtered batch
#            $3 - batch file name or path (only its basename is used)
# Returns:   0 on success, non-zero if any pipeline stage or the rename fails
#######################################
process-file() {
  local input_dir=$1
  local output_dir=$2
  local batch tmp
  batch=$(basename -- "$3") || return
  tmp=$output_dir/$batch.tmp

  # pipefail is enabled in a subshell: this function runs in shells that do
  # not inherit the script's `set -o pipefail`, and without it a failing
  # zstdcat/clean would go unnoticed and a truncated batch would be published.
  if ! (
    set -o pipefail
    zstdcat "$input_dir/$batch" | clean | zstdmt -10 >"$tmp"
  ); then
    rm -f -- "$tmp"
    return 1
  fi

  mv -- "$tmp" "$output_dir/$batch"
}
export -f process-file
# Pick a strategy from the number of input batches:
#   * more than 2 batches: filter every file independently, keeping the
#     original batch boundaries and file names;
#   * otherwise: stream all input through parallel `clean` workers and
#     re-batch the survivors into chunks of at most 120G of whole lines.
# An earlier variant selected the first strategy by language instead:
#if echo $L | grep -q 'eng_Latn\|rus_Cyrl\|zho_Hans'; then

# Collect the inputs with a glob rather than by parsing `ls` output.
batches=("$INPUT_DIR"/batch_*.jsonl.zst)
if [[ ! -e "${batches[0]-}" ]]; then
  printf 'No batch_*.jsonl.zst files found in %s\n' "$INPUT_DIR" >&2
  exit 1
fi

if (( ${#batches[@]} > 2 )); then
  parallel -j48 process-file "$INPUT_DIR" "$OUTPUT_DIR" ::: "${batches[@]}"
else
  # split runs the filter through $SHELL with FILE set to the chunk name, so
  # $FILE must stay single-quoted here and be expanded by that shell.
  # NOTE(review): the exported compress-batch function is only visible there
  # if $SHELL is bash -- confirm on the target system.
  zstdcat "${batches[@]}" \
    | parallel -j32 --block 50M --pipe --halt now,fail=1 clean \
    | split - \
      --numeric-suffixes=1 -a 8 -C 120G \
      --filter='compress-batch "$FILE"' \
      "$OUTPUT_DIR/batch_"
fi