Skip to content

Commit

Permalink
fix: PR changes
Browse files Browse the repository at this point in the history
  • Loading branch information
anku255 committed May 29, 2024
1 parent 3ecd8ea commit ecb078b
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,18 @@

import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Set;

import com.google.gson.JsonObject;

import io.supertokens.pluginInterface.LOG_LEVEL;
import io.supertokens.pluginInterface.exceptions.DbInitException;
import io.supertokens.pluginInterface.exceptions.InvalidConfigException;
import io.supertokens.pluginInterface.exceptions.StorageQueryException;
import io.supertokens.pluginInterface.exceptions.StorageTransactionLogicException;
import io.supertokens.pluginInterface.multitenancy.TenantIdentifier;
import io.supertokens.pluginInterface.sqlStorage.TransactionConnection;
import io.supertokens.storage.postgresql.config.Config;


/**
Expand Down Expand Up @@ -71,14 +72,33 @@ public void loadConfig(JsonObject configJson, Set<LOG_LEVEL> logLevels, TenantId
// We are overriding the loadConfig method to set the connection pool size
// to 1 to avoid creating many connections for the bulk import cronjob
configJson.addProperty("postgresql_connection_pool_size", 1);
Config.loadConfig(this, configJson, logLevels, tenantIdentifier);
super.loadConfig(configJson, logLevels, tenantIdentifier);
}

@Override
public void initStorage(boolean shouldWait, List<TenantIdentifier> tenantIdentifiers) throws DbInitException {
    super.initStorage(shouldWait, tenantIdentifiers);

    // This storage hands out `BulkImportProxyConnection` objects whose `.commit()`
    // override is a no-op. The table-existence probes run by `initStorage()`
    // (`select * from table_name limit 1`) therefore never get committed, leaving
    // the probe transaction open. Commit explicitly here so any locks those
    // probes acquired are released.
    // NOTE(review): the original comment said a `select * from bulk_import_users
    // limit 1` in `doesTableExist()` locks `bulk_import_users`, but the sentence
    // was truncated — confirm the exact locking consequence.
    try {
        this.commitTransactionForBulkImportProxyStorage();
    } catch (StorageQueryException cause) {
        // Surface commit failures as an init failure so startup aborts cleanly.
        throw new DbInitException(cause);
    }
}

@Override
public void closeConnectionForBulkImportProxyStorage() throws StorageQueryException {
try {
this.connection.close();
this.connection = null;
if (this.connection != null) {
this.connection.close();
this.connection = null;
}
ConnectionPool.close(this);
} catch (SQLException e) {
throw new StorageQueryException(e);
}
Expand Down
9 changes: 4 additions & 5 deletions src/main/java/io/supertokens/storage/postgresql/Start.java
Original file line number Diff line number Diff line change
Expand Up @@ -3159,19 +3159,18 @@ public List<BulkImportUser> getBulkImportUsersAndChangeStatusToProcessing(AppIde
}

@Override
public void deleteBulkImportUser_Transaction(AppIdentifier appIdentifier, TransactionConnection con, @Nonnull String bulkImportUserId) throws StorageQueryException {
Connection sqlCon = (Connection) con.getConnection();
public void updateBulkImportUserPrimaryUserId(AppIdentifier appIdentifier, @Nonnull String bulkImportUserId, @Nonnull String primaryUserId) throws StorageQueryException {
try {
BulkImportQueries.deleteBulkImportUser_Transaction(this, sqlCon, appIdentifier, bulkImportUserId);
BulkImportQueries.updateBulkImportUserPrimaryUserId(this, appIdentifier, bulkImportUserId, primaryUserId);
} catch (SQLException e) {
throw new StorageQueryException(e);
}
}

@Override
public void updateBulkImportUserPrimaryUserId(AppIdentifier appIdentifier, @Nonnull String bulkImportUserId, @Nonnull String primaryUserId) throws StorageQueryException {
public long getBulkImportUsersCount(AppIdentifier appIdentifier, @Nullable BULK_IMPORT_USER_STATUS status) throws StorageQueryException {
try {
BulkImportQueries.updateBulkImportUserPrimaryUserId(this, appIdentifier, bulkImportUserId, primaryUserId);
return BulkImportQueries.getBulkImportUsersCount(this, appIdentifier, status);
} catch (SQLException e) {
throw new StorageQueryException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ static String getQueryToCreateBulkImportUsersTable(Start start) {
return "CREATE TABLE IF NOT EXISTS " + tableName + " ("
+ "id CHAR(36),"
+ "app_id VARCHAR(64) NOT NULL DEFAULT 'public',"
+ "primary_user_id VARCHAR(64),"
+ "primary_user_id VARCHAR(36),"
+ "raw_data TEXT NOT NULL,"
+ "status VARCHAR(128) DEFAULT 'NEW',"
+ "error_msg TEXT,"
+ "created_at BIGINT DEFAULT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000,"
+ "updated_at BIGINT DEFAULT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000,"
+ "created_at BIGINT NOT NULL, "
+ "updated_at BIGINT NOT NULL, "
+ "CONSTRAINT " + Utils.getConstraintName(schema, tableName, null, "pkey")
+ " PRIMARY KEY(app_id, id),"
+ "CONSTRAINT " + Utils.getConstraintName(schema, tableName, "app_id", "fkey") + " "
Expand All @@ -77,12 +77,12 @@ public static String getQueryToCreatePaginationIndex2(Start start) {
public static void insertBulkImportUsers(Start start, AppIdentifier appIdentifier, List<BulkImportUser> users)
throws SQLException, StorageQueryException {
StringBuilder queryBuilder = new StringBuilder(
"INSERT INTO " + Config.getConfig(start).getBulkImportUsersTable() + " (id, app_id, raw_data) VALUES ");
"INSERT INTO " + Config.getConfig(start).getBulkImportUsersTable() + " (id, app_id, raw_data, created_at, updated_at) VALUES ");

int userCount = users.size();

for (int i = 0; i < userCount; i++) {
queryBuilder.append(" (?, ?, ?)");
queryBuilder.append(" (?, ?, ?, ?, ?)");

if (i < userCount - 1) {
queryBuilder.append(",");
Expand All @@ -95,6 +95,8 @@ public static void insertBulkImportUsers(Start start, AppIdentifier appIdentifie
pst.setString(parameterIndex++, user.id);
pst.setString(parameterIndex++, appIdentifier.getAppId());
pst.setString(parameterIndex++, user.toRawDataForDbStorage());
pst.setLong(parameterIndex++, System.currentTimeMillis());
pst.setLong(parameterIndex++, System.currentTimeMillis());
}
});
}
Expand Down Expand Up @@ -129,6 +131,10 @@ public static List<BulkImportUser> getBulkImportUsersAndChangeStatusToProcessing
Connection sqlCon = (Connection) con.getConnection();
try {
// NOTE: On average, we take about 66 seconds to process 1000 users. If, for any reason, the bulk import users were marked as processing but couldn't be processed within 10 minutes, we'll attempt to process them again.

// "FOR UPDATE" ensures that multiple cron jobs don't read the same rows simultaneously.
// If one process locks the first 1000 rows, others will wait for the lock to be released.
// "SKIP LOCKED" allows other processes to skip locked rows and select the next 1000 available rows.
String selectQuery = "SELECT * FROM " + Config.getConfig(start).getBulkImportUsersTable()
+ " WHERE app_id = ?"
+ " AND (status = 'NEW' OR (status = 'PROCESSING' AND updated_at < (EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000) - 10 * 60 * 1000))" /* 10 mins */
Expand Down Expand Up @@ -253,17 +259,6 @@ public static List<String> deleteBulkImportUsers(Start start, AppIdentifier appI
});
}

/**
 * Deletes a single bulk-import user row, scoped to the given app, using the
 * caller-supplied transaction connection (the caller owns commit/rollback).
 *
 * @param start            plugin instance used to resolve the table name
 * @param con              transaction connection the delete runs on
 * @param appIdentifier    app whose bulk-import user is targeted
 * @param bulkImportUserId id of the bulk-import user row to delete
 * @throws SQLException if the statement fails
 */
public static void deleteBulkImportUser_Transaction(Start start, Connection con, AppIdentifier appIdentifier,
        @Nonnull String bulkImportUserId) throws SQLException {
    String sql = "DELETE FROM " + Config.getConfig(start).getBulkImportUsersTable()
            + " WHERE app_id = ? AND id = ?";
    update(con, sql, pst -> {
        pst.setString(1, appIdentifier.getAppId());
        pst.setString(2, bulkImportUserId);
    });
}

public static void updateBulkImportUserPrimaryUserId(Start start, AppIdentifier appIdentifier,
@Nonnull String bulkImportUserId,
@Nonnull String primaryUserId) throws SQLException, StorageQueryException {
Expand All @@ -278,6 +273,32 @@ public static void updateBulkImportUserPrimaryUserId(Start start, AppIdentifier
});
}

/**
 * Counts bulk-import users belonging to the given app, optionally restricted
 * to a single status.
 *
 * @param start         plugin instance used to resolve the table name
 * @param appIdentifier app whose rows are counted
 * @param status        optional status filter; when null, all rows for the app
 *                      are counted
 * @return the number of matching rows
 * @throws SQLException          if the query fails
 * @throws StorageQueryException if the storage layer reports an error
 */
public static long getBulkImportUsersCount(Start start, AppIdentifier appIdentifier, @Nullable BULK_IMPORT_USER_STATUS status) throws SQLException, StorageQueryException {
    // Assemble the statement in one expression; the status predicate is only
    // appended when a filter was actually requested.
    String query = "SELECT COUNT(*) FROM " + Config.getConfig(start).getBulkImportUsersTable()
            + " WHERE app_id = ?"
            + (status != null ? " AND status = ?" : "");

    return execute(start, query, pst -> {
        pst.setString(1, appIdentifier.getAppId());
        if (status != null) {
            pst.setString(2, status.toString());
        }
    }, result -> {
        // COUNT(*) always yields exactly one row.
        result.next();
        return result.getLong(1);
    });
}

private static class BulkImportUserRowMapper implements RowMapper<BulkImportUser, ResultSet> {
private static final BulkImportUserRowMapper INSTANCE = new BulkImportUserRowMapper();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,11 @@ public static void deleteAllTables(Start start) throws SQLException, StorageQuer
update(start, DROP_QUERY, NO_OP_SETTER);
}
{
String DROP_QUERY = "DROP INDEX IF EXISTS bulk_import_users_created_at_index";
String DROP_QUERY = "DROP INDEX IF EXISTS bulk_import_users_pagination_index1";
update(start, DROP_QUERY, NO_OP_SETTER);
}
{
String DROP_QUERY = "DROP INDEX IF EXISTS bulk_import_users_pagination_index2";
update(start, DROP_QUERY, NO_OP_SETTER);
}
{
Expand Down

0 comments on commit ecb078b

Please sign in to comment.