Skip to content

Commit

Permalink
Merge branch 'main' into okstatus-feat
Browse files Browse the repository at this point in the history
  • Loading branch information
Toheeb-Ojuolape authored Oct 21, 2024
2 parents 5de24a2 + 1334fae commit d4ebc48
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 76 deletions.
8 changes: 8 additions & 0 deletions .changeset/lucky-dots-clap.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@web5/agent": patch
"@web5/identity-agent": patch
"@web5/proxy-agent": patch
"@web5/user-agent": patch
---

Prevent SyncEngine from stopping completely during a sync failure, next interval will try again.
5 changes: 5 additions & 0 deletions .changeset/old-buckets-kiss.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@web5/api": patch
---

Allow records to be updated without storing.
19 changes: 16 additions & 3 deletions packages/agent/src/sync-engine-level.ts
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,12 @@ export class SyncEngineLevel implements SyncEngine {

clearInterval(this._syncIntervalId);
this._syncIntervalId = undefined;
await this.sync();

try {
await this.sync();
} catch (error) {
console.error('SyncEngineLevel: Error during sync operation', error);
}

if (!this._syncIntervalId) {
this._syncIntervalId = setInterval(intervalSync, intervalMilliseconds);
Expand Down Expand Up @@ -405,7 +410,7 @@ export class SyncEngineLevel implements SyncEngine {
syncDirection: SyncDirection,
syncPeerState: SyncState[]
}) {
for (let syncState of syncPeerState) {
const enqueueOps = await Promise.allSettled(syncPeerState.map(async (syncState) => {
// Get the event log from the remote DWN if pull sync, or local DWN if push sync.
const eventLog = await this.getDwnEventLog({
did : syncState.did,
Expand Down Expand Up @@ -435,7 +440,15 @@ export class SyncEngineLevel implements SyncEngine {
: this.getPushQueue();
await syncQueue.batch(syncOperations as any);
}
}
}));

// log any errors that occurred during the enqueuing process
enqueueOps.forEach((result, index) => {
if (result.status === 'rejected') {
const peerState = syncPeerState[index];
console.error(`SyncEngineLevel: Error enqueuing sync operation for peerState: ${JSON.stringify(peerState)}`, result.reason);
}
});
}

private static generateSyncMessageParamsKey({ did, delegateDid, dwnUrl, protocol, watermark, messageCid }:SyncMessageParams): string {
Expand Down
68 changes: 68 additions & 0 deletions packages/agent/tests/sync-engine-level.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,36 @@ describe('SyncEngineLevel', () => {

clock.restore();
});

it('sync logs failures when enqueueing sync operations', async () => {
// returns 3 DID peers to sync with
sinon.stub(syncEngine as any, 'getSyncPeerState').resolves([{
did: 'did:example:alice',
}, {
did: 'did:example:bob',
}, {
did: 'did:example:carol',
}]);

const getDwnEventLogSpy = sinon.stub(syncEngine as any, 'getDwnEventLog').resolves([]);
getDwnEventLogSpy.onCall(2).rejects(new Error('Failed to get event log'));

// spy on the console error
const consoleErrorSpy = sinon.stub(console, 'error').resolves();

await syncEngine.sync();

expect(consoleErrorSpy.callCount).to.equal(1);
expect(consoleErrorSpy.firstCall.args[0]).to.include('Error enqueuing sync operation for peerState');

// reset the error spy
consoleErrorSpy.resetHistory();

// sync again, this time no errors should be thrown
await syncEngine.sync();

expect(consoleErrorSpy.notCalled).to.be.true;
});
});

describe('pull()', () => {
Expand Down Expand Up @@ -2002,6 +2032,44 @@ describe('SyncEngineLevel', () => {
syncSpy.restore();
clock.restore();
});

it('should log sync errors, but continue syncing the next interval', async () => {
await testHarness.agent.sync.registerIdentity({
did: alice.did.uri,
});

const clock = sinon.useFakeTimers({ shouldClearNativeTimers: true });
const syncSpy = sinon.stub(SyncEngineLevel.prototype as any, 'sync');

syncSpy.returns(new Promise<void>((resolve, reject) => {
clock.setTimeout(() => {
resolve();
}, 100);
}));

// first call is the initial sync, 2nd and onward are the intervals
// on the 2nd interval (3rd call), we reject the promise, a 4th call should be made
syncSpy.onThirdCall().rejects(new Error('Sync error'));

// spy on console.error to check if the error message is logged
const consoleErrorSpy = sinon.stub(console, 'error').resolves();

testHarness.agent.sync.startSync({ interval: '500ms' });

// three intervals
await clock.tickAsync(1_500);

// this should equal 4, once for the initial call and once for each interval call
expect(syncSpy.callCount).to.equal(4);

// check if the error message is logged
expect(consoleErrorSpy.callCount).to.equal(1);
expect(consoleErrorSpy.args[0][0]).to.include('SyncEngineLevel: Error during sync operation');

syncSpy.restore();
consoleErrorSpy.restore();
clock.restore();
});
});

describe('stopSync()', () => {
Expand Down
6 changes: 5 additions & 1 deletion packages/api/src/record.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ export type RecordUpdateParams = {
*/
dataCid?: DwnMessageDescriptor[DwnInterface.RecordsWrite]['dataCid'];

/** Whether or not to store the updated message. */
store?: boolean;

/** The data format/MIME type of the supplied data */
dataFormat?: string;

Expand Down Expand Up @@ -706,7 +709,7 @@ export class Record implements RecordModel {
*
* @beta
*/
async update({ dateModified, data, protocolRole, ...params }: RecordUpdateParams): Promise<DwnResponseStatus> {
async update({ dateModified, data, protocolRole, store = true, ...params }: RecordUpdateParams): Promise<DwnResponseStatus> {

if (this.deleted) {
throw new Error('Record: Cannot revive a deleted record.');
Expand Down Expand Up @@ -760,6 +763,7 @@ export class Record implements RecordModel {
messageParams : { ...updateMessage },
messageType : DwnInterface.RecordsWrite,
target : this._connectedDid,
store
};

if (this._delegateDid) {
Expand Down
150 changes: 78 additions & 72 deletions packages/api/tests/record.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2611,58 +2611,6 @@ describe('Record', () => {
expect(readResultAfterUpdate.status.code).to.equal(401);
});

it('updates a record locally that only written to a remote DWN', async () => {
// Create a record but do not store it on the local DWN.
const { status, record } = await dwnAlice.records.write({
store : false,
data : 'Hello, world!',
message : {
schema : 'foo/bar',
dataFormat : 'text/plain'
}
});
expect(status.code).to.equal(202);
expect(record).to.not.be.undefined;

// Store the data CID of the record before it is updated.
const dataCidBeforeDataUpdate = record!.dataCid;

// Write the record to a remote DWN.
const { status: sendStatus } = await record!.send(aliceDid.uri);
expect(sendStatus.code).to.equal(202);

// fails because record has not been stored in the local dwn yet
let updateResult = await record!.update({ data: 'bye' });
expect(updateResult.status.code).to.equal(400);
expect(updateResult.status.detail).to.equal('RecordsWriteGetInitialWriteNotFound: Initial write is not found.');

const { status: recordStoreStatus }= await record.store();
expect(recordStoreStatus.code).to.equal(202);

// now succeeds with the update
updateResult = await record!.update({ data: 'bye' });
expect(updateResult.status.code).to.equal(202);

// Confirm that the record was written to the local DWN.
const readResult = await dwnAlice.records.read({
message: {
filter: {
recordId: record!.id
}
}
});
expect(readResult.status.code).to.equal(200);
expect(readResult.record).to.not.be.undefined;

// Confirm that the data CID of the record was updated.
expect(readResult.record.dataCid).to.not.equal(dataCidBeforeDataUpdate);
expect(readResult.record.dataCid).to.equal(record!.dataCid);

// Confirm that the data payload of the record was modified.
const updatedData = await record!.data.text();
expect(updatedData).to.equal('bye');
});

it('allows to update a record locally that was initially read from a remote DWN if store() is issued', async () => {
// Create a record but do not store it on the local DWN.
const { status, record } = await dwnAlice.records.write({
Expand Down Expand Up @@ -2725,7 +2673,7 @@ describe('Record', () => {
expect(readResult.record.dataCid).to.equal(readRecord.dataCid);
});

it('updates a record locally that was initially queried from a remote DWN', async () => {
it('updates a record that was queried from a remote DWN without storing it', async () => {
// Create a record but do not store it on the local DWN.
const { status, record } = await dwnAlice.records.write({
store : false,
Expand All @@ -2746,7 +2694,7 @@ describe('Record', () => {
expect(sendStatus.code).to.equal(202);

// Query the record from the remote DWN.
const queryResult = await dwnAlice.records.query({
let queryResult = await dwnAlice.records.query({
from : aliceDid.uri,
message : {
filter: {
Expand All @@ -2758,37 +2706,95 @@ describe('Record', () => {
expect(queryResult.records).to.not.be.undefined;
expect(queryResult.records.length).to.equal(1);

// Attempt to update the queried record, which will fail because we haven't stored the queried record locally yet
// Attempt to update the queried record
const [ queriedRecord ] = queryResult.records;
let updateResult = await queriedRecord!.update({ data: 'bye' });
expect(updateResult.status.code).to.equal(400);
expect(updateResult.status.detail).to.equal('RecordsWriteGetInitialWriteNotFound: Initial write is not found.');

// store the queried record
const { status: queriedStoreStatus } = await queriedRecord.store();
expect(queriedStoreStatus.code).to.equal(202);

updateResult = await queriedRecord!.update({ data: 'bye' });
let updateResult = await queriedRecord!.update({ data: 'Updated, world!', store: false });
expect(updateResult.status.code).to.equal(202);

// Confirm that the record was written to the local DWN.
const readResult = await dwnAlice.records.read({
// confirm that the record does not exist locally
queryResult = await dwnAlice.records.read({
message: {
filter: {
recordId: record!.id
}
}
});
expect(queryResult.status.code).to.equal(404);
});

it('updates a record which has a parent reference from a remote DWN without storing it or its parent', async () => {
// create a parent thread
const { status: threadStatus, record: threadRecord } = await dwnAlice.records.write({
store : false,
data : 'Hello, world!',
message : {
protocol : protocolDefinition.protocol,
schema : protocolDefinition.types.thread.schema,
protocolPath : 'thread'
}
});

expect(threadStatus.code).to.equal(202);
expect(threadRecord).to.not.be.undefined;

const { status: threadSendStatus } = await threadRecord.send();
expect(threadSendStatus.code).to.equal(202);

// create an email with the thread as a parent
const { status: emailStatus, record: emailRecord } = await dwnAlice.records.write({
store : false,
data : 'Hello, world!',
message : {
parentContextId : threadRecord.contextId,
protocol : protocolDefinition.protocol,
protocolPath : 'thread/email',
schema : protocolDefinition.types.email.schema
}
});
expect(emailStatus.code).to.equal(202);
expect(emailRecord).to.not.be.undefined;

const { status: emailSendStatus } = await emailRecord!.send();
expect(emailSendStatus.code).to.equal(202);

// update email record
const { status: updateStatus } = await emailRecord!.update({ data: 'updated email record', store: false });
expect(updateStatus.code).to.equal(202);

const { status: updateEmailSendStatus } = await emailRecord!.send();
expect(updateEmailSendStatus.code).to.equal(202);

let readResult = await dwnAlice.records.read({
from : aliceDid.uri,
message : {
filter: {
recordId: emailRecord.id
}
}
});

expect(readResult.status.code).to.equal(200);
expect(readResult.record).to.not.be.undefined;
expect(await readResult.record.data.text()).to.equal('updated email record');

// Confirm that the data CID of the record was updated.
expect(readResult.record.dataCid).to.not.equal(dataCidBeforeDataUpdate);
expect(readResult.record.dataCid).to.equal(queriedRecord!.dataCid);
// confirm that records do not exist locally
readResult = await dwnAlice.records.read({
message: {
filter: {
recordId: emailRecord.id
}
}
});
expect(readResult.status.code).to.equal(404);

// Confirm that the data payload of the record was modified.
const updatedData = await queriedRecord!.data.text();
expect(updatedData).to.equal('bye');
readResult = await dwnAlice.records.read({
message: {
filter: {
recordId: threadRecord.id
}
}
});
expect(readResult.status.code).to.equal(404);
});

it('updates a record which has a parent reference', async () => {
Expand Down

0 comments on commit d4ebc48

Please sign in to comment.