
feat: Add script to add users for bulk import #977

Open · wants to merge 3 commits into base: feat/bulk-import-base
Changes from 1 commit
3 changes: 3 additions & 0 deletions bulkimport_scripts/add_users_for_bulk_import/.gitignore
@@ -0,0 +1,3 @@
node_modules
usersHavingInvalidSchema.json
remainingUsers.json
29 changes: 29 additions & 0 deletions bulkimport_scripts/add_users_for_bulk_import/README.md
@@ -0,0 +1,29 @@
# Add Users For Bulk Import Script

The `/bulk-import/users` POST API endpoint in SuperTokens Core allows adding users to the database for bulk import. However, the API only accepts 10,000 users per request. This script takes a JSON file containing a large number of users and calls the API in batches of 10,000.

## How to Run

1. Ensure you have Node.js (v16 or higher) installed on your system.
2. Open a terminal window and navigate to the directory where the script is located.
3. Run the following command:

> **Reviewer comment (Contributor):** the core may have an API key as well, which needs to be supplied to this program.

```sh
node index.js <CoreAPIURL> <InputFileName>
```

Replace `<CoreAPIURL>` with the URL of the core API endpoint and `<InputFileName>` with the path to the input JSON file containing user data.

## Format of Input File

The input file should be a JSON file with the same format as expected by the `/bulk-import/users` POST API endpoint.
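As a minimal sketch, only the top-level `users` array is required by this script; the per-user fields shown below are placeholders, not the real schema — consult the `/bulk-import/users` endpoint documentation for the actual user object format:

```json
{
    "users": [
        { "externalUserId": "user-001", "loginMethods": [] }
    ]
}
```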

## Expected Outputs

> **Reviewer comment (Contributor):** what about an output whilst the script is running? Sort of like the overall progress.

- Upon successful execution, the script will output a summary message indicating the number of users processed, any remaining users, and any users with invalid schema.
- If there are remaining users to be processed, a file named `remainingUsers.json` will be generated, containing the details of remaining users.
- If there are users with invalid schema, a file named `usersHavingInvalidSchema.json` will be generated, containing the details of users with invalid schema.
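For illustration, a run that stops early might print an object like the following (the field names come from the script's `result` object; the values and the truncated message are made up):

```
{
  message: 'We processed 20000 users and ...',
  totalUsers: 50000,
  processedUsers: 20000,
  remainingUsers: 30000,
  usersHavingInvalidSchema: 0,
  remainingUsersFileName: './remainingUsers.json'
}
```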

## Note

The script overwrites the `remainingUsers.json` and `usersHavingInvalidSchema.json` files on each run. Back up these files first if needed.
167 changes: 167 additions & 0 deletions bulkimport_scripts/add_users_for_bulk_import/index.js
@@ -0,0 +1,167 @@
/*
* Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved.
*
* This software is licensed under the Apache License, Version 2.0 (the
* "License") as published by the Apache Software Foundation.
*
* You may not use this file except in compliance with the License. You may
* obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

const fs = require('fs/promises');
const process = require('process');

const BATCH_SIZE = 10000;
const USERS_HAVING_INVALID_SCHEMA_FILE = './usersHavingInvalidSchema.json';
const REMAINING_USERS_FILE = './remainingUsers.json';

function parseInputArgs() {
if (process.argv.length !== 4) {
console.error('Usage: node index.js <CoreAPIURL> <InputFileName>');
process.exit(1);
}

return { coreAPIUrl: process.argv[2], inputFileName: process.argv[3] };
}


async function getUsersFromInputFile({ inputFileName }) {
try {
const inputFileDataString = await fs.readFile(inputFileName, 'utf8');
const inputFileData = JSON.parse(inputFileDataString);

if (!inputFileData.users || !Array.isArray(inputFileData.users) || inputFileData.users.length === 0) {
throw new Error('Expected users array in the input file.');
}

return inputFileData.users;
} catch (error) {
console.error('Error reading or parsing input file:', error.message);
process.exit(1);
}
}

async function deleteFailedToAddUsersFileIfExists() {
try {
await fs.rm(USERS_HAVING_INVALID_SCHEMA_FILE);
} catch (error) {
if (error.code !== 'ENOENT') {
console.error(`Failed to delete ${USERS_HAVING_INVALID_SCHEMA_FILE}:`, error.message);
}
}
}

async function addInvalidSchemaUsersToFile({ errors, users }) {
let parsedData = null;
try {
const existingData = await fs.readFile(USERS_HAVING_INVALID_SCHEMA_FILE, 'utf8');
parsedData = JSON.parse(existingData);
} catch (error) {
if (error.code === 'ENOENT') {
parsedData = { users: [] };
} else {
console.error(`Failed to read output file. Error: ${error.message}`);
throw error;
}
}

// Each error entry carries the index of the offending user within the submitted batch.
parsedData.users.push(...errors.map((err) => ({ user: users[err.index], errors: err.errors })));

await fs.writeFile(USERS_HAVING_INVALID_SCHEMA_FILE, JSON.stringify(parsedData, null, 2));

return users.filter((_, index) => !errors.some(err => err.index === index));
}

async function updateRemainingUsersFile({ users, index }) {
// `index` is the number of users already consumed from the input;
// everything from that position onwards still needs processing.
const remainingUsers = users.slice(index);
await fs.writeFile(REMAINING_USERS_FILE, JSON.stringify({ users: remainingUsers }, null, 2));
}

async function removeRemainingUsersFile() {
try {
await fs.rm(REMAINING_USERS_FILE);
} catch (error) {
if (error.code !== 'ENOENT') {
console.error(`Failed to delete ${REMAINING_USERS_FILE}:`, error.message);
}
}
}

async function main() {
const { coreAPIUrl, inputFileName } = parseInputArgs();

const users = await getUsersFromInputFile({ inputFileName });
> **Reviewer comment (Contributor):** shouldn't this actually pick up from the remainingUsers file if that exists? Cause the input file would have users that have been successfully imported as well.
>
> **Reviewer comment (Contributor):** Otherwise the dev would have to manually copy the contents of the remainingUsers file to their input file on each run, which can be something that they miss.

await deleteFailedToAddUsersFileIfExists();
await updateRemainingUsersFile({ users, index: 0 });

let usersToProcessInBatch = [];
let usersHavingInvalidSchemaCount = 0;
let i = 0;

try {
while (i < users.length || usersToProcessInBatch.length > 0) {
// Top the batch up to BATCH_SIZE: usersToProcessInBatch may still hold
// users carried over from a 400 response, so only pull enough new users
// from the input to fill the gap (e.g. 9,900 carried over -> pull 100).
let remainingBatchSize = usersToProcessInBatch.length > BATCH_SIZE ? 0 : BATCH_SIZE - usersToProcessInBatch.length;
remainingBatchSize = Math.min(remainingBatchSize, users.length - i);

usersToProcessInBatch.push(...users.slice(i, i + remainingBatchSize));
> **Reviewer comment (Contributor) on lines +134 to +138:** please add comments explaining all this with an example. Hard for me to understand the logic here.


const res = await fetch(`${coreAPIUrl}/bulk-import/users`, {
> **Reviewer comment (Contributor):** Need to have some wait time each iteration. Otherwise it may breach the rate limit of the core! 100 ms wait time.

method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ users: usersToProcessInBatch }),
});
> **Reviewer comment (Contributor) on lines +140 to +146:** need API key as well

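The reviewer's point about the API key could be addressed along these lines — a hedged sketch, assuming the key arrives via a new CLI argument and that the core expects it in an `api-key` header (both the `buildHeaders` helper and the header name are assumptions, not part of this PR):

```javascript
// Sketch only: `apiKey` would come from a new CLI argument; the `api-key`
// header name is an assumption about what the core expects.
function buildHeaders(apiKey) {
    const headers = { 'Content-Type': 'application/json' };
    if (apiKey) {
        headers['api-key'] = apiKey;
    }
    return headers;
}
```

The `fetch` call above could then pass `headers: buildHeaders(apiKey)` instead of the inline object.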

if (!res.ok && res.status !== 400) {
const text = await res.text();
console.error(`Failed to add users. API response - status: ${res.status} body: ${text}`);
break;
> **Reviewer comment (Contributor):** not a good idea to break. Imagine I am running this script for many 100k users, and went away for a coffee. Then after 5 seconds this fails temporarily. Now it will make no progress even if the core is up. Instead, do exponential backoff (up to a few seconds max), and try again.

}
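The backoff the reviewer asks for could be sketched like this (the `withRetry` helper and its option names are illustrative, not part of the PR):

```javascript
// Retry a transient failure with capped exponential backoff instead of
// giving up on the first error. `withRetry` is an illustrative helper.
async function withRetry(fn, { maxRetries = 5, baseDelayMs = 100, maxDelayMs = 5000 } = {}) {
    for (let attempt = 0; ; attempt++) {
        try {
            return await fn();
        } catch (error) {
            if (attempt >= maxRetries) throw error;
            // Double the delay on each failure, capped at maxDelayMs.
            const delayMs = Math.min(baseDelayMs * 2 ** attempt, maxDelayMs);
            await new Promise((resolve) => setTimeout(resolve, delayMs));
        }
    }
}
```

The batch POST in the loop could then become `const res = await withRetry(() => fetch(...))`, leaving the 400 handling unchanged.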

if (res.status === 400) {
const errors = await res.json();
usersHavingInvalidSchemaCount += errors.users.length;
usersToProcessInBatch = await addInvalidSchemaUsersToFile({ errors: errors.users, users: usersToProcessInBatch });
} else {
await updateRemainingUsersFile({ users, index: i });
usersToProcessInBatch = [];
}

i += remainingBatchSize;
}
} catch (error) {
console.error('Got an unexpected error:', error);
}

const result = {
totalUsers: users.length,
processedUsers: i,
remainingUsers: users.length - i,
usersHavingInvalidSchema: usersHavingInvalidSchemaCount,
...(users.length - i > 0 && { remainingUsersFileName: REMAINING_USERS_FILE }),
...(usersHavingInvalidSchemaCount > 0 && { usersHavingInvalidSchemaFileName: USERS_HAVING_INVALID_SCHEMA_FILE }),
};

if (i < users.length) {
const message = usersHavingInvalidSchemaCount > 0 ?
`We processed ${i} users and ${usersHavingInvalidSchemaCount} users have an invalid schema! The remaining users can be processed again from the ${REMAINING_USERS_FILE} file, and the users with an invalid schema need to be fixed and processed again from the ${USERS_HAVING_INVALID_SCHEMA_FILE} file.`
: `We processed ${i} users and ${users.length - i} users remain to be processed! The remaining users can be processed again from the ${REMAINING_USERS_FILE} file.`;
console.log({ message, ...result });
} else {
await removeRemainingUsersFile();
const message = usersHavingInvalidSchemaCount > 0 ?
`All users processed, but ${usersHavingInvalidSchemaCount} users have an invalid schema! They need to be fixed and processed again from the ${USERS_HAVING_INVALID_SCHEMA_FILE} file.` : `All users processed successfully!`;
console.log({ message, ...result });
}
}


main()
16 changes: 16 additions & 0 deletions bulkimport_scripts/add_users_for_bulk_import/package.json
@@ -0,0 +1,16 @@
{
"name": "add_users_for_bulk_import",
"version": "1.0.0",
"engines": {
"node": ">=16.0.0"
},
"description": "A script that takes a JSON file containing user data and calls the bulk import API to add users to the database for processing.",
"main": "index.js",
"scripts": {
"start": "node index.js"
},
"keywords": [],
"author": "",
"license": "Apache-2.0",
"dependencies": {}
}