Code Changes to Support Criteo_Dense for Sparse v. Dense Experiments #70

Open · wants to merge 5 commits into master
18 changes: 17 additions & 1 deletion model-inference/decisionTree/experiments/config.json
@@ -1,5 +1,5 @@
 {
-    "num_trees": 1600,
+    "num_trees": 500,
     "depth": 8,
     "pgsqlconfig": {
         "host": "localhost",
@@ -174,5 +174,21 @@
"filename": "",
"table": "",
"header": false
},
"criteo_dense": {
"num_features": 1000000,
"rows": 86,
"batch_size": 100000,
"query_size": 100000,
"type": "classification",
"info": "https://www.kaggle.com/c/criteo-display-ad-challenge/",
"query": "SELECT * from criteo_dense",
"create": "",
"train": 0,
"test": 1,
"y_col": "label",
"filename": "criteo_dense",
"table": "criteo_dense",
"header": true
}
}
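For orientation, a minimal sketch of how the experiment scripts consume this new entry (assuming config.json is loaded from the working directory, as the existing scripts do):

import json

# Minimal sketch, assuming config.json sits in the working directory.
with open("config.json") as f:
    config = json.load(f)

cfg = config["criteo_dense"]
print(cfg["num_features"])     # 1000000 -- far beyond PostgreSQL's 1600-column limit
print(cfg["query"] + "_test")  # "SELECT * from criteo_dense_test", as fetch_data builds it

With "train" set to 0 and "test" to 1, the entire prepared file is treated as test data, which matches the loading code below, where the train table is created empty.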
79 changes: 74 additions & 5 deletions model-inference/decisionTree/experiments/data_processing.py
@@ -2,7 +2,7 @@
 import gc
 import json
 import pickle
-from model_helper import relative2abspath, dataset_folder
+from model_helper import relative2abspath, dataset_folder, todense_fill
 import numpy as np
 import pandas as pd
 from urllib.request import urlretrieve
@@ -29,8 +29,9 @@ def parse_arguments():
                                  'bosch',
                                  'covtype',
                                  'criteo',
-                                 'tpcxai_fraud'],
-                        help="Dataset to be processed. Choose from ['higgs', 'airline_regression', 'airline_classification', 'fraud', 'year', 'epsilon', 'bosch', 'covtype','tpcxai_fraud','criteo']")
+                                 'tpcxai_fraud',
+                                 'criteo_dense'],
+                        help="Dataset to be processed. Choose from ['higgs', 'airline_regression', 'airline_classification', 'fraud', 'year', 'epsilon', 'bosch', 'covtype','tpcxai_fraud','criteo', 'criteo_dense']")
     parser.add_argument("-n", "--nrows", type=int, help="Load nrows of the dataset. Warning: only use in development.")
     parser.add_argument("-sf","--scalefactor", type=int, help="Relevant only for TPCxAI_Fraud. Takes one of the values in 1, 3, 10 and 30")

@@ -244,6 +245,24 @@ def prepare_criteo(dataset_folder):
     if not (os.path.isfile(train_path) and os.path.isfile(test_path)):
         os.system(f"tar -Jxf {local_url} -C {dataset_folder}")
 
+def prepare_criteo_dense(dataset_folder):
+    prepare_criteo(dataset_folder)
+    read_lines = 1e4  # 5e4 # 5e6 # 6042135
+    num_features = 1000000
+    test_path = relative2abspath(dataset_folder, "criteo.kaggle2014.svm", "test.txt.svm")
+    if os.path.isfile(test_path):
+        test_df_features, test_df_labels = datasets.load_svmlight_file(test_path, n_features=num_features, length=read_lines)
+        # print(test_df_features.todense(), test_df_labels[..., np.newaxis])
+        # print(np.append(test_df_features.todense(), test_df_labels[..., np.newaxis], axis=1).shape)
+        test_df = pd.DataFrame(np.append(todense_fill(test_df_features, fill_value=-1), test_df_labels[..., np.newaxis], axis=1), columns=[f'feature_{idx}' for idx in range(num_features)]+['label'])
+        print('Test Dataset Shape:', test_df.shape)
+        # print(test_df.shape)
+        # test_df = test_df.join(pd.DataFrame(test_df_labels, columns=['label']))
+        # print(test_df[['feature_1']].describe())
+        return test_df
+    return None
+
+
 def get_connection(pgsqlconfig):
     return psycopg2.connect(
         database=pgsqlconfig["dbname"],
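As a sanity check of the loading path above, here is a tiny self-contained sketch of sklearn's SVMLight reader on an in-memory file (toy data, not the Criteo dump). Absent features come back as zeros in the sparse matrix, which is why todense_fill is needed to tell them apart from explicit zeros:

import io
from sklearn import datasets

# Toy SVMLight input: "<label> <index>:<value> ...", one-based indices here.
svm_bytes = io.BytesIO(b"1 1:0.5 3:2.0\n0 2:1.5\n")
X, y = datasets.load_svmlight_file(svm_bytes, n_features=3, zero_based=False)
print(X.toarray())  # [[0.5 0.  2. ] [0.  1.5 0. ]] -- missing entries appear as 0
print(y)            # [1. 0.]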
@@ -276,6 +295,10 @@ def make_query(dataset, datasetconfig, column_names):
feature_names = ", ".join([f"{col_name} DECIMAL NOT NULL" for col_name in column_names])
label_name = f"{datasetconfig['y_col']} INTEGER NOT NULL"
create_query = f"CREATE TABLE ** ({feature_names}, {label_name})"
elif dataset == "criteo_dense":
feature_names = '''"row" double precision[]'''
label_name = f"{datasetconfig['y_col']} INTEGER NOT NULL"
create_query = f"CREATE TABLE ** ({label_name}, {feature_names})"
else:
create_query = datasetconfig["create"]
train_create_query = create_query.replace(
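For context, with y_col set to "label" this branch yields the following statement once the ** placeholder is substituted (a standalone re-derivation of the string built above, not captured output):

# Re-deriving the criteo_dense CREATE TABLE string from the branch above.
datasetconfig = {"y_col": "label"}
feature_names = '''"row" double precision[]'''
label_name = f"{datasetconfig['y_col']} INTEGER NOT NULL"
create_query = f"CREATE TABLE ** ({label_name}, {feature_names})"
print(create_query.replace("**", "criteo_dense_test"))
# CREATE TABLE criteo_dense_test (label INTEGER NOT NULL, "row" double precision[])

Packing all 1,000,000 features into a single double precision[] column is what keeps the table under PostgreSQL's 1600-column limit.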
@@ -387,11 +410,11 @@ def create_tables(
             print('-'*50)
             df = pd.concat([df,prepare_tpcxai_fraud_transactions(dataset_folder, nrows=partition_size, skip_rows=range(1,partition_size*i))])
         print(f'Final Shape of DataFrame: {df.shape}')
-
     elif dataset == 'criteo':
         prepare_criteo(dataset_folder)
         exit()
-
+    elif dataset == 'criteo_dense':
+        df = prepare_criteo_dense(dataset_folder)
     else:
         raise ValueError(f"{dataset} not supported")

@@ -450,6 +473,51 @@ def create_tables(

     exit()
 
+    # CRITEO_DENSE FOLLOWS A DIFFERENT LOADING PATH, AS IT HAS MORE THAN
+    # 1600 COLUMNS (POSTGRESQL'S PER-TABLE COLUMN LIMIT)
+    if dataset == "criteo_dense":
+        train = pd.DataFrame({'label': []})
+        column_names = list(df.columns)
+        connection = get_connection(pgsqlconfig)
+        print("FETCHING TRAIN AND TEST QUERY CRITEO_DENSE")
+        train_query, test_query = make_query(
+            dataset, datasetconfig, column_names)
+        print("DROPPING TRAIN AND TEST TABLES IF THEY EXIST")
+        with connection.cursor() as cursor:
+            cursor.execute("DROP TABLE IF EXISTS " +
+                           datasetconfig["table"]+"_train")
+            connection.commit()
+        with connection.cursor() as cursor:
+            cursor.execute("DROP TABLE IF EXISTS " +
+                           datasetconfig["table"]+"_test")
+            connection.commit()
+        print("CREATING TABLES FOR CRITEO_DENSE")
+        with connection.cursor() as cursor:
+            cursor.execute(train_query)
+            cursor.execute(test_query)
+            connection.commit()
+        print("LOADING DATA FOR CRITEO_DENSE")
+        with connection.cursor() as cur:
+            rows = len(train)
+            for i in range(rows):
+                cur.execute("INSERT INTO criteo_dense_train(label,row) VALUES(%s, %s)", (int(
+                    train.loc[i, 'label']), list(train.loc[i, column_names])))
+                if i % 100 == 0:
+                    print(f'Written Rows: {i}')
+            connection.commit()
+            print("LOADED "+datasetconfig["table"]+"_train"+" to DB")
+            rows = len(df)
+            for i in range(rows):
+                cur.execute("INSERT INTO criteo_dense_test(label,row) VALUES(%s, %s)", (int(
+                    df.loc[i, 'label']), list(df.loc[i, column_names])))
+                if i % 50 == 0:
+                    print(f'Written Rows: {i}')
+            connection.commit()
+            print("LOADED "+datasetconfig["table"]+"_test"+" to DB")
+        exit()
+
     # Split dataset
     train_size = math.floor(len(df) * datasetconfig["train"])
     train = df.head(train_size)
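The row-at-a-time INSERT loop above issues one round trip per row, which is the simplest thing that works for these wide array rows. A batched variant is possible; here is a hedged sketch using psycopg2's execute_values helper (not part of this PR; connection parameters are placeholders):

import psycopg2
from psycopg2.extras import execute_values

# Hypothetical batched alternative to the per-row INSERTs above (not part of
# this PR). Connection parameters are placeholders; the criteo_dense_test
# table from make_query is assumed to exist already.
connection = psycopg2.connect(dbname="postgres", user="postgres",
                              password="postgres", host="localhost", port="5432")
with connection.cursor() as cur:
    rows = [(0, [0.5, -1.0]), (1, [1.5, 2.0])]  # (label, feature array) pairs
    execute_values(
        cur,
        "INSERT INTO criteo_dense_test (label, row) VALUES %s",
        rows)
connection.commit()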
@@ -473,6 +541,7 @@ def create_tables(
     connection = get_connection(pgsqlconfig)
     print("FETCHING TRAIN AND TEST QUERY")
     train_query, test_query = make_query(dataset, datasetconfig, column_names)
+    print(train_query, test_query)
     print("CREATING TRAIN AND TEST TABLES")
     create_tables(connection, train_query, test_query,
                   train_csv_path, test_csv_path, dataset)
25 changes: 23 additions & 2 deletions model-inference/decisionTree/experiments/model_helper.py
@@ -3,6 +3,7 @@
 import os
 import numpy as np
 import math
+from scipy import sparse as sp
 from sklearn.metrics import classification_report, mean_squared_error
 
 dataset_folder = "dataset/"
@@ -12,6 +13,18 @@ def calculate_time(start_time, end_time):
     diff = (end_time-start_time)*1000
     return diff
 
+def todense_fill(csr: sp.csr_matrix, fill_value: float) -> np.ndarray:
+    """Densify a sparse CSR matrix. Same as csr_matrix.todense()
+    except it fills missing entries with fill_value instead of 0.
+    """
+    dummy_value = np.nan if not np.isnan(fill_value) else np.inf
+    dummy_check = np.isnan if np.isnan(dummy_value) else np.isinf
+    csr = csr.copy().astype(float)
+    csr.data[csr.data == 0] = dummy_value
+    out = np.array(csr.todense()).squeeze()
+    out[out == 0] = fill_value
+    out[dummy_check(out)] = 0
+    return out
 
 def load_data_from_pickle(dataset, config, suffix, time_consume):
     start_time = time.time()
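A small usage sketch of todense_fill (assuming it is importable from the model_helper.py shown above), demonstrating that explicit stored zeros survive while missing entries get the fill value:

import numpy as np
from scipy import sparse as sp
from model_helper import todense_fill  # assumes the experiments directory is on the path

# Row 0 stores 1.0 and an explicit 0.0; row 1 stores only 2.0 (column 0 is missing).
data = np.array([1.0, 0.0, 2.0])
indices = np.array([0, 1, 1])
indptr = np.array([0, 2, 3])
X = sp.csr_matrix((data, indices, indptr), shape=(2, 2))
print(todense_fill(X, fill_value=-1))
# [[ 1.  0.]   <- explicit zero kept as 0
#  [-1.  2.]]  <- missing entry filled with -1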
@@ -49,20 +62,28 @@ def fetch_data(dataset, config, suffix, time_consume=None):
     pgsqlconfig = config["pgsqlconfig"]
     datasetconfig = config[dataset]
     query = datasetconfig["query"]+"_"+suffix
     # print(query)
     dbURL = "postgresql://"+pgsqlconfig["username"]+":"+pgsqlconfig["password"] + \
         "@"+pgsqlconfig["host"]+":" + \
         pgsqlconfig["port"]+"/"+pgsqlconfig["dbname"]
-    # print(dbURL)
-    # print(query)
+    print(dbURL)
+    print(query)
     start_time = time.time()
     dataframe = cx.read_sql(dbURL, query)
+    print(dataframe.head())
     if dataset == 'epsilon':
         unpacked = zip(*list(dataframe['row'].values))
         for i in range(1, 2001):
             dataframe[i] = next(unpacked)
 
         dataframe.drop('row', axis=1, inplace=True)
         # dataframe['row'] = dataframe['row'].apply(lambda row:np.array(row))
+    elif dataset == 'criteo_dense':
+        unpacked = zip(*list(dataframe['row'].values))
+        for i in range(datasetconfig['num_features']):
+            dataframe[f'feature_{i}'] = next(unpacked)
+        dataframe.drop('row', axis=1, inplace=True)
+        # dataframe['row'] = dataframe['row'].apply(lambda row:np.array(row))
     end_time = time.time()
     data_loading_time = calculate_time(start_time, end_time)
     if time_consume is not None:
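The criteo_dense branch above fans the array column back out into one pandas column per feature. A toy sketch of that zip-based unpacking, with two features standing in for 1,000,000:

import pandas as pd

# Toy stand-in for the DataFrame connectorx returns: one 'row' array column.
dataframe = pd.DataFrame({'label': [0, 1], 'row': [[0.5, -1.0], [1.5, 2.0]]})
unpacked = zip(*list(dataframe['row'].values))
for i in range(2):  # datasetconfig['num_features'] in the real code
    dataframe[f'feature_{i}'] = next(unpacked)
dataframe.drop('row', axis=1, inplace=True)
print(dataframe)
#    label  feature_0  feature_1
# 0      0        0.5       -1.0
# 1      1        1.5        2.0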
3 changes: 2 additions & 1 deletion model-inference/decisionTree/experiments/test_model.py
@@ -38,7 +38,8 @@ def parse_arguments(config):
             'bosch',
             'covtype',
             'criteo',
-            'tpcxai_fraud'],
+            'tpcxai_fraud',
+            'criteo_dense'],
         help="Dataset to be tested.")
     parser.add_argument(
         "-m", "--model", type=str,