diff --git a/model-inference/decisionTree/experiments/config.json b/model-inference/decisionTree/experiments/config.json
index 76198d19..75a0b244 100644
--- a/model-inference/decisionTree/experiments/config.json
+++ b/model-inference/decisionTree/experiments/config.json
@@ -1,5 +1,5 @@
 {
-    "num_trees": 1600,
+    "num_trees": 500,
     "depth": 8,
     "pgsqlconfig": {
         "host": "localhost",
@@ -174,5 +174,21 @@
         "filename": "",
         "table": "",
         "header": false
+    },
+    "criteo_dense": {
+        "num_features": 1000000,
+        "rows": 86,
+        "batch_size": 100000,
+        "query_size": 100000,
+        "type": "classification",
+        "info": "https://www.kaggle.com/c/criteo-display-ad-challenge/",
+        "query": "SELECT * FROM criteo_dense",
+        "create": "",
+        "train": 0,
+        "test": 1,
+        "y_col": "label",
+        "filename": "criteo_dense",
+        "table": "criteo_dense",
+        "header": true
     }
 }
diff --git a/model-inference/decisionTree/experiments/data_processing.py b/model-inference/decisionTree/experiments/data_processing.py
index 32d39ae3..4d9907ce 100644
--- a/model-inference/decisionTree/experiments/data_processing.py
+++ b/model-inference/decisionTree/experiments/data_processing.py
@@ -2,7 +2,7 @@
 import gc
 import json
 import pickle
-from model_helper import relative2abspath, dataset_folder
+from model_helper import relative2abspath, dataset_folder, todense_fill
 import numpy as np
 import pandas as pd
 from urllib.request import urlretrieve
@@ -29,8 +29,9 @@ def parse_arguments():
         'bosch',
         'covtype',
         'criteo',
-        'tpcxai_fraud'],
-        help="Dataset to be processed. Choose from ['higgs', 'airline_regression', 'airline_classification', 'fraud', 'year', 'epsilon', 'bosch', 'covtype','tpcxai_fraud','criteo']")
+        'tpcxai_fraud',
+        'criteo_dense'],
+        help="Dataset to be processed. Choose from ['higgs', 'airline_regression', 'airline_classification', 'fraud', 'year', 'epsilon', 'bosch', 'covtype', 'tpcxai_fraud', 'criteo', 'criteo_dense']")
     parser.add_argument("-n", "--nrows", type=int, help="Load nrows of the dataset. Warning: only use in development.")
     parser.add_argument("-sf", "--scalefactor", type=int, help="Relevant only for TPCxAI_Fraud. Takes one of the values in 1, 3, 10 and 30")
@@ -244,6 +245,24 @@ def prepare_criteo(dataset_folder):
     if not (os.path.isfile(train_path) and os.path.isfile(test_path)):
         os.system(f"tar -Jxf {local_url} -C {dataset_folder}")
 
+def prepare_criteo_dense(dataset_folder):
+    prepare_criteo(dataset_folder)
+    # `length` below is a byte threshold for load_svmlight_file, not a row count
+    read_length = int(1e4)
+    num_features = 1000000
+    test_path = relative2abspath(dataset_folder, "criteo.kaggle2014.svm", "test.txt.svm")
+    if os.path.isfile(test_path):
+        test_df_features, test_df_labels = datasets.load_svmlight_file(
+            test_path, n_features=num_features, length=read_length)
+        # Densify with -1 for missing entries, then append the label column
+        test_df = pd.DataFrame(
+            np.append(todense_fill(test_df_features, fill_value=-1),
+                      test_df_labels[..., np.newaxis], axis=1),
+            columns=[f'feature_{idx}' for idx in range(num_features)] + ['label'])
+        print('Test Dataset Shape:', test_df.shape)
+        return test_df
+    return None
+
+
 def get_connection(pgsqlconfig):
     return psycopg2.connect(
         database=pgsqlconfig["dbname"],
@@ -276,6 +295,10 @@ def make_query(dataset, datasetconfig, column_names):
         feature_names = ", ".join([f"{col_name} DECIMAL NOT NULL" for col_name in column_names])
         label_name = f"{datasetconfig['y_col']} INTEGER NOT NULL"
         create_query = f"CREATE TABLE ** ({feature_names}, {label_name})"
+    elif dataset == "criteo_dense":
+        # One array column keeps the table under PostgreSQL's 1600-column limit
+        feature_names = '''"row" double precision[]'''
+        label_name = f"{datasetconfig['y_col']} INTEGER NOT NULL"
+        create_query = f"CREATE TABLE ** ({label_name}, {feature_names})"
     else:
         create_query = datasetconfig["create"]
     train_create_query = create_query.replace(
@@ -387,11 +410,11 @@ def create_tables(
             print('-'*50)
             df = pd.concat([df,prepare_tpcxai_fraud_transactions(dataset_folder, nrows=partition_size, skip_rows=range(1,partition_size*i))])
         print(f'Final Shape of DataFrame: {df.shape}')
-
     elif dataset == 'criteo':
         prepare_criteo(dataset_folder)
         exit()
-
+    elif dataset == 'criteo_dense':
+        df = prepare_criteo_dense(dataset_folder)
     else:
         raise ValueError(f"{dataset} not supported")
@@ -450,6 +473,51 @@ def create_tables(
         exit()
 
+    # CRITEO_DENSE FOLLOWS A DIFFERENT LOADING PATH: WITH 1,000,000 FEATURES
+    # IT EXCEEDS POSTGRESQL'S 1600-COLUMN-PER-TABLE LIMIT, SO ALL FEATURES
+    # ARE STORED IN A SINGLE double precision[] COLUMN
+    if dataset == "criteo_dense":
+        # the train/test split is 0/1 for this dataset, so the train table stays empty
+        train = pd.DataFrame({'label': []})
+        column_names = list(df.columns)
+        # only the features go into the array column; the label has its own column
+        feature_cols = [col for col in column_names if col != datasetconfig['y_col']]
+        connection = get_connection(pgsqlconfig)
+        print("FETCHING TRAIN AND TEST QUERY CRITEO_DENSE")
+        train_query, test_query = make_query(
+            dataset, datasetconfig, column_names)
+        print("DROPPING TRAIN AND TEST TABLES IF THEY EXIST")
+        with connection.cursor() as cursor:
+            cursor.execute("DROP TABLE IF EXISTS " +
+                           datasetconfig["table"]+"_train")
+            connection.commit()
+        with connection.cursor() as cursor:
+            cursor.execute("DROP TABLE IF EXISTS " +
+                           datasetconfig["table"]+"_test")
+            connection.commit()
+        print("CREATING TABLES FOR CRITEO_DENSE")
+        with connection.cursor() as cursor:
+            cursor.execute(train_query)
+            cursor.execute(test_query)
+            connection.commit()
+        print("LOADING DATA FOR CRITEO_DENSE")
+        with connection.cursor() as cur:
+            rows = len(train)
+            for i in range(rows):
+                # "row" must be quoted: ROW is a reserved word in PostgreSQL
+                cur.execute('INSERT INTO criteo_dense_train(label, "row") VALUES(%s, %s)',
+                            (int(train.loc[i, 'label']), list(train.loc[i, feature_cols])))
+                if i % 100 == 0:
+                    print(f'Written Rows: {i}')
+            connection.commit()
+ print("LOADED "+datasetconfig["table"]+"_train"+" to DB") + df.head() + rows = len(df) + for i in range(rows): + cur.execute("INSERT INTO criteo_dense_test(label,row) VALUES(%s, %s)", (int( + df.loc[i, 'label']), list(df.loc[i, column_names]))) + if i % 50 == 0: + print(i) + connection.commit() + print("LOADED "+datasetconfig["table"]+"_test"+" to DB") + exit() + # Split dataset train_size = math.floor(len(df) * datasetconfig["train"]) train = df.head(train_size) @@ -473,6 +541,7 @@ def create_tables( connection = get_connection(pgsqlconfig) print("FETCHING TRAIN AND TEST QUERY") train_query, test_query = make_query(dataset, datasetconfig, column_names) + print(train_query, test_query) print("CREATING TRAIN AND TEST TABLES") create_tables(connection, train_query, test_query, train_csv_path, test_csv_path, dataset) diff --git a/model-inference/decisionTree/experiments/model_helper.py b/model-inference/decisionTree/experiments/model_helper.py index e32dbc06..79a2a48c 100644 --- a/model-inference/decisionTree/experiments/model_helper.py +++ b/model-inference/decisionTree/experiments/model_helper.py @@ -3,6 +3,7 @@ import os import numpy as np import math +from scipy import sparse as sp from sklearn.metrics import classification_report, mean_squared_error dataset_folder = "dataset/" @@ -12,6 +13,18 @@ def calculate_time(start_time, end_time): diff = (end_time-start_time)*1000 return diff +def todense_fill(csr: sp.csr_matrix, fill_value: float) -> np.ndarray: + """Densify a sparse CSR matrix. Same as csr_matrix.todense() + except it fills missing entries with fill_value instead of 0. + """ + dummy_value = np.nan if not np.isnan(fill_value) else np.inf + dummy_check = np.isnan if np.isnan(dummy_value) else np.isinf + csr = csr.copy().astype(float) + csr.data[csr.data == 0] = dummy_value + out = np.array(csr.todense()).squeeze() + out[out == 0] = fill_value + out[dummy_check(out)] = 0 + return out def load_data_from_pickle(dataset, config, suffix, time_consume): start_time = time.time() @@ -49,13 +62,15 @@ def fetch_data(dataset, config, suffix, time_consume=None): pgsqlconfig = config["pgsqlconfig"] datasetconfig = config[dataset] query = datasetconfig["query"]+"_"+suffix + # print(query) dbURL = "postgresql://"+pgsqlconfig["username"]+":"+pgsqlconfig["password"] + \ "@"+pgsqlconfig["host"]+":" + \ pgsqlconfig["port"]+"/"+pgsqlconfig["dbname"] - # print(dbURL) - # print(query) + print(dbURL) + print(query) start_time = time.time() dataframe = cx.read_sql(dbURL, query) + print(dataframe.head()) if dataset == 'epsilon': unpacked = zip(*list(dataframe['row'].values)) for i in range(1, 2001): @@ -63,6 +78,12 @@ def fetch_data(dataset, config, suffix, time_consume=None): dataframe.drop('row', axis=1, inplace=True) # dataframe['row'] = dataframe['row'].apply(lambda row:np.array(row)) + elif dataset == 'criteo_dense': + unpacked = zip(*list(dataframe['row'].values)) + for i in range(datasetconfig['num_features']): + dataframe[f'feature_{i}'] = next(unpacked) + dataframe.drop('row', axis=1, inplace=True) + # dataframe['row'] = dataframe['row'].apply(lambda row:np.array(row)) end_time = time.time() data_loading_time = calculate_time(start_time, end_time) if time_consume is not None: diff --git a/model-inference/decisionTree/experiments/test_model.py b/model-inference/decisionTree/experiments/test_model.py index 6d7aa777..3117bb06 100644 --- a/model-inference/decisionTree/experiments/test_model.py +++ b/model-inference/decisionTree/experiments/test_model.py @@ -38,7 +38,8 @@ def parse_arguments(config): 
         'bosch',
         'covtype',
         'criteo',
-        'tpcxai_fraud'],
+        'tpcxai_fraud',
+        'criteo_dense'],
         help="Dataset to be tested.")
     parser.add_argument(
         "-m", "--model", type=str,
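
Reviewer notes (not part of the patch):

The `todense_fill` helper is the core trick in this change: it keeps "feature absent" distinguishable from "feature equals 0" when densifying the svmlight matrix. A minimal standalone sanity check may help reviewers; the tiny matrix and expected output below are illustrative only, and the snippet assumes it is run from `model-inference/decisionTree/experiments/` so `model_helper` is importable:

```python
import numpy as np
from scipy import sparse as sp

from model_helper import todense_fill  # helper added in this PR

# Stored entries: (0,0)=1.0 and (1,2)=0.0 (an explicit zero). Every other
# position is a missing entry, which plain .todense() would also render as 0,
# making "feature absent" indistinguishable from "feature equals 0".
m = sp.csr_matrix(([1.0, 0.0], ([0, 1], [0, 2])), shape=(2, 3))

print(todense_fill(m, fill_value=-1))
# [[ 1. -1. -1.]
#  [-1. -1.  0.]]   <- missing entries become -1, the explicit zero survives
```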
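On `prepare_criteo_dense`: per the scikit-learn docs, `load_svmlight_file`'s `length` argument is a byte threshold (parsing stops once the file position passes `offset + length` bytes), not a row count, which is why the original `read_lines` name was misleading. A sketch of how a partial read behaves; the path matches this PR's layout, and the resulting shapes depend on the file's line widths:

```python
from sklearn import datasets

X, y = datasets.load_svmlight_file(
    "dataset/criteo.kaggle2014.svm/test.txt.svm",  # path used by prepare_criteo_dense
    n_features=1_000_000,
    length=int(1e4),  # stop after ~10 KB of the file, i.e. only a handful of rows
)
print(X.shape, y.shape)  # X is a scipy.sparse CSR matrix
```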
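Finally, the row-at-a-time `INSERT` loop in `create_tables` makes one round trip per row, which will dominate load time. A hedged sketch of a batched alternative using `psycopg2.extras.execute_values`; the function name `load_dense_rows`, the default table name, and the `page_size` value are illustrative assumptions, not part of this PR:

```python
from psycopg2.extras import execute_values

def load_dense_rows(connection, df, feature_cols, table="criteo_dense_test",
                    page_size=50):  # page_size is a guess; tune for row width
    """Batched variant of the per-row INSERT loop in create_tables.

    execute_values folds up to page_size rows into a single INSERT statement,
    cutting client/server round trips; with ~1e6-element arrays per row,
    keep page_size small so each statement stays a manageable size.
    """
    args = (
        (int(df.at[i, "label"]), list(df.loc[i, feature_cols]))
        for i in df.index
    )
    with connection.cursor() as cur:
        # "row" is quoted because ROW is a reserved word in PostgreSQL
        execute_values(
            cur,
            f'INSERT INTO {table} (label, "row") VALUES %s',
            args,
            page_size=page_size,
        )
    connection.commit()
```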