Skip to content

Commit

Permalink
feat(datastore): fix reboot recovery of channels
Browse files Browse the repository at this point in the history
  • Loading branch information
blakebyrnes committed Oct 10, 2024
1 parent 8186657 commit 891fc0d
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 37 deletions.
11 changes: 8 additions & 3 deletions datastore/core/lib/ArgonPaymentProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,15 @@ export default class ArgonPaymentProcessor implements IArgonPaymentProcessor {
}

const recipient = note.noteType.recipient;
const localchainAddress = await this.localchain.address;
if ((await this.canSign(recipient)) !== true) {
const localchainAddress = await this.localchain.address;

if (recipient !== localchainAddress) {
log.warn(
'This channelHold is made out to a different address than your attached localchain',
{
recipient,
channelHold: data.channelHold,
yourAddress: localchainAddress
yourAddress: localchainAddress,
} as any,
);
throw new Error('ChannelHold recipient not localchain address');
Expand Down Expand Up @@ -140,6 +140,11 @@ export default class ArgonPaymentProcessor implements IArgonPaymentProcessor {
}
}

private async canSign(address: string): Promise<boolean> {
const myAddress = await this.localchain.address;
return myAddress === address;
}

private timeForTick(tick: number): Date {
const time = this.localchain.ticker.timeForTick(tick);
return new Date(Number(time));
Expand Down
26 changes: 9 additions & 17 deletions datastore/main/payments/ArgonReserver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { toUrl } from '@ulixee/commons/lib/utils';
import { IPayment } from '@ulixee/platform-specification';
import IPaymentServiceApiTypes from '@ulixee/platform-specification/datastore/PaymentServiceApis';
import IBalanceChange from '@ulixee/platform-specification/types/IBalanceChange';
import { IPaymentMethod } from '@ulixee/platform-specification/types/IPayment';
import ArgonUtils from '@ulixee/platform-utils/lib/ArgonUtils';
import { nanoid } from 'nanoid';
import * as Path from 'node:path';
Expand Down Expand Up @@ -36,9 +37,9 @@ export interface IChannelHoldSource {
milligons: bigint,
): Promise<IChannelHoldDetails>;
updateChannelHoldSettlement(
channelHold: IChannelHoldDetails,
channelHold: IPaymentMethod['channelHold'],
updatedSettlement: bigint,
): Promise<IBalanceChange>;
): Promise<void>;
}

export default class ArgonReserver
Expand All @@ -52,7 +53,6 @@ export default class ArgonReserver
[uuid: string]: { microgons: number; datastoreId: string; paymentId: string };
} = {};

private readonly openChannelHoldsById: { [channelHoldId: string]: IChannelHoldDetails } = {};
private readonly reserveQueueByDatastoreId: { [url: string]: Queue } = {};
private readonly channelHoldQueue = new Queue('CHANNELHOLD QUEUE', 1);
private needsSave = false;
Expand Down Expand Up @@ -109,10 +109,6 @@ export default class ArgonReserver
});
}

public getChannelHoldDetails(channelHoldId: string): IChannelHoldDetails {
return this.openChannelHoldsById[channelHoldId];
}

public async reserve(
paymentInfo: IPaymentServiceApiTypes['PaymentService.reserve']['args'],
): Promise<IPayment> {
Expand All @@ -133,7 +129,11 @@ export default class ArgonReserver
for (const paymentOption of this.paymentsByDatastoreId[datastoreId]) {
if (paymentOption.remaining >= microgons) {
if (paymentOption.paymentMethod.channelHold?.id) {
if (paymentOption.host !== datastoreHost) continue;
if (
paymentOption.host !== datastoreHost &&
!paymentOption.host.includes(`//${datastoreHost}`)
)
continue;
}
return await this.charge(paymentOption, microgons);
}
Expand Down Expand Up @@ -212,7 +212,6 @@ export default class ArgonReserver
datastoreId: id,
allocatedMilligons: holdAmount,
});
this.openChannelHoldsById[channelHoldId] = channelHold;
this.paymentsByDatastoreId[id] ??= [];
this.paymentsByDatastoreId[id].push(entry);
return entry;
Expand Down Expand Up @@ -268,14 +267,7 @@ export default class ArgonReserver
throw new Error('Cannot release more than the allocated amount');
}
if (updatedSettlement > channelHold.settledMilligons) {
const openChannelHold = this.openChannelHoldsById[channelHold.id];
if (!openChannelHold) throw new Error('ChannelHold not found');
const result = await this.channelHoldSource.updateChannelHoldSettlement(
openChannelHold,
updatedSettlement,
);
channelHold.settledMilligons = result.notes[0].milligons;
channelHold.settledSignature = result.signature;
await this.channelHoldSource.updateChannelHoldSettlement(channelHold, updatedSettlement);
this.needsSave = true;
this.emit('updateSettlement', {
channelHoldId: channelHold.id,
Expand Down
25 changes: 16 additions & 9 deletions datastore/main/payments/BrokerChannelHoldSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import HttpTransportToCore from '@ulixee/net/lib/HttpTransportToCore';
import { IDatabrokerApis } from '@ulixee/platform-specification/datastore';
import IPaymentServiceApiTypes from '@ulixee/platform-specification/datastore/PaymentServiceApis';
import IBalanceChange from '@ulixee/platform-specification/types/IBalanceChange';
import { IPaymentMethod } from '@ulixee/platform-specification/types/IPayment';
import Identity from '@ulixee/platform-utils/lib/Identity';
import serdeJson from '@ulixee/platform-utils/lib/serdeJson';
import { nanoid } from 'nanoid';
Expand All @@ -21,6 +22,7 @@ export default class BrokerChannelHoldSource implements IChannelHoldSource {
private readonly connectionToCore: ConnectionToCore<IDatabrokerApis, {}>;
private keyring = new Keyring({ type: 'sr25519', ss58Format: ADDRESS_PREFIX });
private readonly loadPromise: Promise<void | Error>;
private balanceChangeByChannelHoldId: { [channelHoldId: string]: IBalanceChange } = {};

constructor(
public host: string,
Expand Down Expand Up @@ -65,7 +67,7 @@ export default class BrokerChannelHoldSource implements IChannelHoldSource {
nonce,
);

return await this.connectionToCore.sendRequest({
const holdDetails = await this.connectionToCore.sendRequest({
command: 'Databroker.createChannelHold',
args: [
{
Expand All @@ -82,20 +84,25 @@ export default class BrokerChannelHoldSource implements IChannelHoldSource {
},
],
});
this.balanceChangeByChannelHoldId[holdDetails.channelHoldId] = holdDetails.balanceChange;
return holdDetails;
}

public async updateChannelHoldSettlement(
channelHold: IChannelHoldDetails,
channelHold: IPaymentMethod['channelHold'],
updatedSettlement: bigint,
): Promise<IBalanceChange> {
channelHold.balanceChange.notes[0].milligons = updatedSettlement;
channelHold.balanceChange.balance =
channelHold.balanceChange.channelHoldNote.milligons - updatedSettlement;
const json = serdeJson(channelHold.balanceChange);
): Promise<void> {
const balanceChange = this.balanceChangeByChannelHoldId[channelHold.id];

// TODO: add a way to retrieve the balance change from the broker
if (!balanceChange)
throw new Error(`No balance change found for channel hold ${channelHold.id}`);
balanceChange.notes[0].milligons = updatedSettlement;
balanceChange.balance = balanceChange.channelHoldNote.milligons - updatedSettlement;
const json = serdeJson(balanceChange);
const bytes = BalanceChangeBuilder.toSigningMessage(json);
const signature = this.keyring.getPairs()[0].sign(bytes, { withType: true });
channelHold.balanceChange.signature = Buffer.from(signature);
return channelHold.balanceChange;
balanceChange.signature = Buffer.from(signature);
}

public static createSignatureMessage(
Expand Down
14 changes: 6 additions & 8 deletions datastore/main/payments/LocalchainChannelHoldSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,18 +78,16 @@ export default class LocalchainChannelHoldSource implements IChannelHoldSource {
}

public async updateChannelHoldSettlement(
channelHold: IChannelHoldDetails,
channelHold: IPaymentMethod['channelHold'],
updatedSettlement: bigint,
): Promise<IBalanceChange> {
const channelHoldId = channelHold.channelHoldId;
): Promise<void> {
const channelHoldId = channelHold.id;
this.openChannelHoldsById[channelHoldId] ??=
await this.localchain.openChannelHolds.get(channelHoldId);
const openChannelHold = this.openChannelHoldsById[channelHoldId];
const result = await openChannelHold.sign(updatedSettlement);
const balanceChange = channelHold.balanceChange;
balanceChange.signature = Buffer.from(result.signature);
balanceChange.notes[0].milligons = result.milligons;
balanceChange.balance = balanceChange.channelHoldNote!.milligons - result.milligons;
return balanceChange;
const channelHoldDetails = await openChannelHold.channelHold;
channelHold.settledMilligons = channelHoldDetails.settledAmount;
channelHold.settledSignature = Buffer.from(result.signature);
}
}

0 comments on commit 891fc0d

Please sign in to comment.