Added mechanism to recover resource tables on Thrift server in case it goes down (google#655)

* Added mechanism to recover resource tables on Thrift server in case it goes down.

* Added config-based condition to create tables.

* Made changes as per review comments.

* Changed the file handling to use the Apache Beam file implementation instead of java.nio.

* Updated comment for application ready event listener method onApplicationEvent

* Added sorting logic for snapshots.

* Added UI control to create resource tables.

* Changed docker compose version to 3.3

* Changed docker compose version to 2.4

* Removed restart condition from compose

* Added error handler for UI control for tables creation.

* Made changes to PipelineManager as per recent code changes.
atulai-sg authored May 11, 2023
1 parent 6b749a8 commit 6c6e6ec
Showing 5 changed files with 207 additions and 23 deletions.
13 changes: 11 additions & 2 deletions docker/compose-controller-spark-sql.yaml
@@ -56,7 +56,8 @@
# to /opt/bitnami/spark/conf/
# https://spark.apache.org/docs/latest/configuration.html

-version: '2'
# Version 2.4 supports healthcheck feature.
version: '2.4'

services:
pipeline-controller:
@@ -69,6 +70,9 @@ services:
- ${DWH_ROOT}:/dwh
ports:
- '8090:8080'
depends_on:
thriftserver:
condition: service_healthy

spark:
image: docker.io/bitnami/spark:3.3
@@ -118,9 +122,14 @@ services:
- '4041:4040'
volumes:
- ${DWH_ROOT}:/dwh

volumes_from:
- spark
healthcheck:
test: beeline -u jdbc:hive2://localhost:10000 -n hive -e 'show tables;' || exit 1
interval: 30s
retries: 5
start_period: 10s
timeout: 10s

volumes:
spark:
@@ -64,6 +64,16 @@ public ProgressStats getStatus() {
return progressStats;
}

@PostMapping("/tables")
public String createResourceTables() {
if (pipelineManager.isRunning()) {
throw new IllegalStateException("Cannot create tables because another pipeline is running!");
}
logger.info("Received request to create request tables ...");
pipelineManager.createResourceTables();
return "SUCCESS";
}

private Stats getStats() {
MetricQueryResults metricQueryResults = pipelineManager.getMetricQueryResults();
return Stats.createStats(metricQueryResults);
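
As a quick illustration of the new /tables endpoint, here is a minimal client sketch using java.net.http. The host and port (localhost:8090) come from the compose port mapping above; the class name is invented for this example and is not part of the commit.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CreateTablesClient {
  public static void main(String[] args) throws Exception {
    // POST with an empty body; the endpoint takes no parameters.
    HttpRequest request =
        HttpRequest.newBuilder()
            .uri(URI.create("http://localhost:8090/tables"))
            .POST(HttpRequest.BodyPublishers.noBody())
            .build();
    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    // Expect "SUCCESS" with status 200; if another pipeline is running, the
    // IllegalStateException thrown above surfaces as a non-200 response instead.
    System.out.println(response.statusCode() + " " + response.body());
  }
}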
@@ -60,7 +60,7 @@ public void createResourceTables(
// (https://github.com/google/fhir-data-pipes/issues/483)
try (Connection connection = DriverManager.getConnection(jdbcUrl)) {
for (String resource : resources) {
-createResourceTable(connection, resource, timestamp, thriftServerParquetPath);
createResourceAndCanonicalTables(connection, resource, timestamp, thriftServerParquetPath);
}
}
}
@@ -73,29 +73,75 @@ public void createResourceTables(
* files, thriftServerParquetPath is the exact path for parquet files and resource shall be the
* respective resource name e.g. Patient
*/
-private void createResourceTable(
private void createResourceAndCanonicalTables(
Connection connection, String resource, String timestamp, String thriftServerParquetPath)
throws SQLException {
-try (Statement statement = connection.createStatement()) {
-String sql =
-String.format(
-"CREATE TABLE IF NOT EXISTS default.%s_%s USING PARQUET LOCATION '%s/%s/%s'",
-resource,
-timestamp,
-THRIFT_CONTAINER_PARQUET_PATH_PREFIX,
-thriftServerParquetPath,
-resource);
-statement.execute(sql);
-
-// Drop canonical table if exists.
-sql = String.format("DROP TABLE IF EXISTS default.%s", resource);
-statement.execute(sql);
String sql =
String.format(
"CREATE TABLE IF NOT EXISTS default.%s_%s USING PARQUET LOCATION '%s/%s/%s'",
resource,
timestamp,
THRIFT_CONTAINER_PARQUET_PATH_PREFIX,
thriftServerParquetPath,
resource);
executeSql(connection, sql);

// Drop canonical table if exists.
sql = String.format("DROP TABLE IF EXISTS default.%s", resource);
executeSql(connection, sql);

// Create canonical table with latest parquet files.
sql =
String.format(
"CREATE TABLE IF NOT EXISTS default.%s USING PARQUET LOCATION '%s/%s/%s'",
resource, THRIFT_CONTAINER_PARQUET_PATH_PREFIX, thriftServerParquetPath, resource);
executeSql(connection, sql);
}

/**
* This method creates resource tables with names suffixed with the given timestamp, if they are
* not already present. It expects the complete parquet files path.
*
* @param resource FHIR resource type.
* @param timestamp timestamp string to be used for resource table name.
* @param thriftServerParquetPath directory path having output parquet files.
* @throws SQLException
*/
public void createResourceTable(String resource, String timestamp, String thriftServerParquetPath)
throws SQLException {

-// Create canonical table with latest parquet files.
-sql =
-String.format(
-"CREATE TABLE IF NOT EXISTS default.%s USING PARQUET LOCATION '%s/%s/%s'",
-resource, THRIFT_CONTAINER_PARQUET_PATH_PREFIX, thriftServerParquetPath, resource);
String sql =
String.format(
"CREATE TABLE IF NOT EXISTS default.%s_%s USING PARQUET LOCATION '%s'",
resource, timestamp, thriftServerParquetPath);
try (Connection connection = DriverManager.getConnection(jdbcUrl)) {
executeSql(connection, sql);
}
}

/**
* This method creates canonical resource tables if not present. It expects the complete parquet
* files path.
*
* @param resource FHIR resource type.
* @param thriftServerParquetPath directory path having output parquet files.
* @throws SQLException
*/
public void createResourceCanonicalTable(String resource, String thriftServerParquetPath)
throws SQLException {

String sql =
String.format(
"CREATE TABLE IF NOT EXISTS default.%s USING PARQUET LOCATION '%s'",
resource, thriftServerParquetPath);
try (Connection connection = DriverManager.getConnection(jdbcUrl)) {
executeSql(connection, sql);
}
}

private void executeSql(Connection connection, String sql) throws SQLException {
try (Statement statement = connection.createStatement()) {
statement.execute(sql);
}
}
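
For illustration, with invented values (resource "Patient", timestamp "2023_05_11", parquet path "/dwh/snapshot1/Patient"), the two new public helpers issue SQL like the comments below show. This is a sketch, not part of the commit; jdbcUrl, user, and password stand in for the values read from the Hive config file.

// Hypothetical invocation of the new helpers.
HiveTableManager manager = new HiveTableManager(jdbcUrl, user, password);

// Issues: CREATE TABLE IF NOT EXISTS default.Patient_2023_05_11
//         USING PARQUET LOCATION '/dwh/snapshot1/Patient'
manager.createResourceTable("Patient", "2023_05_11", "/dwh/snapshot1/Patient");

// Issues: CREATE TABLE IF NOT EXISTS default.Patient
//         USING PARQUET LOCATION '/dwh/snapshot1/Patient'
manager.createResourceCanonicalTable("Patient", "/dwh/snapshot1/Patient");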
@@ -22,7 +22,10 @@
import java.io.IOException;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
@@ -39,6 +42,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.support.CronExpression;
@@ -52,7 +57,7 @@
*/
@EnableScheduling
@Component
-public class PipelineManager {
public class PipelineManager implements ApplicationListener<ApplicationReadyEvent> {

private static final Logger logger = LoggerFactory.getLogger(PipelineManager.class.getName());

@@ -260,6 +265,84 @@ synchronized void runIncrementalPipeline()
}
}

@Override
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
createResourceTables();
}

/**
* This method runs on Controller start and checks the Thrift server, creating resource tables if
* they don't exist. This class already has a @PostConstruct method, initDwhStatus, but the code
* below could not be added there because dataProperties.getThriftserverHiveConfig() turns out to
* be null when used by
* DatabaseConfiguration.createConfigFromFile(dataProperties.getThriftserverHiveConfig()).
*/
public void createResourceTables() {
if (!dataProperties.isCreateHiveResourceTables()) {
return;
}

DatabaseConfiguration dbConfig;
try {
dbConfig =
DatabaseConfiguration.createConfigFromFile(dataProperties.getThriftserverHiveConfig());
} catch (IOException e) {
logger.error("Exception while reading thrift hive config.");
throw new RuntimeException(e);
}
HiveTableManager hiveTableManager =
new HiveTableManager(
dbConfig.makeJdbsUrlFromConfig(),
dbConfig.getDatabaseUser(),
dbConfig.getDatabasePassword());

String rootPrefix = dataProperties.getDwhRootPrefix();
Preconditions.checkState(rootPrefix != null && !rootPrefix.isEmpty());

String prefix = dwhFilesManager.getPrefix(rootPrefix);
String baseDir = dwhFilesManager.getBaseDir(rootPrefix);

try {
List<ResourceId> paths =
dwhFilesManager.getAllChildDirectories(baseDir).stream()
.filter(dir -> dir.getFilename().startsWith(prefix + DataProperties.TIMESTAMP_PREFIX))
.collect(Collectors.toList());

Preconditions.checkState(paths != null, "Make sure DWH prefix is a valid path!");

// Sort snapshots directories.
Collections.sort(paths, Comparator.comparing(ResourceId::toString));

int snapshotCount = 0;
for (ResourceId path : paths) {
snapshotCount++;
Set<ResourceId> childPaths =
dwhFilesManager.getAllChildDirectories(baseDir + "/" + path.getFilename());
for (ResourceId resourceId : childPaths) {
String resource = resourceId.getFilename();
String[] tokens = path.getFilename().split(prefix + DataProperties.TIMESTAMP_PREFIX);
if (tokens.length > 1) {
String timestamp = tokens[1];
String thriftServerParquetPath =
baseDir + "/" + path.getFilename() + "/" + resourceId.getFilename();
logger.debug("thriftServerParquetPath: {}", thriftServerParquetPath);
hiveTableManager.createResourceTable(resource, timestamp, thriftServerParquetPath);
// Create Canonical table for the latest snapshot.
if (snapshotCount == paths.size()) {
hiveTableManager.createResourceCanonicalTable(resource, thriftServerParquetPath);
}
}
}
}
} catch (IOException e) {
logger.error("Exception while reading thriftserver parquet output directory: ", e);
throw new RuntimeException(e);
} catch (SQLException e) {
logger.error("Exception while creating resource tables on thriftserver: ", e);
throw new RuntimeException(e);
}
}

private synchronized void updateDwh(String newRoot) {
currentDwh = DwhFiles.forRoot(newRoot);
}
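A note on the recovery scan in createResourceTables above: snapshot directories are sorted lexicographically, and canonical tables are created only for the last (latest) snapshot. The timestamp suffix is recovered by splitting the directory name, roughly as in this sketch; the prefix and directory name are invented, and the value of DataProperties.TIMESTAMP_PREFIX shown here is an assumption.

public class SnapshotNameSplitExample {
  public static void main(String[] args) {
    String prefix = "controller_DWH";        // what dwhFilesManager.getPrefix(rootPrefix) might return
    String timestampPrefix = "_TIMESTAMP_";  // assumed value of DataProperties.TIMESTAMP_PREFIX
    String dirName = "controller_DWH_TIMESTAMP_2023_05_11T10_00_00";

    // Mirrors the split in createResourceTables(); tokens[1] becomes the table-name
    // suffix, e.g. default.Patient_2023_05_11T10_00_00.
    String[] tokens = dirName.split(prefix + timestampPrefix);
    if (tokens.length > 1) {
      System.out.println(tokens[1]);  // prints "2023_05_11T10_00_00"
    }
  }
}
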
36 changes: 36 additions & 0 deletions pipelines/controller/src/main/resources/templates/index.html
@@ -111,6 +111,30 @@
function() {
updateStatus();
}, 2000);

function createResourceTables() {
var xhr = new XMLHttpRequest();
xhr.open("POST", '/tables', true);
xhr.send();
xhr.onreadystatechange = function() {
if (this.readyState === XMLHttpRequest.DONE && this.status === 200) {
console.log('Tables creation request submitted successfully.');
success.style.display = "block";
}
// Handle errors.
if (this.readyState === XMLHttpRequest.DONE && this.status != 200) {
console.log('Creation of tables failed: ' + this.response);
failure.style.display = "block";
}
}

}

setInterval(() => {
success.style.display = "none";
failure.style.display = "none";
}, 4000);

</script>
</head>

@@ -205,6 +229,12 @@ <h5>Run Full Pipeline</h5>
<h3>List of DWH snapshots</h3>
<div class="alert alert-secondary alert-dismissible fade show">
<b>Latest:</b> [[${dwh}]]
<div style="float:right;">
<button type="submit" name="mode" class="button btn btn-primary" style="margin: -7;"
th:classappend="${hasDwh ? '' : 'disabled'}" onclick="createResourceTables()">
Create Resource Tables
</button>
</div>
</div>
<!-- TODO add the list of DWH snapshots once the API is implemented
<div class="accordion" id="accordionDWH">
@@ -232,6 +262,12 @@ <h2 class="accordion-header" id="flush-headingOne">
</div>
</div-->
</div>
<div class="alert alert-success" style="float:right;display:none;" id="success">
Request for resource tables creation sent successfully.
</div>
<div class="alert alert-danger" style="float:right;display:none;" id="failure">
Something went wrong while creating resource tables.
</div>
<div style="margin-top: 50px">
<h3>Configuration Settings</h3>
<div class="accordion" id="configAccordian">
