Skip to content

Commit

Permalink
Merge branch 'feat/bulk-import-base' into feat/bulk-import-1
Browse files Browse the repository at this point in the history
  • Loading branch information
anku255 committed Apr 25, 2024
2 parents 1375803 + fba9858 commit c6a9329
Show file tree
Hide file tree
Showing 13 changed files with 810 additions and 166 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

## [7.1.0] - 2024-04-10
## [7.1.0] - 2024-04-25

- Adds queries for Bulk Import

## [7.0.1] - 2024-04-17

- Fixes issues with partial failures during tenant creation

## [7.0.0] - 2024-03-13

- Replace `TotpNotEnabledError` with `UnknownUserIdTotpError`.
Expand Down
Binary file not shown.
2 changes: 1 addition & 1 deletion pluginInterfaceSupported.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"_comment": "contains a list of plugin interfaces branch names that this core supports",
"versions": [
"6.0"
"6.1"
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
public class BulkImportProxyStorage extends Start {
private BulkImportProxyConnection connection;

public synchronized Connection getTransactionConnection() throws SQLException {
public synchronized Connection getTransactionConnection() throws SQLException, StorageQueryException {
if (this.connection == null) {
Connection con = ConnectionPool.getConnectionForProxyStorage(this);
this.connection = new BulkImportProxyConnection(con);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import io.supertokens.pluginInterface.exceptions.DbInitException;
import io.supertokens.pluginInterface.exceptions.StorageQueryException;
import io.supertokens.storage.postgresql.config.Config;
import io.supertokens.storage.postgresql.config.PostgreSQLConfig;
import io.supertokens.storage.postgresql.output.Logging;
Expand All @@ -35,12 +36,14 @@ public class ConnectionPool extends ResourceDistributor.SingletonResource {
private static final String RESOURCE_KEY = "io.supertokens.storage.postgresql.ConnectionPool";
private HikariDataSource hikariDataSource;
private final Start start;
private PostConnectCallback postConnectCallback;

private ConnectionPool(Start start) {
private ConnectionPool(Start start, PostConnectCallback postConnectCallback) {
this.start = start;
this.postConnectCallback = postConnectCallback;
}

private synchronized void initialiseHikariDataSource() throws SQLException {
private synchronized void initialiseHikariDataSource() throws SQLException, StorageQueryException {
if (this.hikariDataSource != null) {
return;
}
Expand Down Expand Up @@ -99,6 +102,19 @@ private synchronized void initialiseHikariDataSource() throws SQLException {
} catch (Exception e) {
throw new SQLException(e);
}

try {
try (Connection con = hikariDataSource.getConnection()) {
this.postConnectCallback.apply(con);
}
} catch (StorageQueryException e) {
// if an exception happens here, we want to set the hikariDataSource to null once again so that
// whenever the getConnection is called again, we want to re-attempt creation of tables and tenant
// entries for this storage
hikariDataSource.close();
hikariDataSource = null;
throw e;
}
}

private static int getTimeToWaitToInit(Start start) {
Expand Down Expand Up @@ -133,7 +149,7 @@ static boolean isAlreadyInitialised(Start start) {
return getInstance(start) != null && getInstance(start).hikariDataSource != null;
}

static void initPool(Start start, boolean shouldWait) throws DbInitException {
static void initPool(Start start, boolean shouldWait, PostConnectCallback postConnectCallback) throws DbInitException {
if (isAlreadyInitialised(start)) {
return;
}
Expand All @@ -146,7 +162,7 @@ static void initPool(Start start, boolean shouldWait) throws DbInitException {
" specified the correct values for ('postgresql_host' and 'postgresql_port') or for "
+ "'postgresql_connection_uri'";
try {
ConnectionPool con = new ConnectionPool(start);
ConnectionPool con = new ConnectionPool(start, postConnectCallback);
start.getResourceDistributor().setResource(RESOURCE_KEY, con);
while (true) {
try {
Expand Down Expand Up @@ -189,7 +205,7 @@ static void initPool(Start start, boolean shouldWait) throws DbInitException {
}
}

private static Connection getNewConnection(Start start) throws SQLException {
private static Connection getNewConnection(Start start) throws SQLException, StorageQueryException {
if (getInstance(start) == null) {
throw new IllegalStateException("Please call initPool before getConnection");
}
Expand All @@ -202,11 +218,11 @@ private static Connection getNewConnection(Start start) throws SQLException {
return getInstance(start).hikariDataSource.getConnection();
}

public static Connection getConnectionForProxyStorage(Start start) throws SQLException {
public static Connection getConnectionForProxyStorage(Start start) throws SQLException, StorageQueryException {
return getNewConnection(start);
}

public static Connection getConnection(Start start) throws SQLException {
public static Connection getConnection(Start start) throws SQLException, StorageQueryException {
if (start instanceof BulkImportProxyStorage) {
return ((BulkImportProxyStorage) start).getTransactionConnection();
}
Expand All @@ -227,4 +243,9 @@ static void close(Start start) {
}
}
}

@FunctionalInterface
public static interface PostConnectCallback {
void apply(Connection connection) throws StorageQueryException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ static <T> T execute(Connection con, String QUERY, PreparedStatementValueSetter
}

static int update(Start start, String QUERY, PreparedStatementValueSetter setter)
throws SQLException {
throws SQLException, StorageQueryException {
try (Connection con = ConnectionPool.getConnection(start)) {
return update(con, QUERY, setter);
}
Expand Down
41 changes: 33 additions & 8 deletions src/main/java/io/supertokens/storage/postgresql/Start.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,7 @@
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLTransactionRollbackException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.*;

import static io.supertokens.storage.postgresql.QueryExecutorTemplate.execute;

Expand Down Expand Up @@ -244,7 +241,7 @@ public void stopLogging() {
}

@Override
public void initStorage(boolean shouldWait) throws DbInitException {
public void initStorage(boolean shouldWait, List<TenantIdentifier> tenantIdentifiers) throws DbInitException {
if (ConnectionPool.isAlreadyInitialised(this)) {
return;
}
Expand All @@ -254,8 +251,20 @@ public void initStorage(boolean shouldWait) throws DbInitException {
mainThread = Thread.currentThread();
}
try {
ConnectionPool.initPool(this, shouldWait);
GeneralQueries.createTablesIfNotExists(this);
ConnectionPool.initPool(this, shouldWait, (con) -> {
try {
GeneralQueries.createTablesIfNotExists(this, con);
} catch (SQLException e) {
throw new StorageQueryException(e);
}
for (TenantIdentifier tenantIdentifier : tenantIdentifiers) {
try {
this.addTenantIdInTargetStorage_Transaction(con, tenantIdentifier);
} catch (DuplicateTenantException e) {
// ignore
}
}
});
} catch (Exception e) {
throw new DbInitException(e);
}
Expand Down Expand Up @@ -491,7 +500,7 @@ public void deleteAllInformation() throws StorageQueryException {
}
ProcessState.getInstance(this).clear();
try {
initStorage(false);
initStorage(false, new ArrayList<>());
enabled = true; // Allow get connection to work, to delete the data
GeneralQueries.deleteAllTables(this);

Expand Down Expand Up @@ -2309,6 +2318,22 @@ public void addTenantIdInTargetStorage(TenantIdentifier tenantIdentifier)
}
}

public void addTenantIdInTargetStorage_Transaction(Connection con, TenantIdentifier tenantIdentifier)
throws DuplicateTenantException, StorageQueryException {
try {
MultitenancyQueries.addTenantIdInTargetStorage_Transaction(this, con, tenantIdentifier);
} catch (SQLException e) {
if (e instanceof PSQLException) {
PostgreSQLConfig config = Config.getConfig(this);
if (isPrimaryKeyError(((PSQLException) e).getServerErrorMessage(),
config.getTenantsTable())) {
throw new DuplicateTenantException();
}
}
throw new StorageQueryException(e);
}
}

@Override
public void overwriteTenantConfig(TenantConfig tenantConfig)
throws TenantOrAppNotFoundException, StorageQueryException, DuplicateThirdPartyIdException,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public static String getQueryToCreatePaginationIndex2(Start start) {
}

public static void insertBulkImportUsers(Start start, AppIdentifier appIdentifier, List<BulkImportUser> users)
throws SQLException {
throws SQLException, StorageQueryException {
StringBuilder queryBuilder = new StringBuilder(
"INSERT INTO " + Config.getConfig(start).getBulkImportUsersTable() + " (id, app_id, raw_data) VALUES ");

Expand Down Expand Up @@ -266,7 +266,7 @@ public static void deleteBulkImportUser_Transaction(Start start, Connection con,

public static void updateBulkImportUserPrimaryUserId(Start start, AppIdentifier appIdentifier,
@Nonnull String bulkImportUserId,
@Nonnull String primaryUserId) throws SQLException {
@Nonnull String primaryUserId) throws SQLException, StorageQueryException {
String query = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable()
+ " SET primary_user_id = ?, updated_at = ? WHERE app_id = ? and id = ?";

Expand Down
Loading

0 comments on commit c6a9329

Please sign in to comment.