Added hive support #1313

Closed · wants to merge 19 commits
78 changes: 49 additions & 29 deletions src/main/java/org/ohdsi/webapi/service/DDLService.java
@@ -21,13 +21,15 @@
import static org.ohdsi.webapi.service.SqlRenderService.translateSQL;

import java.util.*;
import java.util.stream.Collectors;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;

import com.odysseusinc.arachne.commons.types.DBMSType;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.ohdsi.circe.helper.ResourceHelper;
import org.ohdsi.webapi.sqlrender.SourceStatement;
@@ -43,34 +45,36 @@ public class DDLService {
public static final String CEM_SCHEMA = "cem_results_schema";
public static final String TEMP_SCHEMA = "oracle_temp_schema";

private static final Collection<String> RESULT_DDL_FILE_PATHS = Arrays.asList(
"/ddl/results/cohort.sql",
"/ddl/results/cohort_features.sql",
"/ddl/results/cohort_features_analysis_ref.sql",
"/ddl/results/cohort_features_dist.sql",
"/ddl/results/cohort_features_ref.sql",
"/ddl/results/cohort_inclusion.sql",
"/ddl/results/cohort_inclusion_result.sql",
"/ddl/results/cohort_inclusion_stats.sql",
"/ddl/results/cohort_summary_stats.sql",
"/ddl/results/cohort_censor_stats.sql",
"/ddl/results/feas_study_inclusion_stats.sql",
"/ddl/results/feas_study_index_stats.sql",
"/ddl/results/feas_study_result.sql",
"/ddl/results/heracles_analysis.sql",
"/ddl/results/heracles_heel_results.sql",
"/ddl/results/heracles_results.sql",
"/ddl/results/heracles_results_dist.sql",
"/ddl/results/ir_analysis_dist.sql",
"/ddl/results/ir_analysis_result.sql",
"/ddl/results/ir_analysis_strata_stats.sql",
"/ddl/results/ir_strata.sql",
"/ddl/results/heracles_periods.sql",
"/ddl/results/cohort_characterizations.sql",
"/ddl/results/pathway_analysis_codes.sql",
"/ddl/results/pathway_analysis_events.sql",
"/ddl/results/pathway_analysis_paths.sql",
"/ddl/results/pathway_analysis_stats.sql"

private static final String RESULT_DDL_ROOT = "/ddl/results";
private static final Collection<String> RESULT_DDL_FILE_NAMES = Arrays.asList(
"cohort.sql",
"cohort_features.sql",
"cohort_features_analysis_ref.sql",
"cohort_features_dist.sql",
"cohort_features_ref.sql",
"cohort_inclusion.sql",
"cohort_inclusion_result.sql",
"cohort_inclusion_stats.sql",
"cohort_summary_stats.sql",
"cohort_censor_stats.sql",
"feas_study_inclusion_stats.sql",
"feas_study_index_stats.sql",
"feas_study_result.sql",
"heracles_analysis.sql",
"heracles_heel_results.sql",
"heracles_results.sql",
"heracles_results_dist.sql",
"ir_analysis_dist.sql",
"ir_analysis_result.sql",
"ir_analysis_strata_stats.sql",
"ir_strata.sql",
"heracles_periods.sql",
"cohort_characterizations.sql",
"pathway_analysis_codes.sql",
"pathway_analysis_events.sql",
"pathway_analysis_paths.sql",
"pathway_analysis_stats.sql"
);

private static final String INIT_HERACLES_PERIODS = "/ddl/results/init_heracles_periods.sql";
@@ -113,7 +117,7 @@ public String generateResultSQL(
@DefaultValue("true") @QueryParam("initConceptHierarchy") Boolean initConceptHierarchy,
@QueryParam("tempSchema") String tempSchema) {

Collection<String> resultDDLFilePaths = new ArrayList<>(RESULT_DDL_FILE_PATHS);
Collection<String> resultDDLFilePaths = getResultDDLFilePaths(dialect);

if (initConceptHierarchy) {
resultDDLFilePaths.addAll(INIT_CONCEPT_HIERARCHY_FILE_PATHS);
@@ -170,6 +174,22 @@ private String generateSQL(String dialect, Map<String, String> params, Collectio
return result.replaceAll(";", ";\n");
}

private List<String> getResultDDLFilePaths(String dialect) {

return RESULT_DDL_FILE_NAMES.stream()
.map(fileName -> {
String dialectSpecificFilePath = String.format("%s/%s/%s", RESULT_DDL_ROOT, StringUtils.lowerCase(dialect), fileName);
if (isResourceFileExists(dialectSpecificFilePath)) {
return dialectSpecificFilePath;
}
return String.format("%s/%s", RESULT_DDL_ROOT, fileName);
})
.collect(Collectors.toList());
}

private boolean isResourceFileExists(String fileName) {
return ResourceHelper.class.getResource(fileName) != null;
}

private String translateSqlFile(String sql, String dialect, Map<String, String> params) {

SourceStatement statement = new SourceStatement();
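The net effect of getResultDDLFilePaths is a per-dialect override: a file under /ddl/results/<dialect>/ wins when it exists on the classpath, otherwise the shared template is used. A stand-alone sketch of that lookup (class and method names here are illustrative, not part of the PR):

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    // Illustration of the dialect fallback implemented above:
    // prefer /ddl/results/<dialect>/<file>, fall back to /ddl/results/<file>.
    class DdlPathResolverSketch {

        private static final String ROOT = "/ddl/results";

        static List<String> resolve(String dialect, List<String> fileNames) {
            return fileNames.stream()
                    .map(name -> {
                        String dialectPath = String.format("%s/%s/%s", ROOT, dialect.toLowerCase(), name);
                        // getResource returns null when no dialect-specific override is on the classpath
                        return DdlPathResolverSketch.class.getResource(dialectPath) != null
                                ? dialectPath
                                : String.format("%s/%s", ROOT, name);
                    })
                    .collect(Collectors.toList());
        }

        public static void main(String[] args) {
            // With this PR's resources, only the heracles files have Hive overrides, so this prints:
            // [/ddl/results/hive/heracles_results.sql, /ddl/results/cohort.sql]
            System.out.println(resolve("hive", Arrays.asList("heracles_results.sql", "cohort.sql")));
        }
    }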
src/main/java/org/ohdsi/webapi/util/BatchStatementExecutorWithProgress.java
@@ -1,6 +1,9 @@
package org.ohdsi.webapi.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.StatementCallback;
@@ -15,6 +18,8 @@

public class BatchStatementExecutorWithProgress {

private static final Logger logger = LoggerFactory.getLogger(BatchStatementExecutorWithProgress.class);

private String[] statements;

private final TransactionTemplate transactionTemplate;
@@ -41,6 +46,7 @@ public int[] execute(Consumer<Integer> consumer){
try {
for (int i = 0; i < totals; i++) {
String stmt = statements[i];
logger.debug("Btch query: {}", stmt);
updateCount[i] = jdbcTemplate.execute((StatementCallback<Integer>) st -> !st.execute(stmt) ? st.getUpdateCount() : 0);
if (i % PROGRESS_UPDATE_SIZE == 0 || i == (totals - 1)) {
int progress = (int) Math.round(100.0 * i / totals);
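Only execute(Consumer<Integer>) is visible in this hunk, and the loop above shows the consumer receives integer percentages (0–100) derived from the statement index. A hedged usage fragment — the constructor arguments mirror the fields shown in the diff but are an assumption:

    // Hypothetical call site; constructor shape is assumed, only
    // execute(Consumer<Integer>) appears in this diff.
    BatchStatementExecutorWithProgress executor =
            new BatchStatementExecutorWithProgress(statements, transactionTemplate, jdbcTemplate);
    int[] updateCounts = executor.execute(progress ->
            logger.info("results DDL batch {}% complete", progress));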
14 changes: 14 additions & 0 deletions src/main/resources/ddl/results/hive/heracles_results.sql
@@ -0,0 +1,14 @@
IF OBJECT_ID('@results_schema.heracles_results', 'U') IS NULL
create table @results_schema.heracles_results
(
analysis_id int,
stratum_1 varchar(255),
stratum_2 varchar(255),
stratum_3 varchar(255),
stratum_4 varchar(255),
stratum_5 varchar(255),
count_value bigint,
last_update_time timestamp
)
PARTITIONED BY(cohort_definition_id int)
Contributor:

Let's use HINT DISTRIBUTE_ON_KEY and HINT SORT_ON_KEY and get rid of the separate SQL file for Hive.

clustered by (analysis_id) into 64 buckets;
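A rough sketch of the reviewer's suggestion: keep a single shared heracles_results.sql and let SqlRender hint comments drive the dialect-specific physical layout. The hint names are taken from the comment above; the column list is the one from this file; whether Hive translation honors both hints is an assumption:

--HINT DISTRIBUTE_ON_KEY(cohort_definition_id)
--HINT SORT_ON_KEY(analysis_id)
IF OBJECT_ID('@results_schema.heracles_results', 'U') IS NULL
create table @results_schema.heracles_results
(
	cohort_definition_id int,
	analysis_id int,
	stratum_1 varchar(255),
	stratum_2 varchar(255),
	stratum_3 varchar(255),
	stratum_4 varchar(255),
	stratum_5 varchar(255),
	count_value bigint,
	last_update_time timestamp
);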
23 changes: 23 additions & 0 deletions src/main/resources/ddl/results/hive/heracles_results_dist.sql
@@ -0,0 +1,23 @@
IF OBJECT_ID('@results_schema.heracles_results_dist', 'U') IS NULL
create table @results_schema.heracles_results_dist
(
analysis_id int,
stratum_1 varchar(255),
stratum_2 varchar(255),
stratum_3 varchar(255),
stratum_4 varchar(255),
stratum_5 varchar(255),
count_value bigint,
min_value float,
max_value float,
avg_value float,
stdev_value float,
median_value float,
p10_value float,
p25_value float,
p75_value float,
p90_value float,
last_update_time timestamp
)
PARTITIONED BY(cohort_definition_id int)
clustered by (analysis_id) into 64 buckets;
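For orientation, a minimal sketch of how a template like this gets its @results_schema parameter filled and gets translated to Hive, using the SqlRender and circe helpers WebAPI already depends on. API names are as commonly used in OHDSI code; verify the signatures against the SqlRender version on your classpath:

    import org.ohdsi.circe.helper.ResourceHelper;
    import org.ohdsi.sql.SqlRender;
    import org.ohdsi.sql.SqlTranslate;

    class HiveDdlRenderSketch {
        public static void main(String[] args) {
            // Load the DDL template, substitute @results_schema, then translate to Hive.
            String template = ResourceHelper.GetResourceAsString("/ddl/results/hive/heracles_results_dist.sql");
            String rendered = SqlRender.renderSql(template,
                    new String[] { "results_schema" },
                    new String[] { "results" });
            String hiveSql = SqlTranslate.translateSql(rendered, "hive");
            System.out.println(hiveSql);
        }
    }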