Skip to content

Commit

Permalink
Issue eventuate-clients#49: EntityDeletedException (WIP)
Browse files Browse the repository at this point in the history
  • Loading branch information
dartvandru committed Jun 5, 2018
1 parent 253a02b commit 3fca0ac
Show file tree
Hide file tree
Showing 9 changed files with 163 additions and 39 deletions.
11 changes: 10 additions & 1 deletion src/modules/Encryption.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,16 @@ export default class Encryption {
}

findKey(id) {
return this.encryptionKeyStore.get(id);
return this.encryptionKeyStore.get(id)
.then(key => {
if (!key) {
const err = new Error(`Encryption key "${id}" not found`);
err.code = 'EntityDeletedException';
return Promise.reject(err);
}

return key;
});
}

isEncrypted(eventDataStr) {
Expand Down
76 changes: 56 additions & 20 deletions src/modules/EventuateClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ export default class EventuateClient {
// Encrypt event data if needed
this.encryptEvents(encryptionKeyId, events)
.then(events => {
console.log('events:', events);
const jsonData = { entityTypeName, events };
this.addBodyOptions(jsonData, options);

Expand Down Expand Up @@ -345,22 +344,18 @@ export default class EventuateClient {

const messageCallback = this.createMessageCallback(eventHandler);

this.connectToStompServer().then(
() => {
this.connectToStompServer()
.then(() => {
this.addSubscription(subscriberId, entityTypesAndEvents, messageCallback, options, callback);
this.doClientSubscribe(subscriberId);
},
callback
);

}, callback);
}

createMessageCallback(eventHandler) {

const ackOrderTracker = new AckOrderTracker();

const acknowledge = (ack) => {

ackOrderTracker.ack(ack).forEach(this.stompClient.ack.bind(this.stompClient));
};

Expand All @@ -369,18 +364,40 @@ export default class EventuateClient {
ackOrderTracker.add(headers.ack);

body.forEach(eventStr => {

this.makeEvent(eventStr, headers.ack)
.then(event => {
eventHandler(event)
.then(acknowledge)
.catch(err => {
logger.error('eventHandler error', err);
});
})
.catch(error => {
throw new Error(error);
});
this.parseEvent(eventStr)
.then(parsedEvent => {
const { eventData: eventDataStr } = parsedEvent;
return this.decrypt(eventDataStr)
.then(decryptedEventData => {
try {
const eventData = JSON.parse(decryptedEventData);
const event = Object.assign(parsedEvent, { eventData }, { ack: headers.ack });
eventHandler(null, event)
.then(acknowledge)
.catch(err => {
logger.error('Event handler error', err);
});
} catch(err) {
return Promise.reject(err);
}
})
.catch(err => {
if (err.code === 'EntityDeletedException') {
const event = Object.assign(parsedEvent, { ack: headers.ack });
eventHandler(err, event)
.then(acknowledge)
.catch(err => {
logger.error('Event handler error', err);
});
return;
}

return Promise.reject(err);
});
})
.catch(err => {
throw err;
})
});
}
}
Expand Down Expand Up @@ -611,6 +628,25 @@ export default class EventuateClient {
}
}

parseEvent(eventStr) {
try {
const parsedEvent = JSON.parse(eventStr);
const { id: eventId, eventType, entityId, entityType, swimlane, eventToken, eventData } = parsedEvent;

return Promise.resolve({
eventId,
eventType,
entityId,
swimlane,
eventData,
eventToken,
entityType: entityType.split('/').pop(),
});
} catch (err) {
return Promise.reject(err);
}
}

serialiseObject(obj) {

if (typeof obj === 'object') {
Expand Down
4 changes: 2 additions & 2 deletions src/modules/EventuateServerError.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ export default class EventuateServerError extends Error {

this.name = 'ES Server Error';

if (typeof (jsonBody) == 'object') {
if (typeof (jsonBody) === 'object') {

const { timestamp, status, statusCode, error, exception, message } = jsonBody;

Expand All @@ -17,7 +17,7 @@ export default class EventuateServerError extends Error {
this.error = error;
this.exception = exception;

if (typeof message == 'object') {
if (typeof message === 'object') {
jsonBody.message = JSON.stringify(jsonBody.message);
}

Expand Down
2 changes: 1 addition & 1 deletion src/modules/EventuateSubscriptionManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ export default class EventuateSubscriptionManager {

this.logger.debug(`Subscribe "${subscriberId}" for:`, entityTypesAndEvents);

const eventHandler = (event) => {
const eventHandler = (err, event) => {
this.logger.debug(`Event for subscriber "${subscriberId}":`, event);

const dispatcher = this.dispatchers.get(subscriberId);
Expand Down
4 changes: 3 additions & 1 deletion src/modules/stomp/AckOrderTracker.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import util from 'util';

export default class AckOrderTracker {

Expand All @@ -17,7 +18,8 @@ export default class AckOrderTracker {
});

if (!pendingHeader) {
console.error(`ACK Header not found: ${ackHeader}`);
console.error(`ACK Header not found:
${util.inspect(ackHeader, false, 20)}`);
return [];
}

Expand Down
24 changes: 24 additions & 0 deletions test/Encryption-spec.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'use strict';
const { expect } = require('chai');
const helpers = require('./lib/helpers');
const Encryption = require('../dist/modules/Encryption');

const keyId = 'id';
Expand Down Expand Up @@ -75,7 +76,30 @@ describe('Encryption', () => {
done();
})
.catch(done);
});

it('should try to encrypt with not existing key', done => {
const eventData = '{ "foo": "bar" }';
encryption.encrypt(eventData)
.then(() => {
done(new Error('Should return error'));
})
.catch(error => {
helpers.expectEntityDeletedError(error);
done();
})
});

it('should try to decrypt with not existing key', done => {
const encryptedEventData = '__ENCRYPTED__{"encryptionKeyId":"notExistingKeyId","data":"7e735ffcb85082731198f779e9d5b180"}';
encryption.decrypt(encryptedEventData)
.then(() => {
done(new Error('Should return error'));
})
.catch(error => {
helpers.expectEntityDeletedError(error);
done();
});
});
});

10 changes: 8 additions & 2 deletions test/lib/helpers.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
'use strict';

const expect = require('chai').expect;
const EventuateClient = require('../../dist');
const EventuateClient = require('../../src');
const uuid = require('uuid');
const specialChars = require('../../dist/modules/specialChars');
const EventuateClientConfiguration = require('../../dist').EventuateClientConfiguration;
const EventuateClientConfiguration = require('../../src').EventuateClientConfiguration;

module.exports.removeEventsArrProperty = (eventsArr, propertyName) => {
return eventsArr.map(item => {
Expand Down Expand Up @@ -203,3 +203,9 @@ module.exports.createEventHandler = (callback) => {
});
}
};

module.exports.expectEntityDeletedError = (error) => {
expect(error).to.be.instanceOf(Error);
expect(error).to.haveOwnProperty('code');
expect(error.code).to.equal('EntityDeletedException');
};
2 changes: 1 addition & 1 deletion test/subscribe-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ describe('Create and update entity. Subscribe for 2 events', function () {

let processedMessagesNumber = 0;

const eventHandler = (event) => {
const eventHandler = (err, event) => {

return new Promise((resolve, reject) => {

Expand Down
69 changes: 58 additions & 11 deletions test/subscribeEncrypted-spec.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict';
const util = require('util');
const helpers = require('./lib/helpers');
const Encryption = require('../dist/modules/Encryption');
const Encryption = require('../src/modules/Encryption');

const encryptionKeyId = 'id';
const keySecret = 'secret';
Expand All @@ -14,21 +14,26 @@ class EncryptionStore {
get(encryptionKeyId) {
return Promise.resolve(this.keys[encryptionKeyId]);
}

removeKey(encryptionKeyId) {
delete this.keys[encryptionKeyId];
return Promise.resolve();
}
}

const encryptionKeyStore = new EncryptionStore({ [encryptionKeyId]: keySecret });
const encryption = new Encryption(encryptionKeyStore);

const eventuateClient = helpers.createEventuateClient(encryption);
const subscriberId = `subscriber-${helpers.getUniqueID()}`;
const subscriberId1 = `subscriber-${helpers.getUniqueID()}`;
const entityTypeName = `net.chrisrichardson.eventstore.example.MyEntity-${helpers.getUniqueID()}`;
const entityTypesAndEvents = {
[entityTypeName]: [
'net.chrisrichardson.eventstore.example.MyEntityWasCreated',
'net.chrisrichardson.eventstore.example.MyEntityNameChanged'
]
};

const createEvents = [ { eventType: 'net.chrisrichardson.eventstore.example.MyEntityWasCreated', eventData: '{"name":"Fred"}' } ];
const shouldBeProcessedNumber = 2;
let eventIds = [];

Expand All @@ -37,7 +42,6 @@ describe('Create and update entity. Subscribe for 2 events', function () {

it('should create and update one uniquely named entity and subscribe for events', done => {
//create events
const createEvents = [ { eventType: 'net.chrisrichardson.eventstore.example.MyEntityWasCreated', eventData: '{"name":"Fred"}' } ];

eventuateClient.create(entityTypeName, createEvents, { encryptionKeyId }, (err, createdEntityAndEventInfo) => {
if (err) {
Expand All @@ -63,15 +67,17 @@ describe('Create and update entity. Subscribe for 2 events', function () {
eventIds = eventIds.concat(updatedEntityAndEventInfo.eventIds);
let processedMessagesNumber = 0;

const eventHandler = (event) => {
const eventHandler = (err, event) => {
return new Promise((resolve, reject) => {

console.log('event:' ,event);
resolve(event.ack);
if (err) {
return done(err);
}
console.log('Event handler event:' ,event);

helpers.expectEvent(event);
resolve(event.ack);

if (eventIds.indexOf(event.eventId) >= 0 ) {
if (eventIds.indexOf(event.eventId) >= 0) {
processedMessagesNumber++;

if (processedMessagesNumber === shouldBeProcessedNumber) {
Expand All @@ -83,7 +89,7 @@ describe('Create and update entity. Subscribe for 2 events', function () {
});
};
//subscribe for events
eventuateClient.subscribe(subscriberId, entityTypesAndEvents, eventHandler, err => {
eventuateClient.subscribe(subscriberId1, entityTypesAndEvents, eventHandler, err => {
if (err) {
return done(err)
}
Expand All @@ -93,4 +99,45 @@ describe('Create and update entity. Subscribe for 2 events', function () {
});
});
});
});
});

describe('Encryption when key not exists', () => {
before(done => {
encryptionKeyStore.removeKey(encryptionKeyId)
.then(done)
.catch(done);
});

it('should try to create() and get EntityDeleted error', done => {
eventuateClient.create(entityTypeName, createEvents, { encryptionKeyId })
.then(() => {
done(new Error('Should get error'));
})
.catch(error => {
helpers.expectEntityDeletedError(error);
done();
});
});

it('should subscribe and get EntityDeleted error', done => {
const eventHandler = (err, event) => {
if (err) {
helpers.expectEntityDeletedError(err);
done();
return Promise.resolve(event.ack);
}

console.log('Event handler event:', event);
return Promise.resolve(event.ack);
};

const subscriberId2 = `subscriber-${helpers.getUniqueID()}`;
eventuateClient.subscribe(subscriberId2, entityTypesAndEvents, eventHandler, err => {
if (err) {
return done(err);
}
console.log('The subscription has been established.');
});
});
});

0 comments on commit 3fca0ac

Please sign in to comment.