From 7d2febd63585bbc11202bd927543edba15f4b663 Mon Sep 17 00:00:00 2001 From: An Phi Date: Tue, 10 Oct 2023 22:34:57 -0400 Subject: [PATCH] conn-man: add test for connection pooling --- .../pom.xml | 13 +- .../test/TestJDBCConnectionManager.java | 197 +++++++++++------- .../legend/connection/IdentityFactory.java | 4 +- .../connection/IdentitySpecification.java | 2 +- .../connection/JDBCConnectionBuilder.java | 15 +- .../impl/JDBCConnectionManager.java | 81 ++++++- legend-engine-xts-relationalStore/pom.xml | 1 - 7 files changed, 218 insertions(+), 95 deletions(-) diff --git a/legend-engine-config/legend-engine-connection-integration-tests/pom.xml b/legend-engine-config/legend-engine-connection-integration-tests/pom.xml index cfb1f695291..3228943aa0c 100644 --- a/legend-engine-config/legend-engine-connection-integration-tests/pom.xml +++ b/legend-engine-config/legend-engine-connection-integration-tests/pom.xml @@ -15,7 +15,8 @@ limitations under the License. --> - + org.finos.legend.engine legend-engine-config @@ -74,11 +75,6 @@ - - com.zaxxer - HikariCP - - org.junit.jupiter @@ -90,6 +86,11 @@ junit-jupiter-engine test + + net.bytebuddy + byte-buddy + test + diff --git a/legend-engine-config/legend-engine-connection-integration-tests/src/test/java/org/finos/legend/engine/connection/test/TestJDBCConnectionManager.java b/legend-engine-config/legend-engine-connection-integration-tests/src/test/java/org/finos/legend/engine/connection/test/TestJDBCConnectionManager.java index c51a7585d76..1a5a2c5b680 100644 --- a/legend-engine-config/legend-engine-connection-integration-tests/src/test/java/org/finos/legend/engine/connection/test/TestJDBCConnectionManager.java +++ b/legend-engine-config/legend-engine-connection-integration-tests/src/test/java/org/finos/legend/engine/connection/test/TestJDBCConnectionManager.java @@ -14,9 +14,7 @@ package org.finos.legend.engine.connection.test; -import com.zaxxer.hikari.HikariConfig; -import com.zaxxer.hikari.HikariPoolMXBean; -import org.eclipse.collections.api.block.function.Function; +import net.bytebuddy.asm.Advice; import org.finos.legend.authentication.vault.impl.PropertiesFileCredentialVault; import org.finos.legend.connection.AuthenticationMechanismConfiguration; import org.finos.legend.connection.Authenticator; @@ -44,13 +42,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import javax.management.JMX; -import javax.management.MBeanServer; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; -import java.lang.management.ManagementFactory; -import java.lang.reflect.ParameterizedType; -import java.lang.reflect.Type; import java.sql.Connection; import java.util.Properties; @@ -107,6 +98,8 @@ public void setup() public void cleanUp() { postgresContainer.stop(); + + JDBCConnectionManager.getInstance().flushPool(); } @Test @@ -117,14 +110,9 @@ public void testBasicConnectionPooling() throws Exception new UserPasswordCredentialBuilder() ) .withConnectionBuilders( - new InstrumentedStaticJDBCConnectionBuilder.WithPlaintextUsernamePassword((HikariConfig config) -> { - config.setKeepaliveTime(10000); - config.setMaximumPoolSize(10); - return null; - }) + new StaticJDBCConnectionBuilder.WithPlaintextUsernamePassword() ) .build(); - this.storeInstanceProvider.injectStoreInstance(this.storeInstance); Identity identity = identityFactory.createIdentity( new IdentitySpecification.Builder() @@ -136,91 +124,152 @@ public void testBasicConnectionPooling() throws Exception postgresContainer.getUser(), new PropertiesFileSecret("passwordRef") ); - Authenticator authenticator = this.connectionFactory.getAuthenticator(identity, TEST_STORE_INSTANCE_NAME, authenticationConfiguration); + JDBCConnectionManager connectionManager = JDBCConnectionManager.getInstance(); + Assertions.assertEquals(0, connectionManager.getPoolSize()); + // 1. Get a connection, this should initialize the pool as well as create a new connection in the empty pool // this connection should be active Connection connection0 = this.connectionFactory.getConnection(identity, authenticator); - HikariPoolMXBean poolProxy = getPoolProxy(identity, connectionSpecification, authenticationConfiguration); - Assertions.assertEquals(1, poolProxy.getTotalConnections()); - Assertions.assertEquals(1, poolProxy.getActiveConnections()); - Assertions.assertEquals(0, poolProxy.getIdleConnections()); + String poolName = JDBCConnectionManager.getPoolName(identity, connectionSpecification, authenticationConfiguration); + JDBCConnectionManager.ConnectionPool connectionPool = connectionManager.getPool(poolName); // 2. Close the connection, verify that the pool keeps this connection around in idle state + Connection underlyingConnection0 = connection0.unwrap(Connection.class); connection0.close(); - Assertions.assertEquals(1, poolProxy.getTotalConnections()); - Assertions.assertEquals(0, poolProxy.getActiveConnections()); - Assertions.assertEquals(1, poolProxy.getIdleConnections()); + Assertions.assertEquals(1, connectionPool.getTotalConnections()); + Assertions.assertEquals(0, connectionPool.getActiveConnections()); + Assertions.assertEquals(1, connectionPool.getIdleConnections()); // 3. Get a new connection, the pool should return the idle connection and create no new connection Connection connection1 = this.connectionFactory.getConnection(identity, authenticator); -// Connection connection2 = this.connectionFactory.getConnection(identity, authenticator); -// connection1.isValid() - Assertions.assertEquals(1, poolProxy.getTotalConnections()); - Assertions.assertEquals(1, poolProxy.getActiveConnections()); - Assertions.assertEquals(0, poolProxy.getIdleConnections()); + Assertions.assertEquals(underlyingConnection0, connection1.unwrap(Connection.class)); + Assertions.assertEquals(1, connectionPool.getTotalConnections()); + Assertions.assertEquals(1, connectionPool.getActiveConnections()); + Assertions.assertEquals(0, connectionPool.getIdleConnections()); // 4. Get another connection while the first one is still alive and used, a new connection // will be created in the pool this.connectionFactory.getConnection(identity, authenticator); - Assertions.assertEquals(2, poolProxy.getTotalConnections()); - Assertions.assertEquals(2, poolProxy.getActiveConnections()); - Assertions.assertEquals(0, poolProxy.getIdleConnections()); + Assertions.assertEquals(2, connectionPool.getTotalConnections()); + Assertions.assertEquals(2, connectionPool.getActiveConnections()); + Assertions.assertEquals(0, connectionPool.getIdleConnections()); } - private static HikariPoolMXBean getPoolProxy(Identity identity, ConnectionSpecification connectionSpecification, AuthenticationConfiguration authenticationConfiguration) throws MalformedObjectNameException + @Test + public void testConnectionPoolingForDifferentIdentities() throws Exception { - String poolName = JDBCConnectionManager.getPoolName(identity, connectionSpecification, authenticationConfiguration); - MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); - return JMX.newMXBeanProxy(mBeanServer, new ObjectName("com.zaxxer.hikari:type=Pool (" + poolName + ")"), HikariPoolMXBean.class); - } + this.connectionFactory = new ConnectionFactory.Builder(this.environment, this.storeInstanceProvider) + .withCredentialBuilders( + new UserPasswordCredentialBuilder() + ) + .withConnectionBuilders( + new StaticJDBCConnectionBuilder.WithPlaintextUsernamePassword() + ) + .build(); + this.storeInstanceProvider.injectStoreInstance(this.storeInstance); + Identity identity1 = identityFactory.createIdentity( + new IdentitySpecification.Builder() + .withName("testUser1") + .build() + ); + Identity identity2 = identityFactory.createIdentity( + new IdentitySpecification.Builder() + .withName("testUser2") + .build() + ); + ConnectionSpecification connectionSpecification = this.storeInstance.getConnectionSpecification(); + AuthenticationConfiguration authenticationConfiguration = new UserPasswordAuthenticationConfiguration( + postgresContainer.getUser(), + new PropertiesFileSecret("passwordRef") + ); - private static class InstrumentedStaticJDBCConnectionBuilder - { - static class WithPlaintextUsernamePassword extends StaticJDBCConnectionBuilder.WithPlaintextUsernamePassword - { - private final InstrumentedJDBCConnectionManager connectionManager; - - WithPlaintextUsernamePassword(Function hikariConfigHandler) - { - this.connectionManager = new InstrumentedJDBCConnectionManager(hikariConfigHandler); - } - - @Override - public JDBCConnectionManager getConnectionManager() - { - return this.connectionManager; - } - - @Override - protected Type[] actualTypeArguments() - { - Type genericSuperClass = this.getClass().getSuperclass().getGenericSuperclass(); - ParameterizedType parameterizedType = (ParameterizedType) genericSuperClass; - return parameterizedType.getActualTypeArguments(); - } - } - } + JDBCConnectionManager connectionManager = JDBCConnectionManager.getInstance(); + Assertions.assertEquals(0, connectionManager.getPoolSize()); - private static class InstrumentedJDBCConnectionManager extends JDBCConnectionManager - { - private final Function hikariConfigHandler; + // 1. Get a new connection for identity1, which should initialize a pool + this.connectionFactory.getConnection(identity1, this.connectionFactory.getAuthenticator(identity1, TEST_STORE_INSTANCE_NAME, authenticationConfiguration)); - InstrumentedJDBCConnectionManager(Function hikariConfigHandler) - { - this.hikariConfigHandler = hikariConfigHandler; - } + String poolName1 = JDBCConnectionManager.getPoolName(identity1, connectionSpecification, authenticationConfiguration); + JDBCConnectionManager.ConnectionPool connectionPool1 = connectionManager.getPool(poolName1); + + Assertions.assertEquals(1, connectionManager.getPoolSize()); + Assertions.assertEquals(1, connectionPool1.getTotalConnections()); + Assertions.assertEquals(1, connectionPool1.getActiveConnections()); + Assertions.assertEquals(0, connectionPool1.getIdleConnections()); + + // 2. Get a new connection for identity2, which should initialize another pool + this.connectionFactory.getConnection(identity2, this.connectionFactory.getAuthenticator(identity2, TEST_STORE_INSTANCE_NAME, authenticationConfiguration)); - @Override - protected void handleHikariConfig(HikariConfig config) + String poolName2 = JDBCConnectionManager.getPoolName(identity2, connectionSpecification, authenticationConfiguration); + JDBCConnectionManager.ConnectionPool connectionPool2 = connectionManager.getPool(poolName2); + + Assertions.assertEquals(2, connectionManager.getPoolSize()); + Assertions.assertEquals(1, connectionPool2.getTotalConnections()); + Assertions.assertEquals(1, connectionPool2.getActiveConnections()); + Assertions.assertEquals(0, connectionPool2.getIdleConnections()); + } + + public static class CustomAdvice + { + @Advice.OnMethodExit + public static void intercept(@Advice.Return(readOnly = false) String value) { - config.setRegisterMbeans(true); - this.hikariConfigHandler.apply(config); + System.out.println("intercepted: " + value); + value = "hi: " + value; } } + +// public static class MyWay +// { +// } +// +// private static class InstrumentedStaticJDBCConnectionBuilder +// { +// static class WithPlaintextUsernamePassword extends StaticJDBCConnectionBuilder.WithPlaintextUsernamePassword +// { +// private final InstrumentedJDBCConnectionManager connectionManager; +// +// WithPlaintextUsernamePassword(Function hikariConfigHandler) +// { +// this.connectionManager = new InstrumentedJDBCConnectionManager(hikariConfigHandler); +// } +// +// @Override +// public JDBCConnectionManager getConnectionManager() +// { +// return this.connectionManager; +// } +// +// @Override +// protected Type[] actualTypeArguments() +// { +// Type genericSuperClass = this.getClass().getSuperclass().getGenericSuperclass(); +// ParameterizedType parameterizedType = (ParameterizedType) genericSuperClass; +// return parameterizedType.getActualTypeArguments(); +// } +// } +// } +// +// private static class InstrumentedJDBCConnectionManager extends JDBCConnectionManager +// { +// private final Function hikariConfigHandler; +// +// InstrumentedJDBCConnectionManager(Function hikariConfigHandler) +// { +// this.hikariConfigHandler = hikariConfigHandler; +// } +// +//// @Override +//// protected void handleHikariConfig(HikariConfig config) +//// { +//// config.setRegisterMbeans(true); +//// this.hikariConfigHandler.apply(config); +//// } +// } } diff --git a/legend-engine-xts-authentication/legend-engine-xt-authentication-connection-factory/src/main/java/org/finos/legend/connection/IdentityFactory.java b/legend-engine-xts-authentication/legend-engine-xt-authentication-connection-factory/src/main/java/org/finos/legend/connection/IdentityFactory.java index af245f96647..a02600a70a6 100644 --- a/legend-engine-xts-authentication/legend-engine-xt-authentication-connection-factory/src/main/java/org/finos/legend/connection/IdentityFactory.java +++ b/legend-engine-xts-authentication/legend-engine-xt-authentication-connection-factory/src/main/java/org/finos/legend/connection/IdentityFactory.java @@ -17,9 +17,9 @@ import org.eclipse.collections.api.factory.Lists; import org.finos.legend.engine.shared.core.identity.Credential; import org.finos.legend.engine.shared.core.identity.Identity; +import org.finos.legend.engine.shared.core.identity.credential.AnonymousCredential; import org.finos.legend.engine.shared.core.identity.factory.DefaultIdentityFactory; -import javax.security.auth.Subject; import java.util.List; public class IdentityFactory @@ -49,7 +49,7 @@ public Identity createIdentity(IdentitySpecification identitySpecification) } if (credentials.isEmpty()) { - return DEFAULT.makeUnknownIdentity(); + return identitySpecification.getName() != null ? new Identity(identitySpecification.getName(), new AnonymousCredential()) : DEFAULT.makeUnknownIdentity(); } return new Identity(identitySpecification.getName(), credentials); } diff --git a/legend-engine-xts-authentication/legend-engine-xt-authentication-connection-factory/src/main/java/org/finos/legend/connection/IdentitySpecification.java b/legend-engine-xts-authentication/legend-engine-xt-authentication-connection-factory/src/main/java/org/finos/legend/connection/IdentitySpecification.java index 34e7f18c207..7a07c2f6989 100644 --- a/legend-engine-xts-authentication/legend-engine-xt-authentication-connection-factory/src/main/java/org/finos/legend/connection/IdentitySpecification.java +++ b/legend-engine-xts-authentication/legend-engine-xt-authentication-connection-factory/src/main/java/org/finos/legend/connection/IdentitySpecification.java @@ -31,7 +31,7 @@ public class IdentitySpecification private IdentitySpecification(String name, List profiles, Subject subject, List credentials) { - this.name = Objects.requireNonNull(name, "Name is missing"); + this.name = name; this.profiles = profiles; this.subject = subject; this.credentials = credentials; diff --git a/legend-engine-xts-relationalStore/legend-engine-xt-relationalStore-connection/src/main/java/org/finos/legend/connection/JDBCConnectionBuilder.java b/legend-engine-xts-relationalStore/legend-engine-xt-relationalStore-connection/src/main/java/org/finos/legend/connection/JDBCConnectionBuilder.java index 0598a6e13e0..3fd8192dad7 100644 --- a/legend-engine-xts-relationalStore/legend-engine-xt-relationalStore-connection/src/main/java/org/finos/legend/connection/JDBCConnectionBuilder.java +++ b/legend-engine-xts-relationalStore/legend-engine-xt-relationalStore-connection/src/main/java/org/finos/legend/connection/JDBCConnectionBuilder.java @@ -1,6 +1,19 @@ +// Copyright 2023 Goldman Sachs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package org.finos.legend.connection; -import org.finos.legend.connection.ConnectionBuilder; import org.finos.legend.connection.impl.JDBCConnectionManager; import org.finos.legend.connection.protocol.ConnectionSpecification; import org.finos.legend.engine.shared.core.identity.Credential; diff --git a/legend-engine-xts-relationalStore/legend-engine-xt-relationalStore-connection/src/main/java/org/finos/legend/connection/impl/JDBCConnectionManager.java b/legend-engine-xts-relationalStore/legend-engine-xt-relationalStore-connection/src/main/java/org/finos/legend/connection/impl/JDBCConnectionManager.java index 67a206aa5fc..91ab34fca98 100644 --- a/legend-engine-xts-relationalStore/legend-engine-xt-relationalStore-connection/src/main/java/org/finos/legend/connection/impl/JDBCConnectionManager.java +++ b/legend-engine-xts-relationalStore/legend-engine-xt-relationalStore-connection/src/main/java/org/finos/legend/connection/impl/JDBCConnectionManager.java @@ -16,6 +16,8 @@ import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; +import org.eclipse.collections.api.block.function.Function0; +import org.eclipse.collections.api.map.ConcurrentMutableMap; import org.eclipse.collections.impl.map.mutable.ConcurrentHashMap; import org.finos.legend.connection.Authenticator; import org.finos.legend.connection.ConnectionManager; @@ -39,6 +41,7 @@ import java.util.ServiceLoader; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; +import java.util.function.Supplier; import java.util.logging.Logger; public class JDBCConnectionManager implements ConnectionManager @@ -47,6 +50,7 @@ public class JDBCConnectionManager implements ConnectionManager private static final AtomicBoolean isInitialized = new AtomicBoolean(); private static JDBCConnectionManager INSTANCE; + private final ConcurrentMutableMap poolIndex = ConcurrentHashMap.newMap(); protected JDBCConnectionManager() { @@ -87,7 +91,57 @@ public void initialize(LegendEnvironment environment) JDBCConnectionManager.setup(); } - public Connection getConnection( + protected Connection getConnection(Database database, + String host, + int port, + String databaseName, + Properties connectionProperties, + Function authenticationPropertiesSupplier, + Authenticator authenticator, + Identity identity + ) throws SQLException + { + StoreInstance storeInstance = authenticator.getStoreInstance(); + ConnectionSpecification connectionSpecification = storeInstance.getConnectionSpecification(); + AuthenticationConfiguration authenticationConfiguration = authenticator.getAuthenticationConfiguration(); + String poolName = getPoolName(identity, connectionSpecification, authenticationConfiguration); + + // TODO: @akphi - this is simplistic, we need to handle concurrency and errors + Supplier dataSourceSupplier = () -> this.buildDataSource(database, host, port, databaseName, connectionProperties, authenticationPropertiesSupplier, authenticator, identity); + Function0 connectionPoolSupplier = () -> new ConnectionPool(dataSourceSupplier.get()); + ConnectionPool connectionPool = this.poolIndex.getIfAbsentPut(poolName, connectionPoolSupplier); + + return connectionPool.dataSource.getConnection(); + +// try (Scope scope = GlobalTracer.get().buildSpan("Get Connection").startActive(true)) +// { +// ConnectionKey connectionKey = this.getConnectionKey(); +// // Logs and traces ----- +// String principal = identityState.getIdentity().getName(); +// scope.span().setTag("Principal", principal); +// scope.span().setTag("DataSourceSpecification", this.toString()); +// LOGGER.info("Get Connection as [{}] for datasource [{}]", principal, connectionKey.shortId()); +// // --------------------- +// try +// { +// DataSourceWithStatistics dataSourceWithStatistics = this.connectionStateManager.getDataSourceForIdentityIfAbsentBuild(identityState, this, dataSourcePoolBuilder); +// // Logs and traces and stats ----- +// String poolName = dataSourceWithStatistics.getPoolName(); +// scope.span().setTag("Pool", poolName); +// int requests = dataSourceWithStatistics.requestConnection(); +// LOGGER.info("Principal [{}] has requested [{}] connections for pool [{}]", principal, requests, poolName); +// return authenticationStrategy.getConnection(dataSourceWithStatistics, identityState.getIdentity()); +// } +// catch (ConnectionException ce) +// { +// LOGGER.error("ConnectionException {{}} : pool stats [{}] ", principal, connectionStateManager.getPoolStatisticsAsJSON(poolNameFor(identityState.getIdentity()))); +// LOGGER.error("ConnectionException ", ce); +// throw ce; +// } +// } + } + + protected HikariDataSource buildDataSource( Database database, String host, int port, @@ -96,7 +150,7 @@ public Connection getConnection( Function authenticationPropertiesSupplier, Authenticator authenticator, Identity identity - ) throws SQLException + ) { StoreInstance storeInstance = authenticator.getStoreInstance(); ConnectionSpecification connectionSpecification = storeInstance.getConnectionSpecification(); @@ -125,17 +179,26 @@ public Connection getConnection( jdbcConfig.addDataSourceProperty("prepStmtCacheSqlLimit", 0); jdbcConfig.addDataSourceProperty("useServerPrepStmts", false); - handleHikariConfig(jdbcConfig); + jdbcConfig.setRegisterMbeans(true); + // TODO: jdbcConfig.setDataSource(new DataSourceWrapper(jdbcUrl, connectionProperties, databaseManager, authenticationPropertiesSupplier, authenticator, identity)); - HikariDataSource dataSource = new HikariDataSource(jdbcConfig); + return new HikariDataSource(jdbcConfig); + } - return dataSource.getConnection(); + public ConnectionPool getPool(String poolName) + { + return this.poolIndex.get(poolName); + } + + public int getPoolSize() + { + return this.poolIndex.size(); } - protected void handleHikariConfig(HikariConfig config) + public void flushPool() { - // do nothing + this.poolIndex.forEachKey(this.poolIndex::remove); } public static String getPoolName(Identity identity, ConnectionSpecification connectionSpecification, AuthenticationConfiguration authenticationConfiguration) @@ -162,7 +225,7 @@ private static DatabaseManager getManagerForDatabase(Database database) return manager; } - private static class ConnectionPool + public static class ConnectionPool { private final HikariDataSource dataSource; @@ -201,7 +264,6 @@ private static class DataSourceWrapper implements DataSource { private final String url; private final Properties connectionProperties; - private final DatabaseManager databaseManager; private final Function authenticationPropertiesSupplier; private final Authenticator authenticator; // TODO: @akphi - how do we get rid of this here? @@ -219,7 +281,6 @@ public DataSourceWrapper( { this.url = url; this.connectionProperties = connectionProperties; - this.databaseManager = databaseManager; try { this.driver = (Driver) Class.forName(databaseManager.getDriver()).getDeclaredConstructor().newInstance(); diff --git a/legend-engine-xts-relationalStore/pom.xml b/legend-engine-xts-relationalStore/pom.xml index de1fcc376fc..1d34bdeef56 100644 --- a/legend-engine-xts-relationalStore/pom.xml +++ b/legend-engine-xts-relationalStore/pom.xml @@ -31,7 +31,6 @@ legend-engine-xt-relationalStore-analytics legend-engine-xt-relationalStore-connection - legend-engine-xt-relationalStore-connection-test-support legend-engine-xt-relationalStore-dbExtension legend-engine-xt-relationalStore-execution legend-engine-xt-relationalStore-generation