Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Currently sync interval stops if there is a failure, instead log failure and wait for the interval #958

Merged
merged 4 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/lucky-dots-clap.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@web5/agent": patch
"@web5/identity-agent": patch
"@web5/proxy-agent": patch
"@web5/user-agent": patch
---

Prevent SyncEngine from stopping completely during a sync failure, next interval will try again.
19 changes: 16 additions & 3 deletions packages/agent/src/sync-engine-level.ts
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,12 @@ export class SyncEngineLevel implements SyncEngine {

clearInterval(this._syncIntervalId);
this._syncIntervalId = undefined;
await this.sync();

try {
await this.sync();
} catch (error) {
console.error('SyncEngineLevel: Error during sync operation', error);
}

if (!this._syncIntervalId) {
this._syncIntervalId = setInterval(intervalSync, intervalMilliseconds);
Expand Down Expand Up @@ -405,7 +410,7 @@ export class SyncEngineLevel implements SyncEngine {
syncDirection: SyncDirection,
syncPeerState: SyncState[]
}) {
for (let syncState of syncPeerState) {
const enqueueOps = await Promise.allSettled(syncPeerState.map(async (syncState) => {
// Get the event log from the remote DWN if pull sync, or local DWN if push sync.
const eventLog = await this.getDwnEventLog({
did : syncState.did,
Expand Down Expand Up @@ -435,7 +440,15 @@ export class SyncEngineLevel implements SyncEngine {
: this.getPushQueue();
await syncQueue.batch(syncOperations as any);
}
}
}));

// log any errors that occurred during the enqueuing process
enqueueOps.forEach((result, index) => {
if (result.status === 'rejected') {
const peerState = syncPeerState[index];
console.error(`SyncEngineLevel: Error enqueuing sync operation for peerState: ${JSON.stringify(peerState)}`, result.reason);
}
});
}

private static generateSyncMessageParamsKey({ did, delegateDid, dwnUrl, protocol, watermark, messageCid }:SyncMessageParams): string {
Expand Down
68 changes: 68 additions & 0 deletions packages/agent/tests/sync-engine-level.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,36 @@ describe('SyncEngineLevel', () => {

clock.restore();
});

it('sync logs failures when enqueueing sync operations', async () => {
// returns 3 DID peers to sync with
sinon.stub(syncEngine as any, 'getSyncPeerState').resolves([{
did: 'did:example:alice',
}, {
did: 'did:example:bob',
}, {
did: 'did:example:carol',
}]);

const getDwnEventLogSpy = sinon.stub(syncEngine as any, 'getDwnEventLog').resolves([]);
getDwnEventLogSpy.onCall(2).rejects(new Error('Failed to get event log'));

// spy on the console error
const consoleErrorSpy = sinon.stub(console, 'error').resolves();

await syncEngine.sync();

expect(consoleErrorSpy.callCount).to.equal(1);
expect(consoleErrorSpy.firstCall.args[0]).to.include('Error enqueuing sync operation for peerState');

// reset the error spy
consoleErrorSpy.resetHistory();

// sync again, this time no errors should be thrown
await syncEngine.sync();

expect(consoleErrorSpy.notCalled).to.be.true;
});
});

describe('pull()', () => {
Expand Down Expand Up @@ -2002,6 +2032,44 @@ describe('SyncEngineLevel', () => {
syncSpy.restore();
clock.restore();
});

it('should log sync errors, but continue syncing the next interval', async () => {
await testHarness.agent.sync.registerIdentity({
did: alice.did.uri,
});

const clock = sinon.useFakeTimers({ shouldClearNativeTimers: true });
const syncSpy = sinon.stub(SyncEngineLevel.prototype as any, 'sync');

syncSpy.returns(new Promise<void>((resolve, reject) => {
clock.setTimeout(() => {
resolve();
}, 100);
}));

// first call is the initial sync, 2nd and onward are the intervals
// on the 2nd interval (3rd call), we reject the promise, a 4th call should be made
syncSpy.onThirdCall().rejects(new Error('Sync error'));

// spy on console.error to check if the error message is logged
const consoleErrorSpy = sinon.stub(console, 'error').resolves();

testHarness.agent.sync.startSync({ interval: '500ms' });

// three intervals
await clock.tickAsync(1_500);

// this should equal 4, once for the initial call and once for each interval call
expect(syncSpy.callCount).to.equal(4);

// check if the error message is logged
expect(consoleErrorSpy.callCount).to.equal(1);
expect(consoleErrorSpy.args[0][0]).to.include('SyncEngineLevel: Error during sync operation');

syncSpy.restore();
consoleErrorSpy.restore();
clock.restore();
});
});

describe('stopSync()', () => {
Expand Down
Loading