
feat: Add script to add users for bulk import #977

Open · wants to merge 3 commits into base: feat/bulk-import-base
3 changes: 3 additions & 0 deletions bulkimport_scripts/add_users_for_bulk_import/.gitignore
@@ -0,0 +1,3 @@
node_modules
/usersHavingInvalidSchema.json
/remainingUsers.json
34 changes: 34 additions & 0 deletions bulkimport_scripts/add_users_for_bulk_import/README.md
@@ -0,0 +1,34 @@
# Add Users For Bulk Import Script

The `/bulk-import/users` POST API endpoint in SuperTokens Core adds users to the database for bulk import. However, the API only accepts up to 10,000 users per request. This script takes a JSON file containing a large number of users and calls the API in batches of 10,000.
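
Conceptually, each batch is sent with a single POST request, mirroring the call made in `index.js` below (a minimal sketch; the endpoint path and payload shape come from this PR, while the surrounding function and variable names are illustrative):

```
// Sketch: send one batch of users to the core, as index.js does.
// Uses the global fetch API (Node.js 18+). `coreAPIUrl` is the core's base URL.
async function sendBatch(coreAPIUrl, batch) {
    const res = await fetch(`${coreAPIUrl}/bulk-import/users`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ users: batch }), // at most 10,000 users per request
    });
    return res; // a 400 response carries per-user validation errors
}
```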

## How to Run

1. Ensure you have Node.js (v18 or higher) installed on your system; the script uses the built-in `fetch` API, which requires Node 18+.
2. Open a terminal window and navigate to the directory where the script is located.
3. Run `npm install` to install necessary dependencies.
4. Run the script using the following command:
Reviewer comment: instead of CoreAPIURL, use the Core ConnectionURI.


Reviewer comment: the core may have an API key as well, which needs to be supplied to this program.

```
node index.js --core-endpoint <CoreAPIURL> --input-file <InputFileName> [--invalid-schema-file <InvalidSchemaFile>] [--remaining-users-file <RemainingUsersFile>]
```

- Replace `<CoreAPIURL>` with the URL of the core API endpoint.
- Replace `<InputFileName>` with the path to the input JSON file containing user data.
- Optionally, you can specify the paths for the output files:
- `--invalid-schema-file <InvalidSchemaFile>` specifies the path to the file storing users with invalid schema (default is `./usersHavingInvalidSchema.json`).
- `--remaining-users-file <RemainingUsersFile>` specifies the path to the file storing remaining users (default is `./remainingUsers.json`).
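
For example, assuming the core is running locally on its default port (the input file name below is a placeholder):

```
node index.js --core-endpoint http://localhost:3567 --input-file ./users.json
```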
Reviewer comment: `remaining-users-file` needs more explanation.


## Format of Input File

The input file should be a JSON file in the same format as expected by the `/bulk-import/users` POST API endpoint. An example file named `example_input_file.json` is provided in the same directory.
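
Abridged from that example, a minimal input file with one user and a single `emailpassword` login method looks like this (the email and hash values are placeholders; see `example_input_file.json` and the core's bulk import schema for the full set of fields):

```
{
    "users": [
        {
            "externalUserId": "b263cfe8-f27f-4264-a405-de1527906fb7",
            "userRoles": [],
            "loginMethods": [
                {
                    "tenantIds": ["public"],
                    "isVerified": true,
                    "isPrimary": true,
                    "timeJoinedInMSSinceEpoch": 1712067215922,
                    "recipeId": "emailpassword",
                    "email": "[email protected]",
                    "passwordHash": "$2a",
                    "hashingAlgorithm": "BCRYPT"
                }
            ]
        }
    ]
}
```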

## Expected Outputs

Reviewer comment: what about an output whilst the script is running? Sort of like the overall progress.

- Upon successful execution, the script will output a summary message indicating the number of users processed, any remaining users, and any users with invalid schema.
- If there are remaining users to be processed, the file specified by `--remaining-users-file` (default `remainingUsers.json`) will be generated, containing the details of remaining users.
- If there are users with invalid schema, the file specified by `--invalid-schema-file` (default `usersHavingInvalidSchema.json`) will be generated, containing the details of users with invalid schema.
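
Based on the summary object built in `index.js`, a partially completed run prints something like the following (the counts are illustrative):

```
{
    message: 'We processed 20000 users and 5 users have invalid schema! Remaining users can be processed again by processing the ./remainingUsers.json file and users having invalid schema need to be fixed and processed again by processing the ./usersHavingInvalidSchema.json file.',
    totalUsers: 30000,
    processedUsers: 20000,
    remainingUsers: 10000,
    usersHavingInvalidSchema: 5,
    remainingUsersFileName: './remainingUsers.json',
    usersHavingInvalidSchemaFileName: './usersHavingInvalidSchema.json'
}
```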

## Note

The script overwrites the files specified by the `--remaining-users-file` and `--invalid-schema-file` options on each run. Be sure to back up these files if needed.
Reviewer comment: Add another note that even if this script has finished, it doesn't mean for sure that there will be no errors or that all the users are imported, because the cron job in the core still has to run and it will take its time.

Ideally, this script should also take that into account and show that output. For example, once we have called the API for all users in the input JSON file, this script should query the core to check how many are still processing and how many have failed; out of the ones that have failed, it should output those to a file (same file as usersHavingInvalidSchema?) and then tell devs what to do.

145 changes: 145 additions & 0 deletions bulkimport_scripts/add_users_for_bulk_import/example_input_file.json
@@ -0,0 +1,145 @@
{
"users": [
{
"id": "5e1ea023-8787-4c38-8409-ea0b0230662e",
"externalUserId": "b263cfe8-f27f-4264-a405-de1527906fb7",
"userMetadata": {
"key1": "value1",
"key2": {
"key3": "value3"
}
},
"userRoles": [],
"loginMethods": [
{
"tenantIds": [
"public"
],
"isVerified": true,
"isPrimary": true,
"timeJoinedInMSSinceEpoch": 1712067215922,
"recipeId": "emailpassword",
"email": "[email protected]",
"passwordHash": "$2a",
"hashingAlgorithm": "BCRYPT"
},
{
"tenantIds": [
"public"
],
"isVerified": true,
"isPrimary": false,
"timeJoinedInMSSinceEpoch": 1712067215922,
"recipeId": "thirdparty",
"email": "[email protected]",
"thirdPartyId": "thirdPartyId0",
"thirdPartyUserId": "thirdPartyUserId0"
},
{
"tenantIds": [
"public"
],
"isVerified": true,
"isPrimary": false,
"timeJoinedInMSSinceEpoch": 1712067215922,
"recipeId": "passwordless",
"email": "[email protected]"
}
]
},
{
"id": "2e66fcec-09ea-4cd3-ad4f-817099872b5c",
"externalUserId": "0afd9119-d613-43e6-82a0-556d93d61421",
"userMetadata": {
"key1": "value1",
"key2": {
"key3": "value3"
}
},
"userRoles": [],
"loginMethods": [
{
"tenantIds": [
"public"
],
"isVerified": true,
"isPrimary": true,
"timeJoinedInMSSinceEpoch": 1712067215922,
"recipeId": "emailpassword",
"email": "[email protected]",
"passwordHash": "$2a",
"hashingAlgorithm": "BCRYPT"
},
{
"tenantIds": [
"public"
],
"isVerified": true,
"isPrimary": false,
"timeJoinedInMSSinceEpoch": 1712067215922,
"recipeId": "thirdparty",
"email": "[email protected]",
"thirdPartyId": "thirdPartyId1",
"thirdPartyUserId": "thirdPartyUserId1"
},
{
"tenantIds": [
"public"
],
"isVerified": true,
"isPrimary": false,
"timeJoinedInMSSinceEpoch": 1712067215922,
"recipeId": "passwordless",
"email": "[email protected]"
}
]
},
{
"id": "a9c828d1-a8db-4eb3-8e0a-1c985dba9fc9",
"externalUserId": "5a6fccfb-5778-40b1-be1c-2d8c25421253",
"userMetadata": {
"key1": "value1",
"key2": {
"key3": "value3"
}
},
"userRoles": [],
"loginMethods": [
{
"tenantIds": [
"public"
],
"isVerified": true,
"isPrimary": true,
"timeJoinedInMSSinceEpoch": 1712067215922,
"recipeId": "emailpassword",
"email": "[email protected]",
"passwordHash": "$2a",
"hashingAlgorithm": "BCRYPT"
},
{
"tenantIds": [
"public"
],
"isVerified": true,
"isPrimary": false,
"timeJoinedInMSSinceEpoch": 1712067215922,
"recipeId": "thirdparty",
"email": "[email protected]",
"thirdPartyId": "thirdPartyId2",
"thirdPartyUserId": "thirdPartyUserId2"
},
{
"tenantIds": [
"public"
],
"isVerified": true,
"isPrimary": false,
"timeJoinedInMSSinceEpoch": 1712067215922,
"recipeId": "passwordless",
"email": "[email protected]"
}
]
}
]
}
191 changes: 191 additions & 0 deletions bulkimport_scripts/add_users_for_bulk_import/index.js
@@ -0,0 +1,191 @@
/*
* Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved.
*
* This software is licensed under the Apache License, Version 2.0 (the
* "License") as published by the Apache Software Foundation.
*
* You may not use this file except in compliance with the License. You may
* obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

const fs = require('fs/promises');
const yargs = require('yargs');
const process = require('process');

const BATCH_SIZE = 10000;

async function parseInputArgs() {
const argv = await yargs
.option('core-endpoint', {
alias: 'c',
type: 'string',
describe: 'Core API URL endpoint',
demandOption: true,
})
.option('input-file', {
alias: 'i',
type: 'string',
describe: 'Path to the input file',
demandOption: true,
})
.option('invalid-schema-file', {
alias: 's',
type: 'string',
describe: 'Path to the file storing users with invalid schema',
default: './usersHavingInvalidSchema.json',
})
.option('remaining-users-file', {
alias: 'r',
type: 'string',
describe: 'Path to the file storing remaining users',
default: './remainingUsers.json',
})
.argv;

return {
coreAPIUrl: argv['core-endpoint'],
inputFileName: argv['input-file'],
usersHavingInvalidSchemaFileName: argv['invalid-schema-file'],
remainingUsersFileName: argv['remaining-users-file'],
};
}

async function getUsersFromInputFile({ inputFileName }) {
try {
const inputFileDataString = await fs.readFile(inputFileName, 'utf8');
const inputFileData = JSON.parse(inputFileDataString);

if (!inputFileData.users || !Array.isArray(inputFileData.users) || inputFileData.users.length === 0) {
throw new Error('Expected users array in the input file.');
}

return inputFileData.users;
} catch (error) {
console.error('Error reading or parsing input file:', error.message);
process.exit(1);
}
}

async function deleteUsersHavingInvalidSchemaFileIfExists({ usersHavingInvalidSchemaFileName }) {
try {
await fs.rm(usersHavingInvalidSchemaFileName);
} catch (error) {
if (error.code !== 'ENOENT') {
console.error(`Failed to delete ${usersHavingInvalidSchemaFileName}:`, error.message);
}
}
}

async function addInvalidSchemaUsersToFile({ errors, users, usersHavingInvalidSchemaFileName }) {
let parsedData = null;
try {
const existingData = await fs.readFile(usersHavingInvalidSchemaFileName, 'utf8');
parsedData = JSON.parse(existingData);
} catch (error) {
if (error.code === 'ENOENT') {
parsedData = { users: [] };
} else {
console.error(`Failed to read output file. Error: ${error.message}`);
throw error;
}
}

parsedData.users.push(...errors.map((err) => ({ user: users[err.index], errors: err.errors })));

await fs.writeFile(usersHavingInvalidSchemaFileName, JSON.stringify(parsedData, null, 2));

return users.filter((_, index) => !errors.some(err => err.index === index));
}

async function updateRemainingUsersFile({ users, index, remainingUsersFileName }) {
const remainingUsers = users.slice(index + 1);
await fs.writeFile(remainingUsersFileName, JSON.stringify({ users: remainingUsers }, null, 2));
}

async function removeRemainingUsersFile({ remainingUsersFileName }) {
try {
await fs.rm(remainingUsersFileName);
} catch (error) {
if (error.code !== 'ENOENT') {
console.error(`Failed to delete ${remainingUsersFileName}:`, error.message);
}
}
}

async function main() {
const { coreAPIUrl, inputFileName, usersHavingInvalidSchemaFileName, remainingUsersFileName } = await parseInputArgs();

const users = await getUsersFromInputFile({ inputFileName });
Reviewer comment: shouldn't this actually pick up from the remainingUsers file if it exists? Because the input file would have users that have been successfully imported as well.

Reviewer comment: Otherwise the dev would have to manually copy the contents of the remainingUsers file to their input file on each run, which is something they could miss.


await deleteUsersHavingInvalidSchemaFileIfExists({ usersHavingInvalidSchemaFileName });
await updateRemainingUsersFile({ users, index: 0, remainingUsersFileName });

let usersToProcessInBatch = [];
let usersHavingInvalidSchemaCount = 0;
let i = 0;

try {
while (i < users.length || usersToProcessInBatch.length > 0) {
// Top up the current batch to BATCH_SIZE users. After a 400 response, the users from the
// previous batch that passed validation remain in usersToProcessInBatch, so only the
// shortfall is pulled from the input array; otherwise a full batch of BATCH_SIZE is taken.
let remainingBatchSize = usersToProcessInBatch.length > BATCH_SIZE ? 0 : BATCH_SIZE - usersToProcessInBatch.length;
remainingBatchSize = Math.min(remainingBatchSize, users.length - i);

usersToProcessInBatch.push(...users.slice(i, i + remainingBatchSize));
Reviewer comment (on lines +134 to +138): please add comments explaining all this with an example. Hard for me to understand the logic here.


const res = await fetch(`${coreAPIUrl}/bulk-import/users`, {
Reviewer comment: Need to have some wait time each iteration, otherwise it may breach the rate limit of the core! 100 ms wait time.

method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ users: usersToProcessInBatch }),
});
Reviewer comment (on lines +140 to +146): need an API key as well.
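// Sketch (not part of this PR): if the core is configured with an API key, the request
// above would also need to send it, for example as a header:
//   headers: { 'Content-Type': 'application/json', 'api-key': coreApiKey },
// where coreApiKey would come from a hypothetical new CLI option such as --api-key.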


if (!res.ok && res.status !== 400) {
const text = await res.text();
console.error(`Failed to add users. API response - status: ${res.status} body: ${text}`);
break;
Reviewer comment: not a good idea to break. Imagine I am running this script for many 100k users, and went away for a coffee. Then after 5 seconds this fails temporarily. Now it will make no progress even if the core is up. Instead, do exponential backoff (up to a few seconds max), and try again.
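// Sketch (not part of this PR): instead of breaking, the failed batch could be retried with
// exponential backoff, roughly:
//   await new Promise((resolve) => setTimeout(resolve, Math.min(retryDelayMs, 5000)));
//   retryDelayMs *= 2; // hypothetical variable, reset after a successful request
// with the break reserved for non-transient errors.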

}

if (res.status === 400) {
const errors = await res.json();
usersHavingInvalidSchemaCount += errors.users.length;
usersToProcessInBatch = await addInvalidSchemaUsersToFile({ errors: errors.users, users: usersToProcessInBatch, usersHavingInvalidSchemaFileName });
} else {
await updateRemainingUsersFile({ users, index: i, remainingUsersFileName });
usersToProcessInBatch = [];
}

i += remainingBatchSize;
}
} catch (error) {
console.log(`Got an unexpected Error: `, error);
}

const result = {
totalUsers: users.length,
processedUsers: i,
remainingUsers: users.length - i,
usersHavingInvalidSchema: usersHavingInvalidSchemaCount,
...(users.length - i > 0 && { remainingUsersFileName }),
...(usersHavingInvalidSchemaCount > 0 && { usersHavingInvalidSchemaFileName }),
};

if (i < users.length) {
const message = usersHavingInvalidSchemaCount > 0 ?
`We processed ${i} users and ${usersHavingInvalidSchemaCount} users have invalid schema! Remaining users can be processed again by processing the ${remainingUsersFileName} file and users having invalid schema need to be fixed and processed again by processing the ${usersHavingInvalidSchemaFileName} file.`
: `We processed ${i} users and ${users.length - i} users are remaining to be processed! Remaining users can be processed again by processing the ${remainingUsersFileName} file.`;
console.log({ message, ...result });
} else {
await removeRemainingUsersFile({ remainingUsersFileName });
const message = usersHavingInvalidSchemaCount > 0 ?
`All users processed but ${usersHavingInvalidSchemaCount} users have invalid schema! Users having invalid schema need to be fixed and processed again by processing the ${usersHavingInvalidSchemaFileName} file.` : `All users processed successfully!`;
console.log({ message, ...result });
}
}

main()