pl_inference_spark.py
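"""Distributed inference of a pytorch-lightning sequence encoder on Spark.

The script reads parquet datasets, groups records into batches, runs the
broadcast sequence encoder inside a UDF on each executor (on a GPU when one
is available), and writes one embedding column per hidden dimension to the
output path. This docstring only summarizes the code below.
"""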
import logging

import torch
import pytorch_lightning as pl
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

from dltranz.util import get_conf, get_cls

logger = logging.getLogger(__name__)
class InferenceSpark(object):
    """Runs batched inference of a pytorch-lightning sequence encoder on Spark."""

    def __init__(self, spark, work_path, model_path, output_path, dataset_files,
                 col_id, pl_module_class, hidden_size, batch_size):
        self.spark = spark
        self.work_path = work_path
        self.model_path = model_path
        self.output_path = output_path
        self.dataset_files = dataset_files
        self.col_id = col_id
        self.pl_module_class = pl_module_class
        self.hidden_size = hidden_size
        self.batch_size = batch_size
    def collect_batches(self):
        # Group source rows into batches of at most `batch_size` records, collecting
        # the feature columns into arrays so a whole batch is passed to the UDF at once.
        df_t = self.spark.read.parquet(*[f"{self.work_path}/{i}" for i in self.dataset_files])
        columns = [i for i in df_t.columns if i not in [self.col_id, "trx_count", "target"]]
        df_t \
            .orderBy("trx_count") \
            .withColumn("group_id", F.monotonically_increasing_id()) \
            .groupby(F.floor(F.col("group_id") / self.batch_size).alias("group_id")) \
            .agg(
                F.collect_list(self.col_id).alias(self.col_id),
                F.struct([F.collect_list(F.col(name)).alias(name) for name in columns]).alias("feature_arrays"),
                F.collect_list("trx_count").alias("trx_count")) \
            .write.format("parquet").mode("overwrite") \
            .save(f"{self.work_path}/data/tmp_batch_train")
        logger.info(f'Grouped records into batches of at most {self.batch_size} rows '
                    f'and saved them to {self.work_path}/data/tmp_batch_train')
    def exec_inference(self):
        pl.seed_everything(42)
        pl_module = get_cls(self.pl_module_class)
        model = pl_module.load_from_checkpoint(f"{self.work_path}/{self.model_path}")
        model.seq_encoder.is_reduce_sequence = True
        # Broadcast only the sequence encoder so every executor gets its own copy.
        br_m = self.spark.sparkContext.broadcast(model.seq_encoder)

        def inference_func(data_feature, data_length):
            # Imports are local so they are resolved inside the executor process.
            import torch
            from dltranz.trx_encoder import PaddedBatch
            import subprocess

            # Pick the GPU with the most free memory, falling back to CPU.
            if torch.cuda.is_available():
                result = subprocess.check_output(
                    ['nvidia-smi', '--query-gpu=memory.free', '--format=csv,nounits,noheader'],
                    encoding='utf-8')
                gpu_memory = [int(x) for x in result.strip().split('\n')]
                num_cuda = gpu_memory.index(max(gpu_memory))
                device = torch.device(f"cuda:{num_cuda}")
            else:
                device = torch.device("cpu")

            # Pad the collected feature lists into dense tensors and wrap them in a PaddedBatch.
            data_obj = PaddedBatch(
                payload={
                    k: torch.nn.utils.rnn.pad_sequence(
                        [torch.tensor(x) for x in v], batch_first=True
                    ).to(device) for k, v in data_feature.asDict().items()
                },
                length=torch.IntTensor(data_length).to(device)
            )
            br_m.value.to(device)
            br_m.value.eval()
            with torch.no_grad():
                outputs = br_m.value(data_obj)
            torch.cuda.empty_cache()
            return outputs.cpu().numpy().tolist()

        inference_func_udf = F.udf(inference_func, T.ArrayType(T.ArrayType(T.FloatType())))
        self.spark.read.parquet(f"{self.work_path}/data/tmp_batch_train") \
            .repartition(100) \
            .select(
                F.col(self.col_id),
                inference_func_udf(
                    F.col("feature_arrays"),
                    F.col("trx_count")
                ).alias("inf_res")
            ) \
            .write.format("parquet").mode("overwrite") \
            .save(f"{self.work_path}/data/tmp_res_emb")
        logger.info(f'Ran inference on the executors and saved the embeddings to '
                    f'{self.work_path}/data/tmp_res_emb')
    def explode_embedd(self):
        # Unpack each batch back into one row per id, with one column per embedding dimension.
        self.spark.read.parquet(f"{self.work_path}/data/tmp_res_emb") \
            .withColumn("cols_zip", F.arrays_zip(self.col_id, "inf_res")) \
            .withColumn("cols_explode", F.explode("cols_zip")) \
            .select(F.col("cols_explode")[self.col_id].alias(self.col_id),
                    *[F.col("cols_explode")["inf_res"][i].alias(f"v{i}") for i in range(self.hidden_size)]) \
            .write.format("parquet").mode("overwrite") \
            .save(f"{self.work_path}/{self.output_path}")
        logger.info(f'Exploded the embedding arrays and saved the result to {self.work_path}/{self.output_path}')
def main(args=None):
    conf = get_conf(args)
    spark = SparkSession.builder \
        .appName("spark_inference") \
        .master(f"local[{conf.inference_dataloader.loader.num_workers}]") \
        .config("spark.sql.shuffle.partitions", 100) \
        .config("spark.driver.memory", conf['spark_memory']) \
        .config("spark.local.dir", f"{conf.work_path}/spark_local_dir") \
        .enableHiveSupport() \
        .getOrCreate()

    inference_obj = InferenceSpark(spark,
                                   conf['work_path'],
                                   conf['model_path'],
                                   conf['output.path'],
                                   conf['inference_dataloader.dataset_files'],
                                   conf['inference_dataloader.col_id'],
                                   conf['params.pl_module_class'],
                                   conf['params.rnn.hidden_size'],
                                   conf["inference_dataloader.loader.batch_size"])
    inference_obj.collect_batches()
    inference_obj.exec_inference()
    inference_obj.explode_embedd()
    spark.stop()


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)-7s %(funcName)-20s : %(message)s')
    logging.getLogger("lightning").setLevel(logging.INFO)
    main()
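# A minimal sketch of the configuration keys this script reads, inferred only from the
# `conf[...]` accesses above. The actual file format, CLI flags, and defaults are defined
# by dltranz.util.get_conf; all values below are hypothetical placeholders.
#
#   work_path: "/data/experiment"
#   spark_memory: "8g"
#   model_path: "models/encoder.ckpt"                      # relative to work_path
#   output.path: "data/embeddings"                         # relative to work_path
#   params.pl_module_class: "my_project.MyLightningModule" # the class used to train the checkpoint
#   params.rnn.hidden_size: 256                            # must match the checkpoint's embedding size
#   inference_dataloader.col_id: "client_id"
#   inference_dataloader.dataset_files: ["data/train_trx.parquet", "data/test_trx.parquet"]
#   inference_dataloader.loader.num_workers: 8             # also used as the local Spark core count
#   inference_dataloader.loader.batch_size: 1000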