diff --git a/src/main/java/io/supertokens/storage/postgresql/BulkImportProxyStorage.java b/src/main/java/io/supertokens/storage/postgresql/BulkImportProxyStorage.java index 57879344..f4e8dd8e 100644 --- a/src/main/java/io/supertokens/storage/postgresql/BulkImportProxyStorage.java +++ b/src/main/java/io/supertokens/storage/postgresql/BulkImportProxyStorage.java @@ -18,17 +18,18 @@ import java.sql.Connection; import java.sql.SQLException; +import java.util.List; import java.util.Set; import com.google.gson.JsonObject; import io.supertokens.pluginInterface.LOG_LEVEL; +import io.supertokens.pluginInterface.exceptions.DbInitException; import io.supertokens.pluginInterface.exceptions.InvalidConfigException; import io.supertokens.pluginInterface.exceptions.StorageQueryException; import io.supertokens.pluginInterface.exceptions.StorageTransactionLogicException; import io.supertokens.pluginInterface.multitenancy.TenantIdentifier; import io.supertokens.pluginInterface.sqlStorage.TransactionConnection; -import io.supertokens.storage.postgresql.config.Config; /** @@ -71,14 +72,33 @@ public void loadConfig(JsonObject configJson, Set logLevels, TenantId // We are overriding the loadConfig method to set the connection pool size // to 1 to avoid creating many connections for the bulk import cronjob configJson.addProperty("postgresql_connection_pool_size", 1); - Config.loadConfig(this, configJson, logLevels, tenantIdentifier); + super.loadConfig(configJson, logLevels, tenantIdentifier); + } + + @Override + public void initStorage(boolean shouldWait, List tenantIdentifiers) throws DbInitException { + super.initStorage(shouldWait, tenantIdentifiers); + + // `BulkImportProxyStorage` uses `BulkImportProxyConnection`, which overrides the `.commit()` method on the Connection object. + // The `initStorage()` method runs `select * from table_name limit 1` queries to check if the tables exist but these queries + // don't get committed due to the overridden `.commit()`, so we need to manually commit the transaction to remove any locks on the tables. + + // Without this commit, a call to `select * from bulk_import_users limit 1` in `doesTableExist()` locks the `bulk_import_users` table, + try { + this.commitTransactionForBulkImportProxyStorage(); + } catch (StorageQueryException e) { + throw new DbInitException(e); + } } @Override public void closeConnectionForBulkImportProxyStorage() throws StorageQueryException { try { - this.connection.close(); - this.connection = null; + if (this.connection != null) { + this.connection.close(); + this.connection = null; + } + ConnectionPool.close(this); } catch (SQLException e) { throw new StorageQueryException(e); } diff --git a/src/main/java/io/supertokens/storage/postgresql/Start.java b/src/main/java/io/supertokens/storage/postgresql/Start.java index 9c10a2ee..411e52b3 100644 --- a/src/main/java/io/supertokens/storage/postgresql/Start.java +++ b/src/main/java/io/supertokens/storage/postgresql/Start.java @@ -3159,19 +3159,18 @@ public List getBulkImportUsersAndChangeStatusToProcessing(AppIde } @Override - public void deleteBulkImportUser_Transaction(AppIdentifier appIdentifier, TransactionConnection con, @Nonnull String bulkImportUserId) throws StorageQueryException { - Connection sqlCon = (Connection) con.getConnection(); + public void updateBulkImportUserPrimaryUserId(AppIdentifier appIdentifier, @Nonnull String bulkImportUserId, @Nonnull String primaryUserId) throws StorageQueryException { try { - BulkImportQueries.deleteBulkImportUser_Transaction(this, sqlCon, appIdentifier, bulkImportUserId); + BulkImportQueries.updateBulkImportUserPrimaryUserId(this, appIdentifier, bulkImportUserId, primaryUserId); } catch (SQLException e) { throw new StorageQueryException(e); } } @Override - public void updateBulkImportUserPrimaryUserId(AppIdentifier appIdentifier, @Nonnull String bulkImportUserId, @Nonnull String primaryUserId) throws StorageQueryException { + public long getBulkImportUsersCount(AppIdentifier appIdentifier, @Nullable BULK_IMPORT_USER_STATUS status) throws StorageQueryException { try { - BulkImportQueries.updateBulkImportUserPrimaryUserId(this, appIdentifier, bulkImportUserId, primaryUserId); + return BulkImportQueries.getBulkImportUsersCount(this, appIdentifier, status); } catch (SQLException e) { throw new StorageQueryException(e); } diff --git a/src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java b/src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java index 165c606b..0a01fd4f 100644 --- a/src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java +++ b/src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java @@ -45,12 +45,12 @@ static String getQueryToCreateBulkImportUsersTable(Start start) { return "CREATE TABLE IF NOT EXISTS " + tableName + " (" + "id CHAR(36)," + "app_id VARCHAR(64) NOT NULL DEFAULT 'public'," - + "primary_user_id VARCHAR(64)," + + "primary_user_id VARCHAR(36)," + "raw_data TEXT NOT NULL," + "status VARCHAR(128) DEFAULT 'NEW'," + "error_msg TEXT," - + "created_at BIGINT DEFAULT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000," - + "updated_at BIGINT DEFAULT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000," + + "created_at BIGINT NOT NULL, " + + "updated_at BIGINT NOT NULL, " + "CONSTRAINT " + Utils.getConstraintName(schema, tableName, null, "pkey") + " PRIMARY KEY(app_id, id)," + "CONSTRAINT " + Utils.getConstraintName(schema, tableName, "app_id", "fkey") + " " @@ -77,12 +77,12 @@ public static String getQueryToCreatePaginationIndex2(Start start) { public static void insertBulkImportUsers(Start start, AppIdentifier appIdentifier, List users) throws SQLException, StorageQueryException { StringBuilder queryBuilder = new StringBuilder( - "INSERT INTO " + Config.getConfig(start).getBulkImportUsersTable() + " (id, app_id, raw_data) VALUES "); + "INSERT INTO " + Config.getConfig(start).getBulkImportUsersTable() + " (id, app_id, raw_data, created_at, updated_at) VALUES "); int userCount = users.size(); for (int i = 0; i < userCount; i++) { - queryBuilder.append(" (?, ?, ?)"); + queryBuilder.append(" (?, ?, ?, ?, ?)"); if (i < userCount - 1) { queryBuilder.append(","); @@ -95,6 +95,8 @@ public static void insertBulkImportUsers(Start start, AppIdentifier appIdentifie pst.setString(parameterIndex++, user.id); pst.setString(parameterIndex++, appIdentifier.getAppId()); pst.setString(parameterIndex++, user.toRawDataForDbStorage()); + pst.setLong(parameterIndex++, System.currentTimeMillis()); + pst.setLong(parameterIndex++, System.currentTimeMillis()); } }); } @@ -129,6 +131,10 @@ public static List getBulkImportUsersAndChangeStatusToProcessing Connection sqlCon = (Connection) con.getConnection(); try { // NOTE: On average, we take about 66 seconds to process 1000 users. If, for any reason, the bulk import users were marked as processing but couldn't be processed within 10 minutes, we'll attempt to process them again. + + // "FOR UPDATE" ensures that multiple cron jobs don't read the same rows simultaneously. + // If one process locks the first 1000 rows, others will wait for the lock to be released. + // "SKIP LOCKED" allows other processes to skip locked rows and select the next 1000 available rows. String selectQuery = "SELECT * FROM " + Config.getConfig(start).getBulkImportUsersTable() + " WHERE app_id = ?" + " AND (status = 'NEW' OR (status = 'PROCESSING' AND updated_at < (EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000) - 10 * 60 * 1000))" /* 10 mins */ @@ -253,17 +259,6 @@ public static List deleteBulkImportUsers(Start start, AppIdentifier appI }); } - public static void deleteBulkImportUser_Transaction(Start start, Connection con, AppIdentifier appIdentifier, - @Nonnull String bulkImportUserId) throws SQLException { - String query = "DELETE FROM " + Config.getConfig(start).getBulkImportUsersTable() - + " WHERE app_id = ? AND id = ?"; - - update(con, query, pst -> { - pst.setString(1, appIdentifier.getAppId()); - pst.setString(2, bulkImportUserId); - }); - } - public static void updateBulkImportUserPrimaryUserId(Start start, AppIdentifier appIdentifier, @Nonnull String bulkImportUserId, @Nonnull String primaryUserId) throws SQLException, StorageQueryException { @@ -278,6 +273,32 @@ public static void updateBulkImportUserPrimaryUserId(Start start, AppIdentifier }); } + public static long getBulkImportUsersCount(Start start, AppIdentifier appIdentifier, @Nullable BULK_IMPORT_USER_STATUS status) throws SQLException, StorageQueryException { + String baseQuery = "SELECT COUNT(*) FROM " + Config.getConfig(start).getBulkImportUsersTable(); + StringBuilder queryBuilder = new StringBuilder(baseQuery); + + List parameters = new ArrayList<>(); + + queryBuilder.append(" WHERE app_id = ?"); + parameters.add(appIdentifier.getAppId()); + + if (status != null) { + queryBuilder.append(" AND status = ?"); + parameters.add(status.toString()); + } + + String query = queryBuilder.toString(); + + return execute(start, query, pst -> { + for (int i = 0; i < parameters.size(); i++) { + pst.setObject(i + 1, parameters.get(i)); + } + }, result -> { + result.next(); + return result.getLong(1); + }); + } + private static class BulkImportUserRowMapper implements RowMapper { private static final BulkImportUserRowMapper INSTANCE = new BulkImportUserRowMapper(); diff --git a/src/main/java/io/supertokens/storage/postgresql/queries/GeneralQueries.java b/src/main/java/io/supertokens/storage/postgresql/queries/GeneralQueries.java index 1ff9ef7f..d0d9f7de 100644 --- a/src/main/java/io/supertokens/storage/postgresql/queries/GeneralQueries.java +++ b/src/main/java/io/supertokens/storage/postgresql/queries/GeneralQueries.java @@ -593,7 +593,11 @@ public static void deleteAllTables(Start start) throws SQLException, StorageQuer update(start, DROP_QUERY, NO_OP_SETTER); } { - String DROP_QUERY = "DROP INDEX IF EXISTS bulk_import_users_created_at_index"; + String DROP_QUERY = "DROP INDEX IF EXISTS bulk_import_users_pagination_index1"; + update(start, DROP_QUERY, NO_OP_SETTER); + } + { + String DROP_QUERY = "DROP INDEX IF EXISTS bulk_import_users_pagination_index2"; update(start, DROP_QUERY, NO_OP_SETTER); } {