-
Notifications
You must be signed in to change notification settings - Fork 1
/
eQTL_build_parquet.py
59 lines (54 loc) · 1.22 KB
/
eQTL_build_parquet.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import dask.dataframe as dd
import time
from datetime import timedelta
import sys
# Directory that receives the Parquet dataset (one sub-directory per SNPChr value).
out_dir = "data/eqtls"

# Columns kept from the summary-statistics file, mapped to the dtype each one is
# parsed as. This mapping is the single source of truth; ``subset_cols`` is
# derived from it so the column list and the dtype list cannot drift apart.
# NOTE(review): SNPChr is parsed as int32 (and is the partition column), so a
# non-numeric chromosome label such as "X" would fail to parse, whereas GeneChr
# is kept as str. Confirm the inputs only carry numeric SNP chromosomes.
column_dtypes = {
    "Pvalue": "float64",
    "SNP": "str",
    "SNPChr": "int32",
    "SNPPos": "int64",
    "AssessedAllele": "str",
    "OtherAllele": "str",
    "Zscore": "float64",
    "Gene": "str",
    "GeneChr": "str",
    "GenePos": "int64",
    "NrCohorts": "int64",
    "NrSamples": "int64",
    "FDR": "float64",
    "BonferroniP": "float64",
}
subset_cols = list(column_dtypes)


def keep_column(col):
    """Return True if header column *col* is one of the columns to keep.

    Passed to ``read_csv`` as a callable ``usecols``: wanted columns that are
    absent from an input file are simply skipped rather than raising.
    """
    return col in column_dtypes


def parse_input_paths(argv):
    """Return the summary-statistics path(s) given on the command line.

    Args:
        argv: the full argument vector (``argv[0]`` is the program name).

    Returns:
        The single path as a string when exactly one is given (so a quoted
        glob pattern is still expanded by dask), otherwise a list of paths,
        which ``dd.read_csv`` reads as one dataframe.

    Raises:
        SystemExit: with a usage message when no path is given.
    """
    paths = list(argv[1:])
    if not paths:
        raise SystemExit(
            "Usage: python eQTL_build_parquet.py SUMSTATS [SUMSTATS ...]"
        )
    return paths[0] if len(paths) == 1 else paths


def main(argv=None):
    """Read tab-separated eQTL summary statistics and append them as Parquet.

    Args:
        argv: argument vector to parse; defaults to ``sys.argv``.
    """
    eqtl_sumstats = parse_input_paths(sys.argv if argv is None else argv)
    start_time = time.monotonic()
    df = dd.read_csv(
        eqtl_sumstats,
        sep="\t",
        usecols=keep_column,
        # Copy so the module-level mapping can never be altered by the reader.
        dtype=dict(column_dtypes),
    )
    # NOTE(review): repartition(partition_size=...) must first measure the
    # memory usage of every partition, which the dask docs warn can be
    # expensive (likely an extra pass over the input before writing).
    df.repartition(partition_size="1024MB").to_parquet(
        out_dir,
        partition_on=["SNPChr"],
        engine="pyarrow",
        write_index=False,
        compression="snappy",
        # Adds new files to any dataset already in out_dir, so re-running on
        # the same input writes its rows a second time.
        append=True,
    )
    end_time = time.monotonic()
    print(f"Time taken: {timedelta(seconds=end_time - start_time)}")


if __name__ == "__main__":
    main()