Skip to content

Commit

Permalink
Merge pull request #38 from edgeandnode/pcv/ops-scripts-batching
Browse files Browse the repository at this point in the history
fix: improve batching in pull all and add to many tasks
  • Loading branch information
pcarranzav authored Dec 21, 2022
2 parents 97ca65e + 46d6c38 commit dfe0f0d
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 49 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ bin/
coverage.json

tasks/ops/depositors.json
tasks/ops/depositors.json.bak
126 changes: 96 additions & 30 deletions tasks/ops/addToMany.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
import fs from 'fs'
import { BigNumber, utils } from 'ethers'
import { task } from 'hardhat/config'
import { task, types } from 'hardhat/config'
import { HardhatRuntimeEnvironment } from 'hardhat/types'
import { logger } from '../../utils/logging'
import { askForConfirmation, DEFAULT_DEPOSITORS_FILE } from './utils'
import { askForConfirmation, DEFAULT_DEPOSITORS_FILE, DEFAULT_BATCH_SIZE } from './utils'

task('ops:add-to-many', 'Execute a transaction depositing funds to a set of users from a JSON file')
.addFlag('dryRun', 'Do not execute transaction')
.addOptionalParam(
'batchSize',
'Batch size (i.e. number of users to process at a time)',
DEFAULT_BATCH_SIZE,
types.int,
)
.addOptionalParam('startBatch', 'Batch number to start from', 0, types.int)
.addOptionalParam('depositorsFile', 'Path to depositors file', DEFAULT_DEPOSITORS_FILE)
.setAction(async (taskArgs, hre: HardhatRuntimeEnvironment) => {
const { contracts } = hre
Expand All @@ -27,41 +34,100 @@ task('ops:add-to-many', 'Execute a transaction depositing funds to a set of user
logger.log(depositor.address, utils.formatEther(depositor.balance.hex))
}

if (taskArgs.dryRun) {
logger.log('Dry run, so not executing tx')
logger.log('Otherwise we would have executed:')
logger.log(`Billing.addToMany([${users}], [${balances}])`)
logger.log(`On Billing contract at ${contracts.Billing?.address} on chain ${chainId}`)
logger.log(`With signer ${account.address}`)
process.exit()
const nBatches = Math.floor(depositors.length / taskArgs.batchSize) + 1

if (taskArgs.startBatch == 0) {
if (taskArgs.dryRun) {
logger.log('Dry run, so not executing tx')
logger.log('Otherwise we would have executed:')
logger.log(`Token.approve(${contracts.Billing?.address}, ${totalBalance})`)
logger.log(`On Token contract at ${contracts.Token?.address} on chain ${chainId}`)
logger.log(`With signer ${account.address}`)

logger.log('TX calldata:')
logger.log(`--------------------`)
const grt = contracts.Token
if (!grt) {
throw new Error('GRT contract not found')
}
const tx = await grt.populateTransaction.approve(contracts.Billing?.address, totalBalance)
logger.log(tx.data)
logger.log(`--------------------`)
} else if (
await askForConfirmation(
`Execute <approve> transaction? **This will execute on network with chain ID ${chainId}**`,
)
) {
try {
logger.log('Transactions being sent')
logger.log(`--------------------`)
const grt = contracts.Token
if (!grt) {
throw new Error('GRT contract not found')
}
const tx = await grt.connect(account).approve(contracts.Billing?.address, totalBalance)
const receipt = await tx.wait()
logger.log('approve() TX Receipt: ', receipt)
} catch (e) {
logger.log(e)
process.exit(1)
}
} else {
logger.log('Skipping approve tx at user request')
}
} else {
logger.log(`Skipping approve tx since we are not on the first batch`)
}
if (
await askForConfirmation(
`Execute <addToMany> transaction? **This will execute on network with chain ID ${chainId}**`,
)
) {
try {
logger.log('Transactions being sent')
for (let batch = taskArgs.startBatch; batch < nBatches; batch++) {
const start = batch * taskArgs.batchSize
const end = Math.min(start + taskArgs.batchSize, depositors.length)
const batchUsers = users.slice(start, end)
const batchBalances = balances.slice(start, end)
logger.log(`Batch ${batch} (${batch + 1}/${nBatches}):`)
logger.log(`Users: ${batchUsers.length}`)
logger.log(`Total balance: ${utils.formatEther(batchBalances.reduce((a, b) => a.add(b), BigNumber.from(0)))}`)
logger.log(`--------------------------------`)
if (taskArgs.dryRun) {
logger.log('Dry run, so not executing tx')
logger.log('Otherwise we would have executed:')
logger.log(`--------------------`)
logger.log(`Billing.addToMany([${batchUsers}], [${batchBalances}])`)
logger.log(`--------------------`)
logger.log(`On Billing contract at ${contracts.Billing?.address} on chain ${chainId}`)
logger.log(`With signer ${account.address}`)

logger.log('TX calldata:')
logger.log(`--------------------`)
const billing = contracts.Billing
if (!billing) {
throw new Error('Billing contract not found')
}
const grt = contracts.Token
if (!grt) {
throw new Error('GRT contract not found')
const tx = await billing.populateTransaction.addToMany(batchUsers, batchBalances)
logger.log(tx.data)
logger.log(`--------------------`)
} else if (
await askForConfirmation(
`Execute <addToMany> transaction? **This will execute on network with chain ID ${chainId}**`,
)
) {
try {
logger.log('Transactions being sent')
logger.log(`--------------------`)
const billing = contracts.Billing
if (!billing) {
throw new Error('Billing contract not found')
}
const tx = await billing.connect(account).addToMany(batchUsers, batchBalances)
const receipt = await tx.wait()
logger.log('addToMany TX Receipt: ', receipt)
} catch (e) {
logger.log(e)
process.exit(1)
}
const tx1 = await grt.connect(account).approve(billing.address, totalBalance)
const receipt1 = await tx1.wait()
logger.log('approve() TX Receipt: ', receipt1)
const tx2 = await billing.connect(account).addToMany(users, balances)
const receipt2 = await tx2.wait()
logger.log('addToMany TX Receipt: ', receipt2)
} catch (e) {
logger.log(e)
process.exit(1)
} else {
logger.log('Bye!')
}
} else {
logger.log('Bye!')

logger.log(`--------------------------------`)
}
})
95 changes: 76 additions & 19 deletions tasks/ops/pullAll.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ import fs from 'fs'
import axios from 'axios'

import { BigNumber, utils } from 'ethers'
import { task } from 'hardhat/config'
import { task, types } from 'hardhat/config'
import { HardhatRuntimeEnvironment } from 'hardhat/types'
import { logger } from '../../utils/logging'
import { askForConfirmation, DEFAULT_BILLING_SUBGRAPH, DEFAULT_DEPOSITORS_FILE } from './utils'
import { askForConfirmation, DEFAULT_BILLING_SUBGRAPH, DEFAULT_DEPOSITORS_FILE, DEFAULT_BATCH_SIZE } from './utils'
import path from 'path'

// This script will pull the funds from all the billing accounts and store
Expand All @@ -16,11 +16,24 @@ interface Depositor {
balance: BigNumber
}

export async function getAllDepositors(billingSubgraphUrl: string, page: number): Promise<Depositor[]> {
const query = `{
users(
first: 1000,
skip: ${page * 1000},
export async function getAllDepositors(
billingSubgraphUrl: string,
page: number,
pageSize: number,
blockNumber: number,
): Promise<Depositor[]> {
let queryPartOne: string
if (blockNumber > 0) {
queryPartOne = `{
users(
block: { number: ${blockNumber} },`
} else {
queryPartOne = `{
users(`
}
const queryPartTwo = `
first: ${pageSize},
skip: ${page * pageSize},
where: {billingBalance_gt: "0"},
orderBy: billingBalance,
orderDirection: desc
Expand All @@ -30,6 +43,7 @@ export async function getAllDepositors(billingSubgraphUrl: string, page: number)
}
}
`
const query = queryPartOne + queryPartTwo
const response = await axios.post(billingSubgraphUrl, { query })
logger.log(`Found: ${response.data.data.users.length} users`)
return response.data.data.users.map((user) => {
Expand All @@ -41,16 +55,29 @@ export async function getAllDepositors(billingSubgraphUrl: string, page: number)
}

function writeDepositorsToFile(depositors: Depositor[], filePath: string) {
fs.writeFileSync(filePath, JSON.stringify(depositors, null, 2), { flag: 'a+' })
let allDepositors: Depositor[] = []
if (fs.existsSync(filePath)) {
allDepositors = JSON.parse(fs.readFileSync(filePath).toString())
}
allDepositors = allDepositors.concat(depositors)
fs.writeFileSync(filePath, JSON.stringify(allDepositors, null, 2), { flag: 'w+' })
const writtenDepositors = JSON.parse(fs.readFileSync(filePath).toString())
if (writtenDepositors.length != depositors.length) {
if (writtenDepositors.length != allDepositors.length) {
throw new Error('Written depositors does not equal fetched depositors')
}
}

task('ops:pull-all', 'Execute transaction for pulling all funds from users')
.addFlag('dryRun', 'Do not execute transaction')
.addOptionalParam('dstAddress', 'Destination address of withdrawal')
.addOptionalParam(
'batchSize',
'Batch size (i.e. number of users to process at a time)',
DEFAULT_BATCH_SIZE,
types.int,
)
.addOptionalParam('startBatch', 'Batch number to start from', 0, types.int)
.addOptionalParam('blockNumber', 'Block number to use when fetching balances from the subgraph', 0, types.int)
.addOptionalParam('depositorsFile', 'Path to EOA depositors file', DEFAULT_DEPOSITORS_FILE)
.addOptionalParam('billingSubgraphUrl', 'Billing subgraph URL', DEFAULT_BILLING_SUBGRAPH)
.setAction(async (taskArgs, hre: HardhatRuntimeEnvironment) => {
Expand All @@ -59,19 +86,24 @@ task('ops:pull-all', 'Execute transaction for pulling all funds from users')
const { contracts } = hre
const chainId = hre.network.config.chainId
const dstAddress = taskArgs.dstAddress || collector.address
let page = 0
let depositors: Depositor[] = []
if (fs.existsSync(taskArgs.depositorsFile)) {
fs.renameSync(taskArgs.depositorsFile, path.join(taskArgs.depositorsFile, '.bak'))
let page = taskArgs.startBatch
const depositorBatches: Depositor[][] = []
if (!taskArgs.dryRun && taskArgs.startBatch > 0 && taskArgs.blockNumber == 0) {
logger.log('Please specify a block number when starting from a batch other than 0')
process.exit(1)
}
if (fs.existsSync(taskArgs.depositorsFile) && page == 0) {
fs.renameSync(taskArgs.depositorsFile, taskArgs.depositorsFile + '.bak')
}
let depositors: Depositor[] = []
do {
logger.log(`Getting depositors (page ${page})...`)
depositors = await getAllDepositors(taskArgs.billingSubgraphUrl, page)
logger.log(`Getting depositors (batch ${page}, size ${taskArgs.batchSize})...`)
depositors = await getAllDepositors(taskArgs.billingSubgraphUrl, page, taskArgs.batchSize, taskArgs.blockNumber)
if (depositors.length == 0) {
logger.log('No depositors found, done.')
process.exit()
logger.log('No depositors found, done with fetching.')
break
}
const users: string[] = depositors.map((depositor) => depositor.address)

const balances: BigNumber[] = depositors.map((depositor) => depositor.balance)

try {
Expand All @@ -88,13 +120,37 @@ task('ops:pull-all', 'Execute transaction for pulling all funds from users')
for (const depositor of depositors) {
logger.log(depositor.address, utils.formatEther(depositor.balance))
}
depositorBatches.push(depositors)
page += 1
} while (depositors.length > 0)

page = 0
for (const depositors of depositorBatches) {
if (depositors.length == 0) {
logger.log('No depositors found, done.')
break
}
logger.log(`Pulling tokens for depositors (batch ${page}, size ${depositors.length})...`)
const users: string[] = depositors.map((depositor) => depositor.address)
const balances: BigNumber[] = depositors.map((depositor) => depositor.balance)
if (taskArgs.dryRun) {
logger.log('Dry run, so not executing tx')
logger.log('Otherwise we would have executed:')
logger.log(`--------------------`)
logger.log(`Billing.pullMany([${users}], [${balances}], ${dstAddress})`)
logger.log(`--------------------`)
logger.log(`On Billing contract at ${contracts.Billing?.address} on chain ${chainId}`)
logger.log(`With signer ${collector.address}`)

logger.log('TX calldata:')
logger.log(`--------------------`)
const billing = contracts.Billing
if (!billing) {
throw new Error('Billing contract not found')
}
const tx = await billing.populateTransaction.pullMany(users, balances, dstAddress)
logger.log(tx.data)
logger.log(`--------------------`)
} else if (
await askForConfirmation(
`Execute <pullMany> transaction? **This will execute on network with chain ID ${chainId}**`,
Expand All @@ -114,9 +170,10 @@ task('ops:pull-all', 'Execute transaction for pulling all funds from users')
logger.log(e)
process.exit(1)
}
logger.log(`--------------------`)
} else {
logger.log('Bye!')
}
page += 1
} while (depositors.length > 0)
}
})
1 change: 1 addition & 0 deletions tasks/ops/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import inquirer from 'inquirer'

export const DEFAULT_DEPOSITORS_FILE = './tasks/ops/depositors.json'
export const DEFAULT_BILLING_SUBGRAPH = 'https://api.thegraph.com/subgraphs/name/graphprotocol/billing'
export const DEFAULT_BATCH_SIZE = 200

export async function askForConfirmation(message: string): Promise<boolean> {
const res = await inquirer.prompt({
Expand Down

0 comments on commit dfe0f0d

Please sign in to comment.