diff --git a/docker/compose-controller-spark-sql.yaml b/docker/compose-controller-spark-sql.yaml
index a23959b05..606a3d38f 100644
--- a/docker/compose-controller-spark-sql.yaml
+++ b/docker/compose-controller-spark-sql.yaml
@@ -56,7 +56,8 @@
 # to /opt/bitnami/spark/conf/
 # https://spark.apache.org/docs/latest/configuration.html
 
-version: '2'
+# Version 2.4 supports the healthcheck feature.
+version: '2.4'
 
 services:
   pipeline-controller:
@@ -69,6 +70,9 @@ services:
       - ${DWH_ROOT}:/dwh
     ports:
       - '8090:8080'
+    depends_on:
+      thriftserver:
+        condition: service_healthy
 
   spark:
     image: docker.io/bitnami/spark:3.3
@@ -118,9 +122,14 @@ services:
       - '4041:4040'
     volumes:
       - ${DWH_ROOT}:/dwh
-    volumes_from:
-      - spark
+    healthcheck:
+      test: beeline -u jdbc:hive2://localhost:10000 -n hive -e 'show tables;' || exit 1
+      interval: 30s
+      retries: 5
+      start_period: 10s
+      timeout: 10s
 
 volumes:
   spark:
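
The healthcheck above is what makes the new depends_on condition meaningful: Docker marks thriftserver healthy only once beeline can actually run a query against it, and only then starts pipeline-controller. For readers who want to reproduce the probe outside the container, here is a minimal sketch in Java; it assumes the org.apache.hive:hive-jdbc driver is on the classpath, the URL and the hive user mirror the compose file, and the class name is invented for illustration.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class ThriftServerProbe {
  public static void main(String[] args) {
    try {
      // Same driver, URL, and user that the beeline healthcheck relies on.
      Class.forName("org.apache.hive.jdbc.HiveDriver");
      try (Connection connection =
              DriverManager.getConnection("jdbc:hive2://localhost:10000", "hive", "");
          Statement statement = connection.createStatement()) {
        // Succeeds only once the Thrift server accepts SQL.
        statement.execute("show tables");
      }
      System.exit(0); // healthy
    } catch (Exception e) {
      System.exit(1); // unhealthy, mirroring the `|| exit 1` in the compose test
    }
  }
}
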
diff --git a/pipelines/controller/src/main/java/org/openmrs/analytics/ApiController.java b/pipelines/controller/src/main/java/org/openmrs/analytics/ApiController.java
index 2b9360709..8afe69197 100644
--- a/pipelines/controller/src/main/java/org/openmrs/analytics/ApiController.java
+++ b/pipelines/controller/src/main/java/org/openmrs/analytics/ApiController.java
@@ -64,6 +64,16 @@ public ProgressStats getStatus() {
     return progressStats;
   }
 
+  @PostMapping("/tables")
+  public String createResourceTables() {
+    if (pipelineManager.isRunning()) {
+      throw new IllegalStateException("Cannot create tables because another pipeline is running!");
+    }
+    logger.info("Received request to create resource tables ...");
+    pipelineManager.createResourceTables();
+    return "SUCCESS";
+  }
+
   private Stats getStats() {
     MetricQueryResults metricQueryResults = pipelineManager.getMetricQueryResults();
     return Stats.createStats(metricQueryResults);
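
The endpoint is deliberately thin: it refuses to run concurrently with a pipeline, delegates to PipelineManager.createResourceTables(), and returns the plain string "SUCCESS". The IllegalStateException propagates out of the handler, which Spring MVC reports as an error status (typically 500), and that non-200 status is what the UI's failure banner keys off. A hypothetical caller using only the JDK's HttpClient (the class name is invented; port 8090 is the controller mapping from the compose file above):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CreateTablesClient {
  public static void main(String[] args) throws Exception {
    HttpClient client = HttpClient.newHttpClient();
    // POST with no body, exactly as the UI's XMLHttpRequest does.
    HttpRequest request =
        HttpRequest.newBuilder()
            .uri(URI.create("http://localhost:8090/tables"))
            .POST(HttpRequest.BodyPublishers.noBody())
            .build();
    HttpResponse<String> response =
        client.send(request, HttpResponse.BodyHandlers.ofString());
    // Prints e.g. "200: SUCCESS", or a 5xx status if a pipeline is running.
    System.out.println(response.statusCode() + ": " + response.body());
  }
}
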
diff --git a/pipelines/controller/src/main/java/org/openmrs/analytics/HiveTableManager.java b/pipelines/controller/src/main/java/org/openmrs/analytics/HiveTableManager.java
index 20a364bb3..88943e269 100644
--- a/pipelines/controller/src/main/java/org/openmrs/analytics/HiveTableManager.java
+++ b/pipelines/controller/src/main/java/org/openmrs/analytics/HiveTableManager.java
@@ -60,7 +60,7 @@ public void createResourceTables(
     // (https://github.com/google/fhir-data-pipes/issues/483)
     try (Connection connection = DriverManager.getConnection(jdbcUrl)) {
       for (String resource : resources) {
-        createResourceTable(connection, resource, timestamp, thriftServerParquetPath);
+        createResourceAndCanonicalTables(connection, resource, timestamp, thriftServerParquetPath);
       }
     }
   }
@@ -73,29 +73,75 @@ public void createResourceTables(
    * files, thriftServerParquetPath is the exact path for parquet files and resource shall be the
    * respective resource name e.g. Patient
    */
-  private void createResourceTable(
+  private void createResourceAndCanonicalTables(
       Connection connection, String resource, String timestamp, String thriftServerParquetPath)
       throws SQLException {
-    try (Statement statement = connection.createStatement()) {
-      String sql =
-          String.format(
-              "CREATE TABLE IF NOT EXISTS default.%s_%s USING PARQUET LOCATION '%s/%s/%s'",
-              resource,
-              timestamp,
-              THRIFT_CONTAINER_PARQUET_PATH_PREFIX,
-              thriftServerParquetPath,
-              resource);
-      statement.execute(sql);
-      // Drop canonical table if exists.
-      sql = String.format("DROP TABLE IF EXISTS default.%s", resource);
-      statement.execute(sql);
+    String sql =
+        String.format(
+            "CREATE TABLE IF NOT EXISTS default.%s_%s USING PARQUET LOCATION '%s/%s/%s'",
+            resource,
+            timestamp,
+            THRIFT_CONTAINER_PARQUET_PATH_PREFIX,
+            thriftServerParquetPath,
+            resource);
+    executeSql(connection, sql);
+
+    // Drop the canonical table if it exists.
+    sql = String.format("DROP TABLE IF EXISTS default.%s", resource);
+    executeSql(connection, sql);
+
+    // Recreate the canonical table pointing at the latest parquet files.
+    sql =
+        String.format(
+            "CREATE TABLE IF NOT EXISTS default.%s USING PARQUET LOCATION '%s/%s/%s'",
+            resource, THRIFT_CONTAINER_PARQUET_PATH_PREFIX, thriftServerParquetPath, resource);
+    executeSql(connection, sql);
+  }
+
+  /**
+   * Creates a resource table whose name is suffixed with the given timestamp, if it is not already
+   * present. It expects the complete path of the parquet files.
+   *
+   * @param resource the FHIR resource type, e.g. Patient
+   * @param timestamp the timestamp string used as the table-name suffix
+   * @param thriftServerParquetPath the directory path containing the output parquet files
+   * @throws SQLException
+   */
+  public void createResourceTable(String resource, String timestamp, String thriftServerParquetPath)
+      throws SQLException {
-    // Create canonical table with latest parquet files.
-    sql =
-        String.format(
-            "CREATE TABLE IF NOT EXISTS default.%s USING PARQUET LOCATION '%s/%s/%s'",
-            resource, THRIFT_CONTAINER_PARQUET_PATH_PREFIX, thriftServerParquetPath, resource);
+    String sql =
+        String.format(
+            "CREATE TABLE IF NOT EXISTS default.%s_%s USING PARQUET LOCATION '%s'",
+            resource, timestamp, thriftServerParquetPath);
+    try (Connection connection = DriverManager.getConnection(jdbcUrl)) {
+      executeSql(connection, sql);
+    }
+  }
+
+  /**
+   * Creates the canonical resource table if it is not already present. It expects the complete
+   * path of the parquet files.
+   *
+   * @param resource the FHIR resource type, e.g. Patient
+   * @param thriftServerParquetPath the directory path containing the output parquet files
+   * @throws SQLException
+   */
+  public void createResourceCanonicalTable(String resource, String thriftServerParquetPath)
+      throws SQLException {
+
+    String sql =
+        String.format(
+            "CREATE TABLE IF NOT EXISTS default.%s USING PARQUET LOCATION '%s'",
+            resource, thriftServerParquetPath);
+    try (Connection connection = DriverManager.getConnection(jdbcUrl)) {
+      executeSql(connection, sql);
+    }
+  }
+
+  private void executeSql(Connection connection, String sql) throws SQLException {
+    try (Statement statement = connection.createStatement()) {
       statement.execute(sql);
     }
   }
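
The table layout this class maintains is worth spelling out: each DWH snapshot gets a frozen, timestamped table default.<Resource>_<timestamp>, while default.<Resource> is the canonical table that always points at the latest snapshot (hence the DROP followed by CREATE in the combined method). The following stand-alone illustration prints the SQL the two new public methods generate; the resource, timestamp, and path values are hypothetical, only the format strings are taken from the code above.

public class TableSqlDemo {
  public static void main(String[] args) {
    // Hypothetical snapshot values, for illustration only.
    String resource = "Patient";
    String timestamp = "2023_01_24T18_55_27_428707Z";
    String thriftServerParquetPath =
        "/dwh/controller_DWH_TIMESTAMP_2023_01_24T18_55_27_428707Z/Patient";

    // Per-snapshot table, frozen at this timestamp:
    System.out.println(
        String.format(
            "CREATE TABLE IF NOT EXISTS default.%s_%s USING PARQUET LOCATION '%s'",
            resource, timestamp, thriftServerParquetPath));
    // Canonical table, re-pointed at the latest snapshot:
    System.out.println(
        String.format(
            "CREATE TABLE IF NOT EXISTS default.%s USING PARQUET LOCATION '%s'",
            resource, thriftServerParquetPath));
  }
}
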
diff --git a/pipelines/controller/src/main/java/org/openmrs/analytics/PipelineManager.java b/pipelines/controller/src/main/java/org/openmrs/analytics/PipelineManager.java
index 091fb4d8b..ea97420d6 100644
--- a/pipelines/controller/src/main/java/org/openmrs/analytics/PipelineManager.java
+++ b/pipelines/controller/src/main/java/org/openmrs/analytics/PipelineManager.java
@@ -22,7 +22,10 @@
 import java.io.IOException;
 import java.sql.SQLException;
 import java.time.LocalDateTime;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 import javax.annotation.PostConstruct;
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
@@ -39,6 +42,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.ApplicationListener;
 import org.springframework.scheduling.annotation.EnableScheduling;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.scheduling.support.CronExpression;
@@ -52,7 +57,7 @@
  */
 @EnableScheduling
 @Component
-public class PipelineManager {
+public class PipelineManager implements ApplicationListener<ApplicationReadyEvent> {
 
   private static final Logger logger = LoggerFactory.getLogger(PipelineManager.class.getName());
 
@@ -260,6 +265,84 @@ synchronized void runIncrementalPipeline()
     }
   }
 
+  @Override
+  public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
+    createResourceTables();
+  }
+
+  /**
+   * Upon Controller start, this method checks the Thrift server and creates resource tables if
+   * they don't exist. There is already a @PostConstruct method in this class (initDwhStatus), but
+   * this logic cannot be moved there because dataProperties.getThriftserverHiveConfig() turns out
+   * to be null when used by
+   * DatabaseConfiguration.createConfigFromFile(dataProperties.getThriftserverHiveConfig()).
+   */
+  public void createResourceTables() {
+    if (!dataProperties.isCreateHiveResourceTables()) {
+      return;
+    }
+
+    DatabaseConfiguration dbConfig;
+    try {
+      dbConfig =
+          DatabaseConfiguration.createConfigFromFile(dataProperties.getThriftserverHiveConfig());
+    } catch (IOException e) {
+      logger.error("Exception while reading thrift hive config.");
+      throw new RuntimeException(e);
+    }
+    HiveTableManager hiveTableManager =
+        new HiveTableManager(
+            dbConfig.makeJdbsUrlFromConfig(),
+            dbConfig.getDatabaseUser(),
+            dbConfig.getDatabasePassword());
+
+    String rootPrefix = dataProperties.getDwhRootPrefix();
+    Preconditions.checkState(rootPrefix != null && !rootPrefix.isEmpty());
+
+    String prefix = dwhFilesManager.getPrefix(rootPrefix);
+    String baseDir = dwhFilesManager.getBaseDir(rootPrefix);
+
+    try {
+      List<ResourceId> paths =
+          dwhFilesManager.getAllChildDirectories(baseDir).stream()
+              .filter(dir -> dir.getFilename().startsWith(prefix + DataProperties.TIMESTAMP_PREFIX))
+              .collect(Collectors.toList());
+
+      Preconditions.checkState(paths != null, "Make sure the DWH prefix is a valid path!");
+
+      // Sort the snapshot directories; the last one is the latest.
+      Collections.sort(paths, Comparator.comparing(ResourceId::toString));
+
+      int snapshotCount = 0;
+      for (ResourceId path : paths) {
+        snapshotCount++;
+        Set<ResourceId> childPaths =
+            dwhFilesManager.getAllChildDirectories(baseDir + "/" + path.getFilename());
+        for (ResourceId resourceId : childPaths) {
+          String resource = resourceId.getFilename();
+          String[] tokens = path.getFilename().split(prefix + DataProperties.TIMESTAMP_PREFIX);
+          if (tokens.length > 1) {
+            String timestamp = tokens[1];
+            String thriftServerParquetPath =
+                baseDir + "/" + path.getFilename() + "/" + resourceId.getFilename();
+            logger.debug("thriftServerParquetPath: {}", thriftServerParquetPath);
+            hiveTableManager.createResourceTable(resource, timestamp, thriftServerParquetPath);
+            // Create the canonical table only for the latest snapshot.
+            if (snapshotCount == paths.size()) {
+              hiveTableManager.createResourceCanonicalTable(resource, thriftServerParquetPath);
+            }
+          }
+        }
+      }
+    } catch (IOException e) {
+      logger.error("Exception while reading thriftserver parquet output directory: ", e);
+      throw new RuntimeException(e);
+    } catch (SQLException e) {
+      logger.error("Exception while creating resource tables on thriftserver: ", e);
+      throw new RuntimeException(e);
+    }
+  }
+
   private synchronized void updateDwh(String newRoot) {
     currentDwh = DwhFiles.forRoot(newRoot);
   }
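
At startup the listener rebuilds the table layout from disk: it lists the child directories of the DWH base dir, keeps those matching <prefix><TIMESTAMP_PREFIX>..., sorts them lexicographically (chronological for fixed-width timestamp names), registers a timestamped table for every snapshot, and creates canonical tables only for the last one. The timestamp suffix is recovered with String.split, as in this small sketch; the prefix, marker, and directory name are illustrative, the real values come from dwhRootPrefix and DataProperties.TIMESTAMP_PREFIX.

public class SnapshotNameDemo {
  public static void main(String[] args) {
    // Illustrative values only.
    String prefix = "controller_DWH";
    String timestampPrefix = "_TIMESTAMP_";
    String dirName = "controller_DWH_TIMESTAMP_2023_01_24T18_55_27_428707Z";

    // Note: String.split takes a regex, so this relies on the prefix and the
    // marker containing no regex metacharacters (true for names like these).
    String[] tokens = dirName.split(prefix + timestampPrefix);
    if (tokens.length > 1) {
      // tokens[1] becomes the table-name suffix, e.g. Patient_<timestamp>.
      System.out.println("timestamp = " + tokens[1]);
    }
  }
}
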
diff --git a/pipelines/controller/src/main/resources/templates/index.html b/pipelines/controller/src/main/resources/templates/index.html
index b8863a4a7..29da0ca84 100644
--- a/pipelines/controller/src/main/resources/templates/index.html
+++ b/pipelines/controller/src/main/resources/templates/index.html
@@ -111,6 +111,30 @@
       function() {
         updateStatus();
       }, 2000);
+
+    function createResourceTables() {
+      var xhr = new XMLHttpRequest();
+      xhr.open("POST", '/tables', true);
+      xhr.onreadystatechange = function() {
+        if (this.readyState === XMLHttpRequest.DONE && this.status === 200) {
+          console.log('Tables creation request submitted successfully.');
+          success.style.display = "block";
+        }
+        // Handle errors.
+        if (this.readyState === XMLHttpRequest.DONE && this.status !== 200) {
+          console.log('Creation of tables failed: ' + this.response);
+          failure.style.display = "block";
+        }
+      };
+      xhr.send();
+    }
+
+    setInterval(() => {
+      success.style.display = "none";
+      failure.style.display = "none";
+    }, 4000);
+
@@ -205,6 +229,12 @@
         Run Full Pipeline
 
         List of DWH snapshots
 
         Latest: [[${dwh}]]
+        <div>
+          <button type="button" onclick="createResourceTables()">Create Resource Tables</button>
+        </div>
+        <div id="success" style="display: none">Tables creation request submitted successfully.</div>
+        <div id="failure" style="display: none">Creation of tables failed.</div>
+
 
         Configuration Settings
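
With the UI wired up, the whole flow can be exercised end to end: start the stack with the compose file, wait until docker ps shows thriftserver as healthy, then either click the new Create Resource Tables button or POST to /tables as sketched earlier, and finally verify with the same beeline command the healthcheck uses that both default.<Resource> and default.<Resource>_<timestamp> tables are listed. The success and failure banners are blanked by the setInterval block every four seconds, which is what makes them auto-hide after a request.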