Commit

Fixed JDBC connection leak issue (google#679)
bashir2 authored May 12, 2023
1 parent 6c6e6ec commit 0141fd2
Showing 24 changed files with 298 additions and 363 deletions.
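
The core of the fix is visible in the hunks below: per-stage JdbcConnectionUtil instances, each of which owned its own connection pool, are replaced by a process-wide JdbcConnectionPools registry that hands out one pooled DataSource per distinct connection config. The registry class itself is not among the hunks shown on this page, so the following is only a minimal sketch of the pattern, assuming c3p0's ComboPooledDataSource (the PropertyVetoException in the surrounding signatures suggests c3p0) and an illustrative DataSourceConfig value type; the real class may differ in detail.

    import java.beans.PropertyVetoException;
    import java.util.HashMap;
    import java.util.Map;
    import javax.sql.DataSource;
    import com.mchange.v2.c3p0.ComboPooledDataSource;

    public class ConnectionPoolRegistrySketch {

      // Hypothetical value type; in the real code DataSourceConfig is nested in
      // JdbcConnectionPools and produced by dbConfigToDataSourceConfig().
      public record DataSourceConfig(
          String jdbcDriverClass, String jdbcUrl, String dbUser, String dbPassword) {}

      private static ConnectionPoolRegistrySketch instance = null;

      // One pool per distinct connection config, shared by every stage in the same
      // JVM; reusing pools is what stops each stage from creating (and leaking) its own.
      private final Map<DataSourceConfig, DataSource> pools = new HashMap<>();

      private ConnectionPoolRegistrySketch() {}

      public static synchronized ConnectionPoolRegistrySketch getInstance() {
        if (instance == null) {
          instance = new ConnectionPoolRegistrySketch();
        }
        return instance;
      }

      public synchronized DataSource getPooledDataSource(
          DataSourceConfig config, int initialPoolSize, int maxPoolSize)
          throws PropertyVetoException {
        DataSource existing = pools.get(config);
        if (existing != null) {
          return existing; // Same config, same pool; no second pool is created.
        }
        ComboPooledDataSource pool = new ComboPooledDataSource();
        pool.setDriverClass(config.jdbcDriverClass()); // may throw PropertyVetoException
        pool.setJdbcUrl(config.jdbcUrl());
        pool.setUser(config.dbUser());
        pool.setPassword(config.dbPassword());
        pool.setInitialPoolSize(initialPoolSize);
        pool.setMaxPoolSize(maxPoolSize);
        pools.put(config, pool);
        return pool;
      }
    }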
1 change: 1 addition & 0 deletions docker/config/hapi-postgres-config_local.json
@@ -1,4 +1,5 @@
{
"jdbcDriverClass": "org.postgresql.Driver",
"databaseService" : "postgresql",
"databaseHostName" : "hapi-fhir-db",
"databasePort" : "5432",
3 changes: 1 addition & 2 deletions pipelines/batch/Dockerfile
@@ -39,7 +39,6 @@ ENV BATCH_SIZE=10
ENV TARGET_PARALLELISM=10
ENV PARQUET_PATH="/tmp/"
ENV JDBC_MODE_ENABLED=false
- ENV JDBC_DRIVER_CLASS="com.mysql.cj.jdbc.Driver"
ENV JDBC_FETCH_SIZE=10000
ENV JDBC_MAX_POOL_SIZE=50
ENV JDBC_INITIAL_POOL_SIZE=10
@@ -53,7 +52,7 @@ ENTRYPOINT java -jar /usr/src/Main/app.jar \
--fhirServerUrl=${FHIR_SERVER_URL} --fhirServerUserName=${FHIR_SERVER_USERNAME} --fhirServerPassword=${FHIR_SERVER_PASSWORD} \
--resourceList=${RESOURCE_LIST} --batchSize=${BATCH_SIZE} --targetParallelism=${TARGET_PARALLELISM} \
--fhirSinkPath=${SINK_PATH} --sinkUserName=${SINK_USERNAME} --sinkPassword=${SINK_PASSWORD} \
- --outputParquetPath=${PARQUET_PATH} --jdbcModeEnabled=${JDBC_MODE_ENABLED} --jdbcDriverClass=${JDBC_DRIVER_CLASS} \
+ --outputParquetPath=${PARQUET_PATH} --jdbcModeEnabled=${JDBC_MODE_ENABLED} \
--jdbcMaxPoolSize=${JDBC_MAX_POOL_SIZE} \
--fhirDatabaseConfigPath=${FHIR_DATABASE_CONFIG_PATH} --jdbcInitialPoolSize=${JDBC_INITIAL_POOL_SIZE} \
--jdbcFetchSize=${JDBC_FETCH_SIZE} --jdbcModeHapi=${JDBC_MODE_HAPI} --runner=${RUNNER}
14 changes: 0 additions & 14 deletions pipelines/batch/pom.xml
@@ -37,7 +37,6 @@
<hamcrest.version>2.2</hamcrest.version>
<jackson.version>2.14.2</jackson.version>
<pubsub.version>v1-rev20220904-2.0.0</pubsub.version>
- <auto-value.version>1.10.1</auto-value.version>
<spark.version>2.4.8</spark.version>
<maven-surefire-plugin.version>3.0.0</maven-surefire-plugin.version>
<nemo.version>0.1</nemo.version>
@@ -107,19 +106,6 @@
<version>${beam.version}</version>
</dependency>

- <!-- Using AutoValue for POJOs used in PCollections. -->
- <dependency>
- <groupId>com.google.auto.value</groupId>
- <artifactId>auto-value-annotations</artifactId>
- <version>${auto-value.version}</version>
- </dependency>
- <dependency>
- <groupId>com.google.auto.value</groupId>
- <artifactId>auto-value</artifactId>
- <version>${auto-value.version}</version>
- <optional>true</optional>
- </dependency>
-
<!-- Hamcrest and JUnit are required dependencies of PAssert,
which is used in the main code of DebuggingWordCount example. -->
<dependency>
@@ -136,7 +136,7 @@ public void writeResource(HapiRowDescriptor element)
fhirStoreUtil.uploadResource(resource);
totalPushTimeMillisMap.get(resourceType).inc(System.currentTimeMillis() - startTime);
}
- if (!this.sinkDbUrl.isEmpty()) {
+ if (this.sinkDbConfig != null) {
// TODO : Remove the deleted resources from the sink database
// https://github.com/google/fhir-data-pipes/issues/588
jdbcWriter.writeResource(resource);
@@ -33,6 +33,8 @@
import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.Bundle.BundleEntryComponent;
import org.hl7.fhir.r4.model.Resource;
+ import org.openmrs.analytics.JdbcConnectionPools.DataSourceConfig;
+ import org.openmrs.analytics.model.DatabaseConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -77,18 +79,12 @@ abstract class FetchSearchPageFn<T> extends DoFn<T, KV<String, Integer>> {

private final int rowGroupSize;

- protected final String sinkDbUrl;
-
- private final String sinkDbUsername;
-
- private final String sinkDbPassword;
+ protected final DataSourceConfig sinkDbConfig;

private final String sinkDbTableName;

private final boolean useSingleSinkDbTable;

- private final String jdbcDriverClass;
-
private final int initialPoolSize;

private final int maxPoolSize;
@@ -107,26 +103,6 @@ abstract class FetchSearchPageFn<T> extends DoFn<T, KV<String, Integer>> {

protected FhirContext fhirContext;

- private static JdbcConnectionUtil jdbcConnectionUtil = null;
-
- // This is to enforce the Singleton pattern for JdbcConnectionUtil used by all workers running
- // this DoFn on the same VM; hence sharing connections.
- private static synchronized JdbcConnectionUtil getJdbcConnectionUtil(
- String jdbcDriverClass,
- String jdbcUrl,
- String dbUser,
- String dbPassword,
- int initialPoolSize,
- int jdbcMaxPoolSize)
- throws PropertyVetoException {
- if (jdbcConnectionUtil == null) {
- jdbcConnectionUtil =
- new JdbcConnectionUtil(
- jdbcDriverClass, jdbcUrl, dbUser, dbPassword, initialPoolSize, jdbcMaxPoolSize);
- }
- return jdbcConnectionUtil;
- }
-
FetchSearchPageFn(FhirEtlOptions options, String stageIdentifier) {
this.sinkPath = options.getFhirSinkPath();
this.sinkUsername = options.getSinkUserName();
@@ -138,15 +114,23 @@ private static synchronized JdbcConnectionUtil getJdbcConnectionUtil(
this.parquetFile = options.getOutputParquetPath();
this.secondsToFlush = options.getSecondsToFlushParquetFiles();
this.rowGroupSize = options.getRowGroupSizeForParquetFiles();
- this.sinkDbUrl = options.getSinkDbUrl();
this.sinkDbTableName = options.getSinkDbTablePrefix();
- this.sinkDbUsername = options.getSinkDbUsername();
- this.sinkDbPassword = options.getSinkDbPassword();
+ if (options.getSinkDbConfigPath().isEmpty()) {
+ this.sinkDbConfig = null;
+ } else {
+ try {
+ this.sinkDbConfig =
+ JdbcConnectionPools.dbConfigToDataSourceConfig(
+ DatabaseConfiguration.createConfigFromFile(options.getSinkDbConfigPath()));
+ } catch (IOException e) {
+ String error = "Cannot access file " + options.getSinkDbConfigPath();
+ log.error(error);
+ throw new IllegalArgumentException(error);
+ }
+ }
this.useSingleSinkDbTable = options.getUseSingleSinkTable();
this.initialPoolSize = options.getJdbcInitialPoolSize();
this.maxPoolSize = options.getJdbcMaxPoolSize();
- // We are assuming that the potential source and sink DBs are the same type.
- this.jdbcDriverClass = options.getJdbcDriverClass();
this.numFetchedResources =
Metrics.counter(
MetricsConstants.METRICS_NAMESPACE,
@@ -196,18 +180,12 @@ public void setup() throws SQLException, PropertyVetoException {
secondsToFlush,
rowGroupSize,
stageIdentifier + "_");
- if (!sinkDbUrl.isEmpty()) {
- DataSource dataSource =
- getJdbcConnectionUtil(
- jdbcDriverClass,
- sinkDbUrl,
- sinkDbUsername,
- sinkDbPassword,
- initialPoolSize,
- maxPoolSize)
- .getDataSource();
+ if (sinkDbConfig != null) {
+ DataSource jdbcSink =
+ JdbcConnectionPools.getInstance()
+ .getPooledDataSource(sinkDbConfig, initialPoolSize, maxPoolSize);
jdbcWriter =
- new JdbcResourceWriter(dataSource, sinkDbTableName, useSingleSinkDbTable, fhirContext);
+ new JdbcResourceWriter(jdbcSink, sinkDbTableName, useSingleSinkDbTable, fhirContext);
}
}

@@ -239,7 +217,7 @@ protected void processBundle(Bundle bundle, @Nullable Set<String> resourceTypes)
fhirStoreUtil.uploadBundle(bundle);
totalPushTimeMillis.inc(System.currentTimeMillis() - pushStartTime);
}
- if (!this.sinkDbUrl.isEmpty()) {
+ if (sinkDbConfig != null) {
if (bundle.getEntry() == null) {
return;
}
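
With that registry in place, the setup() change above means every FetchSearchPageFn worker on a JVM that carries an equal sinkDbConfig shares a single sink pool instead of opening its own. A small demo against the sketch class from the top of this page; the config values are illustrative, not taken from this commit, and the same-instance reuse is the assumed behavior the setup() change relies on:

    import javax.sql.DataSource;

    public class PoolReuseDemo {
      public static void main(String[] args) throws Exception {
        ConnectionPoolRegistrySketch.DataSourceConfig config =
            new ConnectionPoolRegistrySketch.DataSourceConfig(
                "org.postgresql.Driver",
                "jdbc:postgresql://hapi-fhir-db:5432/hapi", // hypothetical URL
                "admin", // hypothetical credentials
                "admin");
        DataSource first =
            ConnectionPoolRegistrySketch.getInstance().getPooledDataSource(config, 10, 50);
        DataSource second =
            ConnectionPoolRegistrySketch.getInstance().getPooledDataSource(config, 10, 50);
        System.out.println("Same pool instance: " + (first == second)); // expected: true
      }
    }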
33 changes: 16 additions & 17 deletions pipelines/batch/src/main/java/org/openmrs/analytics/FhirEtl.java
@@ -29,6 +29,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+ import javax.sql.DataSource;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
@@ -157,15 +158,13 @@ static void runFhirFetch(FhirEtlOptions options, FhirContext fhirContext) throws
EtlUtils.runPipelineWithTimestamp(pipeline, options);
}

- private static JdbcConnectionUtil createJdbcConnection(
+ private static DataSource createJdbcPooledDataSource(
FhirEtlOptions options, DatabaseConfiguration dbConfig) throws PropertyVetoException {
- return new JdbcConnectionUtil(
- options.getJdbcDriverClass(),
- dbConfig.makeJdbsUrlFromConfig(),
- dbConfig.getDatabaseUser(),
- dbConfig.getDatabasePassword(),
- options.getJdbcInitialPoolSize(),
- options.getJdbcMaxPoolSize());
+ return JdbcConnectionPools.getInstance()
+ .getPooledDataSource(
+ JdbcConnectionPools.dbConfigToDataSourceConfig(dbConfig),
+ options.getJdbcInitialPoolSize(),
+ options.getJdbcMaxPoolSize());
}

static void runFhirJdbcFetch(FhirEtlOptions options, FhirContext fhirContext)
@@ -174,8 +173,8 @@ static void runFhirJdbcFetch(FhirEtlOptions options, FhirContext fhirContext)
Pipeline pipeline = Pipeline.create(options);
DatabaseConfiguration dbConfig =
DatabaseConfiguration.createConfigFromFile(options.getFhirDatabaseConfigPath());
- JdbcConnectionUtil jdbcConnectionUtil = createJdbcConnection(options, dbConfig);
- JdbcFetchOpenMrs jdbcUtil = new JdbcFetchOpenMrs(jdbcConnectionUtil);
+ DataSource jdbcSource = createJdbcPooledDataSource(options, dbConfig);
+ JdbcFetchOpenMrs jdbcUtil = new JdbcFetchOpenMrs(jdbcSource);
int batchSize =
Math.min(
options.getBatchSize(), 170); // batch size > 200 will result in HTTP 400 Bad Request
@@ -220,7 +219,7 @@ static void runFhirJdbcFetch(FhirEtlOptions options, FhirContext fhirContext)
}

private static void validateOptions(FhirEtlOptions options)
- throws SQLException, PropertyVetoException {
+ throws SQLException, PropertyVetoException, IOException {
if (!options.getActivePeriod().isEmpty()) {
Set<String> resourceSet = Sets.newHashSet(options.getResourceList().split(","));
if (resourceSet.contains("Patient")) {
@@ -256,7 +255,7 @@ private static void validateOptions(FhirEtlOptions options)
}
}

- if (!options.getSinkDbUrl().isEmpty()) {
+ if (!options.getSinkDbConfigPath().isEmpty()) {
JdbcResourceWriter.createTables(options);
}
}
@@ -286,9 +285,9 @@ static Pipeline buildHapiJdbcFetch(
throws PropertyVetoException, SQLException {
boolean foundResource = false;
Pipeline pipeline = Pipeline.create(options);
- JdbcConnectionUtil jdbcConnectionUtil = createJdbcConnection(options, dbConfig);
+ DataSource jdbcSource = createJdbcPooledDataSource(options, dbConfig);

- JdbcFetchHapi jdbcFetchHapi = new JdbcFetchHapi(jdbcConnectionUtil);
+ JdbcFetchHapi jdbcFetchHapi = new JdbcFetchHapi(jdbcSource);
Map<String, Integer> resourceCount =
jdbcFetchHapi.searchResourceCounts(options.getResourceList(), options.getSince());

@@ -314,15 +313,15 @@
pipeline.apply(
"Generate query parameters for " + resourceType,
Create.of(
- new JdbcFetchHapi(jdbcConnectionUtil)
+ new JdbcFetchHapi(jdbcSource)
.generateQueryParameters(options, resourceType, numResources)));

PCollection<HapiRowDescriptor> payload =
queryParameters.apply(
"JdbcIO fetch for " + resourceType,
new JdbcFetchHapi.FetchRowsJdbcIo(
options.getResourceList(),
- JdbcIO.DataSourceConfiguration.create(jdbcConnectionUtil.getDataSource()),
+ JdbcIO.DataSourceConfiguration.create(jdbcSource),
options.getSince()));

payload.apply(
@@ -375,7 +374,7 @@ public static void main(String[] args)
// TODO: Check if we can use some sort of dependency-injection (e.g., `@Autowired`).
FhirContext fhirContext = FhirContexts.forR4();

- if (!options.getSinkDbUrl().isEmpty()) {
+ if (!options.getSinkDbConfigPath().isEmpty()) {
JdbcResourceWriter.createTables(options);
}

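
Note what the deleted comment in FetchSearchPageFn ("We are assuming that the potential source and sink DBs are the same type") implied: with a single global --jdbcDriverClass flag, source and sink had to share a driver. Moving the driver class into each database config file removes that restriction. Below is a recap of the new wiring, assembled only from calls that appear in the hunks above; the config path and pool sizes are illustrative, and whether these signatures hold exactly outside the shown hunks is an assumption:

    import java.beans.PropertyVetoException;
    import java.io.IOException;
    import javax.sql.DataSource;
    import org.apache.beam.sdk.io.jdbc.JdbcIO;
    import org.openmrs.analytics.JdbcConnectionPools;
    import org.openmrs.analytics.model.DatabaseConfiguration;

    public class PooledFetchWiring {
      public static void main(String[] args) throws IOException, PropertyVetoException {
        // The config file now carries host, port, credentials, and the driver class.
        DatabaseConfiguration dbConfig =
            DatabaseConfiguration.createConfigFromFile(
                "docker/config/hapi-postgres-config_local.json"); // illustrative path
        // The registry returns the shared pool for this config.
        DataSource jdbcSource =
            JdbcConnectionPools.getInstance()
                .getPooledDataSource(
                    JdbcConnectionPools.dbConfigToDataSourceConfig(dbConfig), 10, 50);
        // Beam's JdbcIO then reads through the shared pool instead of building one.
        JdbcIO.DataSourceConfiguration beamConfig =
            JdbcIO.DataSourceConfiguration.create(jdbcSource);
      }
    }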
@@ -96,12 +96,6 @@ public interface FhirEtlOptions extends PipelineOptions {

void setOutputParquetPath(String value);

@Description("JDBC driver class")
@Default.String("com.mysql.cj.jdbc.Driver")
String getJdbcDriverClass();

void setJdbcDriverClass(String value);

@Description("JDBC maximum pool size")
@Default.Integer(50)
int getJdbcMaxPoolSize();
@@ -186,32 +180,19 @@

void setSince(String value);

- // TODO: Consolidate these options with source DB config that we read from file; in general
- // it would be nice to have a file based approach for configuring pipeline options.
- @Description("If set, it is the JDBC URL of the sink database.")
+ // NOTE: Sink DB options are experimental.
+ @Description("Path to the sink database config; if not set, no sink DB is used [experimental].")
@Default.String("")
- String getSinkDbUrl();
+ String getSinkDbConfigPath();

- void setSinkDbUrl(String value);
+ void setSinkDbConfigPath(String value);

@Description("The name prefix for the sink DB tables.")
@Default.String("")
String getSinkDbTablePrefix();

void setSinkDbTablePrefix(String value);

@Description("The username for JDBC sink connection.")
@Default.String("")
String getSinkDbUsername();

void setSinkDbUsername(String value);

@Description("The password for the JDBC sink connection.")
@Default.String("")
String getSinkDbPassword();

void setSinkDbPassword(String value);

@Description(
"If enabled all json resources are stored in the same table; by default a separate "
+ "table is created for each resource type.")
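
On the options side, the URL/username/password triple collapses into a single path flag. A minimal sketch of parsing the new flag with Beam's standard option factory; the file path is hypothetical, and only getSinkDbConfigPath is taken from the interface above:

    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class SinkDbOptionsDemo {
      public static void main(String[] args) {
        FhirEtlOptions options =
            PipelineOptionsFactory.fromArgs("--sinkDbConfigPath=config/sink-db-config.json")
                .withValidation()
                .as(FhirEtlOptions.class);
        // An empty path means "no sink DB", matching the checks in FhirEtl above.
        System.out.println(options.getSinkDbConfigPath());
      }
    }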
@@ -31,6 +31,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.sql.DataSource;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
@@ -52,10 +53,10 @@ public class JdbcFetchHapi {

private static final Logger log = LoggerFactory.getLogger(JdbcFetchHapi.class);

- private JdbcConnectionUtil jdbcConnectionUtil;
+ private DataSource jdbcSource;

- JdbcFetchHapi(JdbcConnectionUtil jdbcConnectionUtil) {
- this.jdbcConnectionUtil = jdbcConnectionUtil;
+ JdbcFetchHapi(DataSource jdbcSource) {
+ this.jdbcSource = jdbcSource;
}

/**
@@ -334,7 +335,7 @@ public Map<String, Integer> searchResourceCounts(String resourceList, String since)
} else { // incremental mode
builder.append(" AND res.res_updated > '").append(since).append("'");
}
- try (Connection connection = jdbcConnectionUtil.getDataSource().getConnection();
+ try (Connection connection = jdbcSource.getConnection();
PreparedStatement statement =
createPreparedStatement(connection, builder.toString(), resourceType);
ResultSet resultSet = statement.executeQuery()) {
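
The pattern in the last hunk is what makes the pooling safe: connections are borrowed from the shared DataSource and returned by try-with-resources, even when the query throws. A self-contained sketch of the same discipline; the SQL and table name are illustrative, loosely modeled on the HAPI count query built above:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import javax.sql.DataSource;

    public class PooledQueryDemo {
      static int countResources(DataSource jdbcSource, String resourceType) throws SQLException {
        String query = "SELECT COUNT(*) FROM hfj_resource WHERE res_type = ?"; // illustrative SQL
        try (Connection connection = jdbcSource.getConnection();
            PreparedStatement statement = connection.prepareStatement(query)) {
          statement.setString(1, resourceType);
          try (ResultSet resultSet = statement.executeQuery()) {
            return resultSet.next() ? resultSet.getInt(1) : 0;
          }
        } // close() on a pooled Connection returns it to the pool, not the driver.
      }
    }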
