Skip to content

Commit

Permalink
remove the use of murmurhash3
Browse files Browse the repository at this point in the history
  • Loading branch information
idanasulin2706 committed Nov 30, 2023
1 parent 06b4704 commit aa8f76a
Show file tree
Hide file tree
Showing 12 changed files with 192 additions and 161 deletions.
5 changes: 0 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,6 @@ real-time and streaming apps fast.
$ npm install memphis-dev
```

Notice: you may receive an error about the "murmurhash3" package, to solve it please install g++
```sh
$ sudo yum install -y /usr/bin/g++
```

## Importing

For Javascript, you can choose to use the import or required keyword. This library exports a singleton instance of `memphis` with which you can consume and produce messages.
Expand Down
65 changes: 43 additions & 22 deletions lib/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class Consumer {
this.eventEmitter.on(event, cb);
}
async fetch({ batchSize = 10, consumerPartitionKey = null, consumerPartitionNumber = -1 }) {
var e_1, _a;
var _a, e_1, _b, _c;
try {
if (batchSize > maxBatchSize) {
throw (0, utils_1.MemphisError)(new Error(`Batch size can not be greater than ${maxBatchSize}`));
Expand All @@ -97,7 +97,7 @@ class Consumer {
throw (0, utils_1.MemphisError)(new Error('Can not use both partition number and partition key'));
}
if (consumerPartitionKey != null) {
const partitionNumberKey = await this.connection._getPartitionFromKey(consumerPartitionKey, this.internalStationName);
const partitionNumberKey = this.connection._getPartitionFromKey(consumerPartitionKey, this.internalStationName);
streamName = `${this.internalStationName}$${partitionNumberKey.toString()}`;
}
else if (consumerPartitionNumber > 0) {
Expand Down Expand Up @@ -126,15 +126,22 @@ class Consumer {
const durableName = this.consumerGroup ? this.internalConsumerGroupName : this.internalConsumerName;
const batch = await this.connection.brokerConnection.fetch(streamName, durableName, { batch: batchSize, expires: this.batchMaxTimeToWaitMs });
try {
for (var batch_1 = __asyncValues(batch), batch_1_1; batch_1_1 = await batch_1.next(), !batch_1_1.done;) {
const m = batch_1_1.value;
messages.push(new message_1.Message(m, this.connection, this.consumerGroup, this.internalStationName));
for (var _d = true, batch_1 = __asyncValues(batch), batch_1_1; batch_1_1 = await batch_1.next(), _a = batch_1_1.done, !_a;) {
_c = batch_1_1.value;
_d = false;
try {
const m = _c;
messages.push(new message_1.Message(m, this.connection, this.consumerGroup, this.internalStationName));
}
finally {
_d = true;
}
}
}
catch (e_1_1) { e_1 = { error: e_1_1 }; }
finally {
try {
if (batch_1_1 && !batch_1_1.done && (_a = batch_1.return)) await _a.call(batch_1);
if (!_d && !_a && (_b = batch_1.return)) await _b.call(batch_1);
}
finally { if (e_1) throw e_1.error; }
}
Expand All @@ -145,41 +152,55 @@ class Consumer {
}
}
async _handleAsyncIterableSubscriber(iter, isDls) {
var e_2, _a;
var _a, e_2, _b, _c;
try {
for (var iter_1 = __asyncValues(iter), iter_1_1; iter_1_1 = await iter_1.next(), !iter_1_1.done;) {
const m = iter_1_1.value;
if (isDls) {
let indexToInsert = this.dlsCurrentIndex;
if (this.dlsCurrentIndex >= 10000) {
indexToInsert %= 10000;
for (var _d = true, iter_1 = __asyncValues(iter), iter_1_1; iter_1_1 = await iter_1.next(), _a = iter_1_1.done, !_a;) {
_c = iter_1_1.value;
_d = false;
try {
const m = _c;
if (isDls) {
let indexToInsert = this.dlsCurrentIndex;
if (this.dlsCurrentIndex >= 10000) {
indexToInsert %= 10000;
}
this.dlsMessages[indexToInsert] = new message_1.Message(m, this.connection, this.consumerGroup, this.internalStationName);
this.dlsCurrentIndex++;
}
this.dlsMessages[indexToInsert] = new message_1.Message(m, this.connection, this.consumerGroup, this.internalStationName);
this.dlsCurrentIndex++;
this.eventEmitter.emit('message', new message_1.Message(m, this.connection, this.consumerGroup, this.internalStationName), this.context);
}
finally {
_d = true;
}
this.eventEmitter.emit('message', new message_1.Message(m, this.connection, this.consumerGroup, this.internalStationName), this.context);
}
}
catch (e_2_1) { e_2 = { error: e_2_1 }; }
finally {
try {
if (iter_1_1 && !iter_1_1.done && (_a = iter_1.return)) await _a.call(iter_1);
if (!_d && !_a && (_b = iter_1.return)) await _b.call(iter_1);
}
finally { if (e_2) throw e_2.error; }
}
}
async _handleAsyncConsumedMessages(messages, isDls) {
var e_3, _a;
var _a, e_3, _b, _c;
try {
for (var messages_1 = __asyncValues(messages), messages_1_1; messages_1_1 = await messages_1.next(), !messages_1_1.done;) {
const m = messages_1_1.value;
this.eventEmitter.emit('message', m, this.context);
for (var _d = true, messages_1 = __asyncValues(messages), messages_1_1; messages_1_1 = await messages_1.next(), _a = messages_1_1.done, !_a;) {
_c = messages_1_1.value;
_d = false;
try {
const m = _c;
this.eventEmitter.emit('message', m, this.context);
}
finally {
_d = true;
}
}
}
catch (e_3_1) { e_3 = { error: e_3_1 }; }
finally {
try {
if (messages_1_1 && !messages_1_1.done && (_a = messages_1.return)) await _a.call(messages_1);
if (!_d && !_a && (_b = messages_1.return)) await _b.call(messages_1);
}
finally { if (e_3) throw e_3.error; }
}
Expand Down
2 changes: 1 addition & 1 deletion lib/memphis.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ declare class Memphis {
timeoutRetry?: number;
}): Promise<void>;
private log;
_getPartitionFromKey(key: string, stationName: string): Promise<number>;
_getPartitionFromKey(key: string, stationName: string): number;
_validatePartitionNumber(partitionNumber: number, stationName: string): Promise<void>;
}
export declare class RoundRobinProducerConsumerGenerator {
Expand Down
Loading

0 comments on commit aa8f76a

Please sign in to comment.