diff --git a/docker/config/hapi-postgres-config_local.json b/docker/config/hapi-postgres-config_local.json index c85f4727c..743efc6bb 100644 --- a/docker/config/hapi-postgres-config_local.json +++ b/docker/config/hapi-postgres-config_local.json @@ -1,4 +1,5 @@ { + "jdbcDriverClass": "org.postgresql.Driver", "databaseService" : "postgresql", "databaseHostName" : "hapi-fhir-db", "databasePort" : "5432", diff --git a/pipelines/batch/Dockerfile b/pipelines/batch/Dockerfile index 6c9cb9674..6985d21dd 100644 --- a/pipelines/batch/Dockerfile +++ b/pipelines/batch/Dockerfile @@ -39,7 +39,6 @@ ENV BATCH_SIZE=10 ENV TARGET_PARALLELISM=10 ENV PARQUET_PATH="/tmp/" ENV JDBC_MODE_ENABLED=false -ENV JDBC_DRIVER_CLASS="com.mysql.cj.jdbc.Driver" ENV JDBC_FETCH_SIZE=10000 ENV JDBC_MAX_POOL_SIZE=50 ENV JDBC_INITIAL_POOL_SIZE=10 @@ -53,7 +52,7 @@ ENTRYPOINT java -jar /usr/src/Main/app.jar \ --fhirServerUrl=${FHIR_SERVER_URL} --fhirServerUserName=${FHIR_SERVER_USERNAME} --fhirServerPassword=${FHIR_SERVER_PASSWORD} \ --resourceList=${RESOURCE_LIST} --batchSize=${BATCH_SIZE} --targetParallelism=${TARGET_PARALLELISM} \ --fhirSinkPath=${SINK_PATH} --sinkUserName=${SINK_USERNAME} --sinkPassword=${SINK_PASSWORD} \ - --outputParquetPath=${PARQUET_PATH} --jdbcModeEnabled=${JDBC_MODE_ENABLED} --jdbcDriverClass=${JDBC_DRIVER_CLASS} \ + --outputParquetPath=${PARQUET_PATH} --jdbcModeEnabled=${JDBC_MODE_ENABLED} \ --jdbcMaxPoolSize=${JDBC_MAX_POOL_SIZE} \ --fhirDatabaseConfigPath=${FHIR_DATABASE_CONFIG_PATH} --jdbcInitialPoolSize=${JDBC_INITIAL_POOL_SIZE} \ --jdbcFetchSize=${JDBC_FETCH_SIZE} --jdbcModeHapi=${JDBC_MODE_HAPI} --runner=${RUNNER} diff --git a/pipelines/batch/pom.xml b/pipelines/batch/pom.xml index 694fcff03..fcb13b8f9 100644 --- a/pipelines/batch/pom.xml +++ b/pipelines/batch/pom.xml @@ -37,7 +37,6 @@ 2.2 2.14.2 v1-rev20220904-2.0.0 - 1.10.1 2.4.8 3.0.0 0.1 @@ -107,19 +106,6 @@ ${beam.version} - - - com.google.auto.value - auto-value-annotations - ${auto-value.version} - - - com.google.auto.value - auto-value - ${auto-value.version} - true - - diff --git a/pipelines/batch/src/main/java/org/openmrs/analytics/ConvertResourceFn.java b/pipelines/batch/src/main/java/org/openmrs/analytics/ConvertResourceFn.java index 264e19039..3fe64cc34 100644 --- a/pipelines/batch/src/main/java/org/openmrs/analytics/ConvertResourceFn.java +++ b/pipelines/batch/src/main/java/org/openmrs/analytics/ConvertResourceFn.java @@ -136,7 +136,7 @@ public void writeResource(HapiRowDescriptor element) fhirStoreUtil.uploadResource(resource); totalPushTimeMillisMap.get(resourceType).inc(System.currentTimeMillis() - startTime); } - if (!this.sinkDbUrl.isEmpty()) { + if (this.sinkDbConfig != null) { // TODO : Remove the deleted resources from the sink database // https://github.com/google/fhir-data-pipes/issues/588 jdbcWriter.writeResource(resource); diff --git a/pipelines/batch/src/main/java/org/openmrs/analytics/FetchSearchPageFn.java b/pipelines/batch/src/main/java/org/openmrs/analytics/FetchSearchPageFn.java index 4cd0859a5..8524548d2 100644 --- a/pipelines/batch/src/main/java/org/openmrs/analytics/FetchSearchPageFn.java +++ b/pipelines/batch/src/main/java/org/openmrs/analytics/FetchSearchPageFn.java @@ -33,6 +33,8 @@ import org.hl7.fhir.r4.model.Bundle; import org.hl7.fhir.r4.model.Bundle.BundleEntryComponent; import org.hl7.fhir.r4.model.Resource; +import org.openmrs.analytics.JdbcConnectionPools.DataSourceConfig; +import org.openmrs.analytics.model.DatabaseConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,18 +79,12 @@ abstract class FetchSearchPageFn extends DoFn> { private final int rowGroupSize; - protected final String sinkDbUrl; - - private final String sinkDbUsername; - - private final String sinkDbPassword; + protected final DataSourceConfig sinkDbConfig; private final String sinkDbTableName; private final boolean useSingleSinkDbTable; - private final String jdbcDriverClass; - private final int initialPoolSize; private final int maxPoolSize; @@ -107,26 +103,6 @@ abstract class FetchSearchPageFn extends DoFn> { protected FhirContext fhirContext; - private static JdbcConnectionUtil jdbcConnectionUtil = null; - - // This is to enforce the Singleton pattern for JdbcConnectionUtil used by all workers running - // this DoFn on the same VM; hence sharing connections. - private static synchronized JdbcConnectionUtil getJdbcConnectionUtil( - String jdbcDriverClass, - String jdbcUrl, - String dbUser, - String dbPassword, - int initialPoolSize, - int jdbcMaxPoolSize) - throws PropertyVetoException { - if (jdbcConnectionUtil == null) { - jdbcConnectionUtil = - new JdbcConnectionUtil( - jdbcDriverClass, jdbcUrl, dbUser, dbPassword, initialPoolSize, jdbcMaxPoolSize); - } - return jdbcConnectionUtil; - } - FetchSearchPageFn(FhirEtlOptions options, String stageIdentifier) { this.sinkPath = options.getFhirSinkPath(); this.sinkUsername = options.getSinkUserName(); @@ -138,15 +114,23 @@ private static synchronized JdbcConnectionUtil getJdbcConnectionUtil( this.parquetFile = options.getOutputParquetPath(); this.secondsToFlush = options.getSecondsToFlushParquetFiles(); this.rowGroupSize = options.getRowGroupSizeForParquetFiles(); - this.sinkDbUrl = options.getSinkDbUrl(); this.sinkDbTableName = options.getSinkDbTablePrefix(); - this.sinkDbUsername = options.getSinkDbUsername(); - this.sinkDbPassword = options.getSinkDbPassword(); + if (options.getSinkDbConfigPath().isEmpty()) { + this.sinkDbConfig = null; + } else { + try { + this.sinkDbConfig = + JdbcConnectionPools.dbConfigToDataSourceConfig( + DatabaseConfiguration.createConfigFromFile(options.getSinkDbConfigPath())); + } catch (IOException e) { + String error = "Cannot access file " + options.getSinkDbConfigPath(); + log.error(error); + throw new IllegalArgumentException(error); + } + } this.useSingleSinkDbTable = options.getUseSingleSinkTable(); this.initialPoolSize = options.getJdbcInitialPoolSize(); this.maxPoolSize = options.getJdbcMaxPoolSize(); - // We are assuming that the potential source and sink DBs are the same type. - this.jdbcDriverClass = options.getJdbcDriverClass(); this.numFetchedResources = Metrics.counter( MetricsConstants.METRICS_NAMESPACE, @@ -196,18 +180,12 @@ public void setup() throws SQLException, PropertyVetoException { secondsToFlush, rowGroupSize, stageIdentifier + "_"); - if (!sinkDbUrl.isEmpty()) { - DataSource dataSource = - getJdbcConnectionUtil( - jdbcDriverClass, - sinkDbUrl, - sinkDbUsername, - sinkDbPassword, - initialPoolSize, - maxPoolSize) - .getDataSource(); + if (sinkDbConfig != null) { + DataSource jdbcSink = + JdbcConnectionPools.getInstance() + .getPooledDataSource(sinkDbConfig, initialPoolSize, maxPoolSize); jdbcWriter = - new JdbcResourceWriter(dataSource, sinkDbTableName, useSingleSinkDbTable, fhirContext); + new JdbcResourceWriter(jdbcSink, sinkDbTableName, useSingleSinkDbTable, fhirContext); } } @@ -239,7 +217,7 @@ protected void processBundle(Bundle bundle, @Nullable Set resourceTypes) fhirStoreUtil.uploadBundle(bundle); totalPushTimeMillis.inc(System.currentTimeMillis() - pushStartTime); } - if (!this.sinkDbUrl.isEmpty()) { + if (sinkDbConfig != null) { if (bundle.getEntry() == null) { return; } diff --git a/pipelines/batch/src/main/java/org/openmrs/analytics/FhirEtl.java b/pipelines/batch/src/main/java/org/openmrs/analytics/FhirEtl.java index 1adf44225..3b4e6a770 100644 --- a/pipelines/batch/src/main/java/org/openmrs/analytics/FhirEtl.java +++ b/pipelines/batch/src/main/java/org/openmrs/analytics/FhirEtl.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import javax.sql.DataSource; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.CannotProvideCoderException; @@ -157,15 +158,13 @@ static void runFhirFetch(FhirEtlOptions options, FhirContext fhirContext) throws EtlUtils.runPipelineWithTimestamp(pipeline, options); } - private static JdbcConnectionUtil createJdbcConnection( + private static DataSource createJdbcPooledDataSource( FhirEtlOptions options, DatabaseConfiguration dbConfig) throws PropertyVetoException { - return new JdbcConnectionUtil( - options.getJdbcDriverClass(), - dbConfig.makeJdbsUrlFromConfig(), - dbConfig.getDatabaseUser(), - dbConfig.getDatabasePassword(), - options.getJdbcInitialPoolSize(), - options.getJdbcMaxPoolSize()); + return JdbcConnectionPools.getInstance() + .getPooledDataSource( + JdbcConnectionPools.dbConfigToDataSourceConfig(dbConfig), + options.getJdbcInitialPoolSize(), + options.getJdbcMaxPoolSize()); } static void runFhirJdbcFetch(FhirEtlOptions options, FhirContext fhirContext) @@ -174,8 +173,8 @@ static void runFhirJdbcFetch(FhirEtlOptions options, FhirContext fhirContext) Pipeline pipeline = Pipeline.create(options); DatabaseConfiguration dbConfig = DatabaseConfiguration.createConfigFromFile(options.getFhirDatabaseConfigPath()); - JdbcConnectionUtil jdbcConnectionUtil = createJdbcConnection(options, dbConfig); - JdbcFetchOpenMrs jdbcUtil = new JdbcFetchOpenMrs(jdbcConnectionUtil); + DataSource jdbcSource = createJdbcPooledDataSource(options, dbConfig); + JdbcFetchOpenMrs jdbcUtil = new JdbcFetchOpenMrs(jdbcSource); int batchSize = Math.min( options.getBatchSize(), 170); // batch size > 200 will result in HTTP 400 Bad Request @@ -220,7 +219,7 @@ static void runFhirJdbcFetch(FhirEtlOptions options, FhirContext fhirContext) } private static void validateOptions(FhirEtlOptions options) - throws SQLException, PropertyVetoException { + throws SQLException, PropertyVetoException, IOException { if (!options.getActivePeriod().isEmpty()) { Set resourceSet = Sets.newHashSet(options.getResourceList().split(",")); if (resourceSet.contains("Patient")) { @@ -256,7 +255,7 @@ private static void validateOptions(FhirEtlOptions options) } } - if (!options.getSinkDbUrl().isEmpty()) { + if (!options.getSinkDbConfigPath().isEmpty()) { JdbcResourceWriter.createTables(options); } } @@ -286,9 +285,9 @@ static Pipeline buildHapiJdbcFetch( throws PropertyVetoException, SQLException { boolean foundResource = false; Pipeline pipeline = Pipeline.create(options); - JdbcConnectionUtil jdbcConnectionUtil = createJdbcConnection(options, dbConfig); + DataSource jdbcSource = createJdbcPooledDataSource(options, dbConfig); - JdbcFetchHapi jdbcFetchHapi = new JdbcFetchHapi(jdbcConnectionUtil); + JdbcFetchHapi jdbcFetchHapi = new JdbcFetchHapi(jdbcSource); Map resourceCount = jdbcFetchHapi.searchResourceCounts(options.getResourceList(), options.getSince()); @@ -314,7 +313,7 @@ static Pipeline buildHapiJdbcFetch( pipeline.apply( "Generate query parameters for " + resourceType, Create.of( - new JdbcFetchHapi(jdbcConnectionUtil) + new JdbcFetchHapi(jdbcSource) .generateQueryParameters(options, resourceType, numResources))); PCollection payload = @@ -322,7 +321,7 @@ static Pipeline buildHapiJdbcFetch( "JdbcIO fetch for " + resourceType, new JdbcFetchHapi.FetchRowsJdbcIo( options.getResourceList(), - JdbcIO.DataSourceConfiguration.create(jdbcConnectionUtil.getDataSource()), + JdbcIO.DataSourceConfiguration.create(jdbcSource), options.getSince())); payload.apply( @@ -375,7 +374,7 @@ public static void main(String[] args) // TODO: Check if we can use some sort of dependency-injection (e.g., `@Autowired`). FhirContext fhirContext = FhirContexts.forR4(); - if (!options.getSinkDbUrl().isEmpty()) { + if (!options.getSinkDbConfigPath().isEmpty()) { JdbcResourceWriter.createTables(options); } diff --git a/pipelines/batch/src/main/java/org/openmrs/analytics/FhirEtlOptions.java b/pipelines/batch/src/main/java/org/openmrs/analytics/FhirEtlOptions.java index 410a246e5..64322ef48 100644 --- a/pipelines/batch/src/main/java/org/openmrs/analytics/FhirEtlOptions.java +++ b/pipelines/batch/src/main/java/org/openmrs/analytics/FhirEtlOptions.java @@ -96,12 +96,6 @@ public interface FhirEtlOptions extends PipelineOptions { void setOutputParquetPath(String value); - @Description("JDBC driver class") - @Default.String("com.mysql.cj.jdbc.Driver") - String getJdbcDriverClass(); - - void setJdbcDriverClass(String value); - @Description("JDBC maximum pool size") @Default.Integer(50) int getJdbcMaxPoolSize(); @@ -186,13 +180,12 @@ public interface FhirEtlOptions extends PipelineOptions { void setSince(String value); - // TODO: Consolidate these options with source DB config that we read from file; in general - // it would be nice to have a file based approach for configuring pipeline options. - @Description("If set, it is the JDBC URL of the sink database.") + // NOTE: Sink DB options are experimental. + @Description("Path to the sink database config; if not set, no sink DB is used [experimental].") @Default.String("") - String getSinkDbUrl(); + String getSinkDbConfigPath(); - void setSinkDbUrl(String value); + void setSinkDbConfigPath(String value); @Description("The name prefix for the sink DB tables.") @Default.String("") @@ -200,18 +193,6 @@ public interface FhirEtlOptions extends PipelineOptions { void setSinkDbTablePrefix(String value); - @Description("The username for JDBC sink connection.") - @Default.String("") - String getSinkDbUsername(); - - void setSinkDbUsername(String value); - - @Description("The password for the JDBC sink connection.") - @Default.String("") - String getSinkDbPassword(); - - void setSinkDbPassword(String value); - @Description( "If enabled all json resources are stored in the same table; by default a separate " + "table is created for each resource type.") diff --git a/pipelines/batch/src/main/java/org/openmrs/analytics/JdbcFetchHapi.java b/pipelines/batch/src/main/java/org/openmrs/analytics/JdbcFetchHapi.java index b04ddb58d..d755c4894 100644 --- a/pipelines/batch/src/main/java/org/openmrs/analytics/JdbcFetchHapi.java +++ b/pipelines/batch/src/main/java/org/openmrs/analytics/JdbcFetchHapi.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import javax.sql.DataSource; import org.apache.beam.sdk.io.jdbc.JdbcIO; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; @@ -52,10 +53,10 @@ public class JdbcFetchHapi { private static final Logger log = LoggerFactory.getLogger(JdbcFetchHapi.class); - private JdbcConnectionUtil jdbcConnectionUtil; + private DataSource jdbcSource; - JdbcFetchHapi(JdbcConnectionUtil jdbcConnectionUtil) { - this.jdbcConnectionUtil = jdbcConnectionUtil; + JdbcFetchHapi(DataSource jdbcSource) { + this.jdbcSource = jdbcSource; } /** @@ -334,7 +335,7 @@ public Map searchResourceCounts(String resourceList, String sin } else { // incremental mode builder.append(" AND res.res_updated > '").append(since).append("'"); } - try (Connection connection = jdbcConnectionUtil.getDataSource().getConnection(); + try (Connection connection = jdbcSource.getConnection(); PreparedStatement statement = createPreparedStatement(connection, builder.toString(), resourceType); ResultSet resultSet = statement.executeQuery()) { diff --git a/pipelines/batch/src/main/java/org/openmrs/analytics/JdbcFetchOpenMrs.java b/pipelines/batch/src/main/java/org/openmrs/analytics/JdbcFetchOpenMrs.java index 44fdb40f3..892e7adb4 100644 --- a/pipelines/batch/src/main/java/org/openmrs/analytics/JdbcFetchOpenMrs.java +++ b/pipelines/batch/src/main/java/org/openmrs/analytics/JdbcFetchOpenMrs.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 Google LLC + * Copyright 2020-2023 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; @@ -26,6 +27,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import javax.sql.DataSource; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.NullableCoder; @@ -47,10 +49,10 @@ public class JdbcFetchOpenMrs { private static final Logger log = LoggerFactory.getLogger(JdbcFetchOpenMrs.class); - private JdbcConnectionUtil jdbcConnectionUtil; + private DataSource jdbcSource; - JdbcFetchOpenMrs(JdbcConnectionUtil jdbcConnectionUtil) { - this.jdbcConnectionUtil = jdbcConnectionUtil; + JdbcFetchOpenMrs(DataSource jdbcSource) { + this.jdbcSource = jdbcSource; } public static class FetchUuids @@ -181,16 +183,16 @@ public PCollection fetchUuidsByDate( constraint = String.format(" WHERE %s ", upperConstraint); } } - Statement statement = jdbcConnectionUtil.createStatement(); - ResultSet resultSet; final String query = String.format("SELECT uuid FROM %s %s", tableName, constraint); log.info("SQL query: " + query); - resultSet = statement.executeQuery(query); List uuids = Lists.newArrayList(); - while (resultSet.next()) { - uuids.add(resultSet.getString("uuid")); + try (Connection connection = jdbcSource.getConnection(); + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(query)) { + while (resultSet.next()) { + uuids.add(resultSet.getString("uuid")); + } } - jdbcConnectionUtil.closeConnection(statement); log.info( String.format( "Will fetch %d rows matching activePeriod in table %s", uuids.size(), tableName)); @@ -214,19 +216,19 @@ public PCollection fetchAllUuids(Pipeline pipeline, String tableName, in private Integer fetchMaxId(String tableName) throws SQLException { String tableId = tableName + "_id"; - Statement statement = jdbcConnectionUtil.createStatement(); - ResultSet resultSet = - statement.executeQuery( - String.format("SELECT MAX(`%s`) as max_id FROM %s", tableId, tableName)); - resultSet.first(); - int maxId = resultSet.getInt("max_id"); - jdbcConnectionUtil.closeConnection(statement); - return maxId; + String query = String.format("SELECT MAX(`%s`) as max_id FROM %s", tableId, tableName); + try (Connection connection = jdbcSource.getConnection(); + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(query)) { + resultSet.first(); + int maxId = resultSet.getInt("max_id"); + return maxId; + } } @VisibleForTesting JdbcIO.DataSourceConfiguration getJdbcConfig() { - return JdbcIO.DataSourceConfiguration.create(this.jdbcConnectionUtil.getDataSource()); + return JdbcIO.DataSourceConfiguration.create(this.jdbcSource); } @VisibleForTesting diff --git a/pipelines/batch/src/main/java/org/openmrs/analytics/JdbcResourceWriter.java b/pipelines/batch/src/main/java/org/openmrs/analytics/JdbcResourceWriter.java index fb590433f..c52459329 100644 --- a/pipelines/batch/src/main/java/org/openmrs/analytics/JdbcResourceWriter.java +++ b/pipelines/batch/src/main/java/org/openmrs/analytics/JdbcResourceWriter.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 Google LLC + * Copyright 2020-2023 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,12 +18,16 @@ import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.parser.IParser; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import java.beans.PropertyVetoException; +import java.io.IOException; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; import javax.sql.DataSource; import org.hl7.fhir.r4.model.Resource; +import org.openmrs.analytics.JdbcConnectionPools.DataSourceConfig; +import org.openmrs.analytics.model.DatabaseConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,24 +68,23 @@ private static void createSingleTable(DataSource dataSource, String createStatem } } - static void createTables(FhirEtlOptions options) throws PropertyVetoException, SQLException { + static void createTables(FhirEtlOptions options) + throws PropertyVetoException, IOException, SQLException { // This should not be triggered in pipeline workers because concurrent CREATEs lead to failures: // https://stackoverflow.com/questions/54351783/duplicate-key-value-violates-unique-constraint - // + Preconditions.checkArgument(!Strings.nullToEmpty(options.getSinkDbConfigPath()).isEmpty()); Preconditions.checkArgument( !options.getSinkDbTablePrefix().isEmpty() || !options.getUseSingleSinkTable()); + DataSourceConfig dbConfig = + JdbcConnectionPools.dbConfigToDataSourceConfig( + DatabaseConfiguration.createConfigFromFile(options.getSinkDbConfigPath())); + DataSource jdbcSource = + JdbcConnectionPools.getInstance() + .getPooledDataSource( + dbConfig, options.getJdbcInitialPoolSize(), options.getJdbcMaxPoolSize()); log.info( String.format( - "Connecting to DB url %s with user %s.", - options.getSinkDbUrl(), options.getSinkDbUsername())); - JdbcConnectionUtil connectionUtil = - new JdbcConnectionUtil( - options.getJdbcDriverClass(), - options.getSinkDbUrl(), - options.getSinkDbPassword(), - options.getSinkDbPassword(), - options.getJdbcInitialPoolSize(), - options.getJdbcMaxPoolSize()); + "Connecting to DB url %s with user %s.", dbConfig.jdbcUrl(), dbConfig.dbUser())); if (options.getUseSingleSinkTable()) { // For CREATE statements we cannot (and don't need to) use a placeholder for table name, i.e., // we don't need to use PreparedStatement with '?'. @@ -89,7 +92,7 @@ static void createTables(FhirEtlOptions options) throws PropertyVetoException, S "CREATE TABLE IF NOT EXISTS %s (id VARCHAR(100) NOT NULL, " + "type VARCHAR(50) NOT NULL, datab JSONB, PRIMARY KEY (id, type) );"; String createStatement = String.format(tableCreate, options.getSinkDbTablePrefix()); - createSingleTable(connectionUtil.getDataSource(), createStatement); + createSingleTable(jdbcSource, createStatement); } else { for (String resourceType : options.getResourceList().split(",")) { String tableCreate = @@ -97,7 +100,7 @@ static void createTables(FhirEtlOptions options) throws PropertyVetoException, S + "datab JSONB, PRIMARY KEY (id) );"; String createStatement = String.format(tableCreate, options.getSinkDbTablePrefix() + resourceType); - createSingleTable(connectionUtil.getDataSource(), createStatement); + createSingleTable(jdbcSource, createStatement); } } } diff --git a/pipelines/batch/src/test/java/org/openmrs/analytics/JdbcFetchHapiTest.java b/pipelines/batch/src/test/java/org/openmrs/analytics/JdbcFetchHapiTest.java index 28347e3cb..82423c8dd 100644 --- a/pipelines/batch/src/test/java/org/openmrs/analytics/JdbcFetchHapiTest.java +++ b/pipelines/batch/src/test/java/org/openmrs/analytics/JdbcFetchHapiTest.java @@ -18,7 +18,6 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; -import com.mchange.v2.c3p0.ComboPooledDataSource; import java.beans.PropertyVetoException; import java.io.IOException; import java.sql.Connection; @@ -27,6 +26,7 @@ import java.sql.SQLException; import java.util.List; import java.util.Map; +import javax.sql.DataSource; import junit.framework.TestCase; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.TestPipeline; @@ -44,6 +44,8 @@ public class JdbcFetchHapiTest extends TestCase { @Rule public transient TestPipeline testPipeline = TestPipeline.create(); + @Mock private DataSource mockedDataSource; + @Mock private ResultSet resultSet; private FhirEtlOptions options; @@ -57,16 +59,7 @@ public void setup() throws IOException, PropertyVetoException { String[] args = {"--jdbcModeHapi=true", "--jdbcMaxPoolSize=48", "--jdbcFetchSize=10000"}; options = PipelineOptionsFactory.fromArgs(args).withValidation().as(FhirEtlOptions.class); dbConfig = DatabaseConfiguration.createConfigFromFile("../../utils/hapi-postgres-config.json"); - JdbcConnectionUtil jdbcConnectionUtil = - new JdbcConnectionUtil( - options.getJdbcDriverClass(), - dbConfig.makeJdbsUrlFromConfig(), - dbConfig.getDatabaseUser(), - dbConfig.getDatabasePassword(), - options.getJdbcInitialPoolSize(), - options.getJdbcMaxPoolSize()); - - jdbcFetchHapi = new JdbcFetchHapi(jdbcConnectionUtil); + jdbcFetchHapi = new JdbcFetchHapi(mockedDataSource); } @Test @@ -110,10 +103,6 @@ public void testSearchResourceCounts() throws SQLException { String resourceList = "Patient,Encounter,Observation"; String since = "2002-03-12T10:09:20.123456Z"; - JdbcConnectionUtil mockedJdbcConnectionUtil = Mockito.mock(JdbcConnectionUtil.class); - ComboPooledDataSource mockedDataSource = Mockito.mock(ComboPooledDataSource.class); - Mockito.when(mockedJdbcConnectionUtil.getDataSource()).thenReturn(mockedDataSource); - JdbcFetchHapi jdbcFetchHapi1 = new JdbcFetchHapi(mockedJdbcConnectionUtil); Connection mockedConnection = Mockito.mock(Connection.class); PreparedStatement mockedPreparedStatement = Mockito.mock(PreparedStatement.class); ResultSet mockedResultSet = Mockito.mock(ResultSet.class); @@ -128,8 +117,7 @@ public void testSearchResourceCounts() throws SQLException { + "'")) .thenReturn(mockedPreparedStatement); - Map resourceCountMap = - jdbcFetchHapi1.searchResourceCounts(resourceList, since); + Map resourceCountMap = jdbcFetchHapi.searchResourceCounts(resourceList, since); assertThat(resourceCountMap.get("Patient"), equalTo(100)); assertThat(resourceCountMap.get("Encounter"), equalTo(100)); diff --git a/pipelines/batch/src/test/java/org/openmrs/analytics/JdbcFetchOpenMrsTest.java b/pipelines/batch/src/test/java/org/openmrs/analytics/JdbcFetchOpenMrsTest.java index 3c129558e..8166a8311 100644 --- a/pipelines/batch/src/test/java/org/openmrs/analytics/JdbcFetchOpenMrsTest.java +++ b/pipelines/batch/src/test/java/org/openmrs/analytics/JdbcFetchOpenMrsTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 Google LLC + * Copyright 2020-2023 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.withSettings; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.parser.IParser; @@ -26,6 +27,7 @@ import java.io.IOException; import java.net.URL; import java.nio.charset.StandardCharsets; +import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; @@ -34,6 +36,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import javax.sql.DataSource; import junit.framework.TestCase; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.io.jdbc.JdbcIO; @@ -50,6 +53,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; import org.openmrs.analytics.model.DatabaseConfiguration; @@ -70,6 +74,8 @@ public class JdbcFetchOpenMrsTest extends TestCase { private String basePath = "/tmp/JUNIT/Parquet/TEST/"; + private DataSource mockedDataSource; + private DatabaseConfiguration dbConfig; @Before @@ -88,17 +94,8 @@ public void setup() throws IOException, PropertyVetoException { dbConfig = DatabaseConfiguration.createConfigFromFile("../../utils/dbz_event_to_fhir_config.json"); - JdbcConnectionUtil jdbcConnectionUtil = - new JdbcConnectionUtil( - options.getJdbcDriverClass(), - dbConfig.makeJdbsUrlFromConfig(), - dbConfig.getDatabaseUser(), - dbConfig.getDatabasePassword(), - options.getJdbcInitialPoolSize(), - options.getJdbcMaxPoolSize()); - // TODO jdbcConnectionUtil should be replaced by a mocked JdbcConnectionUtil which does not - // depend on options either, since we don't need real DB connections for unit-testing. - jdbcFetchUtil = new JdbcFetchOpenMrs(jdbcConnectionUtil); + mockedDataSource = mock(DataSource.class, withSettings().serializable()); + jdbcFetchUtil = new JdbcFetchOpenMrs(mockedDataSource); parquetUtil = new ParquetUtil(fhirContext.getVersion().getVersion(), basePath); // clean up if folder exists File file = new File(basePath); @@ -179,12 +176,13 @@ public void testCreateFhirReverseMap() throws Exception { @Test public void testFetchAllUuidUtilonEmptyTable() throws SQLException, CannotProvideCoderException { JdbcFetchOpenMrs mockedJdbcFetchUtil = mock(JdbcFetchOpenMrs.class); - JdbcConnectionUtil mockedJdbcConnectionUtil = mock(JdbcConnectionUtil.class); Statement mockedStatement = mock(Statement.class); ResultSet mockedResultSet = mock(ResultSet.class); - mockedJdbcFetchUtil = new JdbcFetchOpenMrs(mockedJdbcConnectionUtil); + mockedJdbcFetchUtil = new JdbcFetchOpenMrs(mockedDataSource); - when(mockedJdbcConnectionUtil.createStatement()).thenReturn(mockedStatement); + Connection mockedConnection = Mockito.mock(Connection.class); + when(mockedDataSource.getConnection()).thenReturn(mockedConnection); + when(mockedConnection.createStatement()).thenReturn(mockedStatement); when(mockedStatement.executeQuery("SELECT MAX(`obs_id`) as max_id FROM obs")) .thenReturn(mockedResultSet); when(mockedResultSet.getInt("max_id")).thenReturn(0); diff --git a/pipelines/common/src/main/java/org/openmrs/analytics/JdbcConnectionPools.java b/pipelines/common/src/main/java/org/openmrs/analytics/JdbcConnectionPools.java new file mode 100644 index 000000000..d42d836fd --- /dev/null +++ b/pipelines/common/src/main/java/org/openmrs/analytics/JdbcConnectionPools.java @@ -0,0 +1,139 @@ +/* + * Copyright 2020-2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.openmrs.analytics; + +import com.google.auto.value.AutoValue; +import com.google.common.base.Preconditions; +import com.mchange.v2.c3p0.ComboPooledDataSource; +import java.beans.PropertyVetoException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import javax.sql.DataSource; +import org.openmrs.analytics.model.DatabaseConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is a Singleton that is intended for managing all JDBC connection pools. It guarantees that + * for each DB config, we only have one pool per JVM. + */ +public class JdbcConnectionPools { + private static final Logger log = LoggerFactory.getLogger(JdbcConnectionPools.class); + + private static JdbcConnectionPools instance = null; + + private final ConcurrentMap dataSources = new ConcurrentHashMap<>(); + + // This class should not be instantiated! + private JdbcConnectionPools() {} + + public static synchronized JdbcConnectionPools getInstance() { + if (instance == null) { + instance = new JdbcConnectionPools(); + } + return instance; + } + + /** + * Creates a new connection pool for the given config or return an already created one if one + * exists. The pool size parameters are just hints and won't be used if a pool already exists for + * the given configuration. This method is to impose a Singleton pattern for each config. + * + *

Note in the context of our Beam pipelines, we only serialize `DataSourceConfig` objects and + * create `DataSource` from them when necessary; i.e., do not serialize the returned `DataSource` + * in our code. Some Beam libraries like JdbcIO may serialize `DataSource` but they should + * properly handle the singleton pattern needed for pooled data-sources. + * + * @param config the JDBC connection information + * @param initialPoolSize initial pool size if a new pool is created. + * @param jdbcMaxPoolSize maximum pool size if a new pool is created. + * @return a pooling DataSource for the given DB config + */ + public DataSource getPooledDataSource( + DataSourceConfig config, int initialPoolSize, int jdbcMaxPoolSize) + throws PropertyVetoException { + dataSources.computeIfAbsent(config, c -> createNewPool(c, initialPoolSize, jdbcMaxPoolSize)); + return dataSources.get(config); + } + + private static DataSource createNewPool( + DataSourceConfig config, int initialPoolSize, int jdbcMaxPoolSize) { + log.info( + "Creating a JDBC connection pool for " + + config.jdbcUrl() + + " with driver class " + + config.jdbcDriverClass() + + " and max pool size " + + jdbcMaxPoolSize); + Preconditions.checkArgument( + initialPoolSize <= jdbcMaxPoolSize, + "initialPoolSize cannot be larger than jdbcMaxPoolSize"); + // Note caching of these connection-pools is important beyond just performance benefits. If a + // `ComboPooledDataSource` goes out of scope without calling `close()` on it, then it can leak + // connections (and memory) as its threads are not killed and can hold those objects; this was + // the case even with `setNumHelperThreads(0)`. + ComboPooledDataSource comboPooledDataSource = new ComboPooledDataSource(); + try { + comboPooledDataSource.setDriverClass(config.jdbcDriverClass()); + } catch (PropertyVetoException e) { + String errorMes = "Error in setting the JDBC driver class " + config.jdbcDriverClass(); + log.error(errorMes); + throw new IllegalArgumentException(errorMes, e); + } + comboPooledDataSource.setJdbcUrl(config.jdbcUrl()); + comboPooledDataSource.setUser(config.dbUser()); + comboPooledDataSource.setPassword(config.dbPassword()); + comboPooledDataSource.setMaxPoolSize(jdbcMaxPoolSize); + comboPooledDataSource.setInitialPoolSize(initialPoolSize); + // Setting an idle time to reduce the number of connections when idle. + comboPooledDataSource.setMaxIdleTime(30); + // Lowering the minimum pool size to limit the number of connections if multiple pools are + // created for the same DB. + comboPooledDataSource.setMinPoolSize(1); + return comboPooledDataSource; + } + + // TODO we should `close()` connection pools on application shutdown. + + public static DataSourceConfig dbConfigToDataSourceConfig(DatabaseConfiguration config) { + return DataSourceConfig.create( + config.getJdbcDriverClass(), + config.makeJdbsUrlFromConfig(), + config.getDatabaseUser(), + config.getDatabasePassword()); + } + + /** + * This is to identify a database connection pool. We use instances of these to cache connection + * pools and to impose a Singleton pattern per connection config (hence AutoValue). + */ + @AutoValue + public abstract static class DataSourceConfig { + abstract String jdbcDriverClass(); + + abstract String jdbcUrl(); + + abstract String dbUser(); + + abstract String dbPassword(); + + static DataSourceConfig create( + String jdbcDriverClass, String jdbcUrl, String dbUser, String dbPassword) { + return new AutoValue_JdbcConnectionPools_DataSourceConfig( + jdbcDriverClass, jdbcUrl, dbUser, dbPassword); + } + } +} diff --git a/pipelines/common/src/main/java/org/openmrs/analytics/JdbcConnectionUtil.java b/pipelines/common/src/main/java/org/openmrs/analytics/JdbcConnectionUtil.java deleted file mode 100644 index ed1cdf109..000000000 --- a/pipelines/common/src/main/java/org/openmrs/analytics/JdbcConnectionUtil.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright 2020-2023 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.openmrs.analytics; - -import com.google.common.base.Preconditions; -import com.mchange.v2.c3p0.ComboPooledDataSource; -import java.beans.PropertyVetoException; -import java.sql.Connection; -import java.sql.SQLException; -import java.sql.Statement; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class JdbcConnectionUtil { - private static final Logger log = LoggerFactory.getLogger(JdbcConnectionUtil.class); - - private final ComboPooledDataSource comboPooledDataSource; - - JdbcConnectionUtil( - String jdbcDriverClass, - String jdbcUrl, - String dbUser, - String dbPassword, - int initialPoolSize, - int jdbcMaxPoolSize) - throws PropertyVetoException { - log.info( - "Creating a JdbcConnectionUtil for " - + jdbcUrl - + " with driver class " - + jdbcDriverClass - + " and max pool size " - + jdbcMaxPoolSize); - Preconditions.checkArgument( - initialPoolSize <= jdbcMaxPoolSize, - "initialPoolSize cannot be larger than jdbcMaxPoolSize"); - comboPooledDataSource = new ComboPooledDataSource(); - comboPooledDataSource.setDriverClass(jdbcDriverClass); - comboPooledDataSource.setJdbcUrl(jdbcUrl); - comboPooledDataSource.setUser(dbUser); - comboPooledDataSource.setPassword(dbPassword); - comboPooledDataSource.setMaxPoolSize(jdbcMaxPoolSize); - comboPooledDataSource.setInitialPoolSize(initialPoolSize); - // Setting an idle time to reduce the number of connections when idle. - comboPooledDataSource.setMaxIdleTime(30); - } - - public Statement createStatement() throws SQLException { - Connection con = getDataSource().getConnection(); - return con.createStatement(); - } - - public void closeConnection(Statement stmt) throws SQLException { - if (stmt != null) { - Connection con = stmt.getConnection(); - stmt.close(); - con.close(); - } - } - - public ComboPooledDataSource getDataSource() { - return comboPooledDataSource; - } -} diff --git a/pipelines/common/src/main/java/org/openmrs/analytics/model/DatabaseConfiguration.java b/pipelines/common/src/main/java/org/openmrs/analytics/model/DatabaseConfiguration.java index faca8e50d..2e30c17a3 100644 --- a/pipelines/common/src/main/java/org/openmrs/analytics/model/DatabaseConfiguration.java +++ b/pipelines/common/src/main/java/org/openmrs/analytics/model/DatabaseConfiguration.java @@ -32,6 +32,7 @@ public class DatabaseConfiguration { // General configuration parameters that needs to be exposed beyond Debezium. + private String jdbcDriverClass; private String databaseService; private String databaseHostName; private String databasePort; diff --git a/pipelines/common/src/test/java/org/openmrs/analytics/JdbcConnectionUtilTest.java b/pipelines/common/src/test/java/org/openmrs/analytics/JdbcConnectionUtilTest.java deleted file mode 100644 index 9b25b3637..000000000 --- a/pipelines/common/src/test/java/org/openmrs/analytics/JdbcConnectionUtilTest.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright 2020-2022 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.openmrs.analytics; - -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.beans.PropertyVetoException; -import java.sql.Connection; -import java.sql.SQLException; -import java.sql.Statement; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.junit.MockitoJUnitRunner; - -@RunWith(MockitoJUnitRunner.class) -public class JdbcConnectionUtilTest { - - private Connection mockConnection; - - private Statement mockStatement; - - private JdbcConnectionUtil jdbcConnectionUtil; - - @Before - public void setup() throws PropertyVetoException, SQLException { - mockStatement = mock(Statement.class); - mockConnection = mock(Connection.class); - when(mockStatement.getConnection()).thenReturn(mockConnection); - jdbcConnectionUtil = new JdbcConnectionUtil("random", "random", "omar", "123", 3, 60); - } - - @Test - public void testSetIncorrectJdbcPoolSize() throws IllegalArgumentException { - IllegalArgumentException thrown = - Assert.assertThrows( - IllegalArgumentException.class, - () -> new JdbcConnectionUtil("random", "random", "omar", "123", 4, 2)); - - assertTrue( - thrown.getMessage().contains("initialPoolSize cannot be larger than jdbcMaxPoolSize")); - } - - @Test - public void testCloseConnection() throws PropertyVetoException, SQLException { - jdbcConnectionUtil.closeConnection(mockStatement); - verify(mockStatement, times(1)).close(); - } - - @Test - public void testCloseConnectionNullStatement() throws PropertyVetoException, SQLException { - jdbcConnectionUtil.closeConnection(null); - verify(mockStatement, times(0)).close(); - verify(mockConnection, times(0)).close(); - } -} diff --git a/pipelines/controller/config/hapi-postgres-config.json b/pipelines/controller/config/hapi-postgres-config.json index 489d2ba77..0f3224edb 100644 --- a/pipelines/controller/config/hapi-postgres-config.json +++ b/pipelines/controller/config/hapi-postgres-config.json @@ -1,4 +1,5 @@ { + "jdbcDriverClass": "org.postgresql.Driver", "databaseService" : "postgresql", "databaseHostName" : "172.17.0.1", "databasePort" : "5432", diff --git a/pipelines/pom.xml b/pipelines/pom.xml index 126865084..53a0c05bb 100644 --- a/pipelines/pom.xml +++ b/pipelines/pom.xml @@ -47,6 +47,7 @@ 1.82 4.2 2.45.0 + 1.10.1 @@ -140,6 +141,20 @@ org.mockito mockito-inline + + + + com.google.auto.value + auto-value-annotations + ${auto-value.version} + + + com.google.auto.value + auto-value + ${auto-value.version} + true + + diff --git a/pipelines/streaming/Dockerfile b/pipelines/streaming/Dockerfile index f8e66ad73..9f5e06da1 100644 --- a/pipelines/streaming/Dockerfile +++ b/pipelines/streaming/Dockerfile @@ -28,7 +28,6 @@ ENV FHIR_DEBEZIUM_CONFIG_PATH="/workspace/utils/dbz_event_to_fhir_config.json" ENV JDBC_FETCH_SIZE=10000 ENV JDBC_MAX_POOL_SIZE=50 ENV JDBC_INITIAL_POOL_SIZE=10 -ENV JDBC_DRIVER_CLASS="com.mysql.cj.jdbc.Driver" ENV SECONDS_TO_FLUSH_PARQUET_FILES=3600 ENTRYPOINT java -jar /usr/src/Main/app.jar \ @@ -40,7 +39,6 @@ ENTRYPOINT java -jar /usr/src/Main/app.jar \ --sinkPassword=${SINK_PASSWORD} \ --outputParquetPath=${PARQUET_PATH} \ --fhirDebeziumConfigPath=${FHIR_DEBEZIUM_CONFIG_PATH} \ - --jdbcDriverClass=${JDBC_DRIVER_CLASS} \ --jdbcMaxPoolSize=${JDBC_MAX_POOL_SIZE} \ --jdbcInitialPoolSize=${JDBC_INITIAL_POOL_SIZE} \ --secondsToFlushParquetFiles=${SECONDS_TO_FLUSH_PARQUET_FILES} diff --git a/pipelines/streaming/src/main/java/org/openmrs/analytics/DebeziumListener.java b/pipelines/streaming/src/main/java/org/openmrs/analytics/DebeziumListener.java index eebbe13de..0e3a20217 100644 --- a/pipelines/streaming/src/main/java/org/openmrs/analytics/DebeziumListener.java +++ b/pipelines/streaming/src/main/java/org/openmrs/analytics/DebeziumListener.java @@ -26,10 +26,12 @@ import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.util.Map; +import javax.sql.DataSource; import org.apache.camel.CamelContext; import org.apache.camel.LoggingLevel; import org.apache.camel.Service; import org.apache.camel.builder.RouteBuilder; +import org.openmrs.analytics.JdbcConnectionPools.DataSourceConfig; import org.openmrs.analytics.model.DatabaseConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,15 +86,17 @@ FhirConverter createFhirConverter(CamelContext camelContext) throws Exception { params.secondsToFlushParquetFiles, params.rowGroupSizeForParquetFiles, "streaming_"); - JdbcConnectionUtil jdbcConnectionUtil = - new JdbcConnectionUtil( - params.jdbcDriverClass, - this.databaseConfiguration.makeJdbsUrlFromConfig(), - this.databaseConfiguration.getDatabaseUser(), - this.databaseConfiguration.getDatabasePassword(), - params.initialPoolSize, - params.jdbcMaxPoolSize); - UuidUtil uuidUtil = new UuidUtil(jdbcConnectionUtil); + DataSource jdbcSource = + JdbcConnectionPools.getInstance() + .getPooledDataSource( + DataSourceConfig.create( + databaseConfiguration.getJdbcDriverClass(), + databaseConfiguration.makeJdbsUrlFromConfig(), + databaseConfiguration.getDatabaseUser(), + databaseConfiguration.getDatabasePassword()), + params.initialPoolSize, + params.jdbcMaxPoolSize); + UuidUtil uuidUtil = new UuidUtil(jdbcSource); camelContext.addService(new ParquetService(parquetUtil), true); StatusServer statusServer = new StatusServer(params.statusPort); // TODO: Improve this `start` signal to make sure every resource after this time are fetched; @@ -179,11 +183,6 @@ public DebeziumArgs() {} description = "Google cloud FHIR store") public String fhirDebeziumConfigPath = "../utils/dbz_event_to_fhir_config.json"; - @Parameter( - names = {"--jdbcDriverClass"}, - description = "JDBC MySQL driver class") - public String jdbcDriverClass = "com.mysql.cj.jdbc.Driver"; - @Parameter( names = {"--jdbcMaxPoolSize"}, description = "JDBC maximum pool size") diff --git a/pipelines/streaming/src/main/java/org/openmrs/analytics/UuidUtil.java b/pipelines/streaming/src/main/java/org/openmrs/analytics/UuidUtil.java index 02478dddd..c7ed9318b 100644 --- a/pipelines/streaming/src/main/java/org/openmrs/analytics/UuidUtil.java +++ b/pipelines/streaming/src/main/java/org/openmrs/analytics/UuidUtil.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 Google LLC + * Copyright 2020-2023 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,42 +13,36 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.openmrs.analytics; +import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import javax.sql.DataSource; public class UuidUtil { - private final JdbcConnectionUtil jdbcConnectionUtil; - - private Statement stmt; + private final DataSource jdbcSource; - private ResultSet rs; - - public UuidUtil(JdbcConnectionUtil jdbcConnectionUtil) { - this.jdbcConnectionUtil = jdbcConnectionUtil; + public UuidUtil(DataSource jdbcSource) { + this.jdbcSource = jdbcSource; } public String getUuid(String table, String keyColumn, String keyValue) throws SQLException { - try { - stmt = jdbcConnectionUtil.createStatement(); + String sql = String.format("SELECT uuid FROM %s WHERE %s = %s", table, keyColumn, keyValue); + try (Connection connection = jdbcSource.getConnection(); + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(sql)) { String uuidResultFromSql = null; - String sql = String.format("SELECT uuid FROM %s WHERE %s = %s", table, keyColumn, keyValue); - - rs = stmt.executeQuery(sql); - while (rs.next()) { - uuidResultFromSql = rs.getString("uuid"); + while (resultSet.next()) { + uuidResultFromSql = resultSet.getString("uuid"); } if (uuidResultFromSql == null) { throw new SQLException("Could not find the uuid in the DB"); } else { return uuidResultFromSql; } - } finally { - jdbcConnectionUtil.closeConnection(stmt); } } } diff --git a/pipelines/streaming/src/test/java/org/openmrs/analytics/UuidUtilTest.java b/pipelines/streaming/src/test/java/org/openmrs/analytics/UuidUtilTest.java index b78c5ecfc..b1bd594a6 100644 --- a/pipelines/streaming/src/test/java/org/openmrs/analytics/UuidUtilTest.java +++ b/pipelines/streaming/src/test/java/org/openmrs/analytics/UuidUtilTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 Google LLC + * Copyright 2020-2023 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,14 +13,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.openmrs.analytics; import static org.mockito.Mockito.when; +import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import javax.sql.DataSource; import junit.framework.TestCase; import org.junit.Before; import org.junit.Test; @@ -31,10 +32,9 @@ @RunWith(MockitoJUnitRunner.class) public class UuidUtilTest extends TestCase { - @Mock private JdbcConnectionUtil jdbcConnectionUtil; - + @Mock private DataSource jdbcSource; + @Mock private Connection connection; @Mock private Statement statement; - @Mock private ResultSet resultset; private String uuid; @@ -54,7 +54,8 @@ public void beforeTestCase() throws Exception { String sql = String.format("SELECT uuid FROM %s WHERE %s = %s", table, keyColumn, keyValue); - when(jdbcConnectionUtil.createStatement()).thenReturn(statement); + when(jdbcSource.getConnection()).thenReturn(connection); + when(connection.createStatement()).thenReturn(statement); when(statement.executeQuery(sql)).thenReturn(resultset); when(resultset.next()).thenReturn(true).thenReturn(false); when(resultset.getString("uuid")).thenReturn(uuid); @@ -63,7 +64,7 @@ public void beforeTestCase() throws Exception { @Test public void shouldReturnValidUuid() throws SQLException { - UuidUtil uuidUtil = new UuidUtil(jdbcConnectionUtil); + UuidUtil uuidUtil = new UuidUtil(jdbcSource); String uuid = uuidUtil.getUuid(table, keyColumn, keyValue); assertNotNull(uuid); diff --git a/utils/dbz_event_to_fhir_config.json b/utils/dbz_event_to_fhir_config.json index fd35303b3..1f4382d94 100644 --- a/utils/dbz_event_to_fhir_config.json +++ b/utils/dbz_event_to_fhir_config.json @@ -1,4 +1,5 @@ { + "jdbcDriverClass" : "com.mysql.cj.jdbc.Driver", "databaseService" : "mysql", "databaseHostName" : "openmrs-fhir-mysql", "databasePort" : "3306", diff --git a/utils/hapi-postgres-config.json b/utils/hapi-postgres-config.json index c85f4727c..a49126f41 100644 --- a/utils/hapi-postgres-config.json +++ b/utils/hapi-postgres-config.json @@ -1,4 +1,5 @@ { + "jdbcDriverClass" : "org.postgresql.Driver", "databaseService" : "postgresql", "databaseHostName" : "hapi-fhir-db", "databasePort" : "5432",