Skip to content

Commit

Permalink
Remove msgq timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
BlythMeister authored Oct 15, 2024
1 parent 55970a6 commit c291f43
Showing 1 changed file with 14 additions and 24 deletions.
38 changes: 14 additions & 24 deletions src/366x366/shared/msgq.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ function enqueue(messageKey, message, highPriority) {
const uuid = CreateUUID();
const id = `${messageKey}#${uuid}`;

const data = { id: id, uuid: uuid, messageKey: messageKey, message: message, qTime: new Date() };
const data = { id: id, uuid: uuid, messageKey: messageKey, message: message
};

dequeue(null, messageKey);
if (highPriority) {
Expand Down Expand Up @@ -348,20 +349,13 @@ function process() {
return;
}

if (Date.now() - queueItem.qTime >= 1200000) {
console.warn(`MQ::Queue item {queueItem.id} is over 20 minutes old, abandoning`);
dequeue(queueItem.id, null);
delayedProcess(1000);
return;
}

try {
if (debugSentReceive) {
console.log(`MQ::Sending message ${queueItem.id} - ${queueItem.messageKey} - ${JSON.stringify(queueItem.message)}`);
}
waitingForId = queueItem.id;
lastSent = Date.now();
send(`m_${queueItem.messageKey}`, { msgqType: "msgq_message", qSize: Math.max(0, queueHp.length + queueLp.length - 1), qTime: queueItem.qTime, id: queueItem.id, uuid: queueItem.uuid, msgqMessage: queueItem });
send(`m_${queueItem.messageKey}`, { msgqType: "msgq_message", qSize: Math.max(0, queueHp.length + queueLp.length - 1), id: queueItem.id, msgqKey: queueItem.messageKey, msgqMessage: queueItem.message });
} catch (e) {
waitingForId = null;
console.warn(`MQ::Send error, call process again in 2 seconds.${e}`);
Expand All @@ -373,16 +367,15 @@ function onMessage(event) {
lastReceived = Date.now();
const type = event.data.msgqType;
const id = event.data.id;
const uuid = event.data.uuid;
otherQueueSize = event.data.qSize;

if (debugSentReceive) {
console.log(`MQ::Got message ${id} - type ${type}. - Other QSize: ${otherQueueSize}`);
}

if (type == "msgq_message") {
const messageKey = event.data.msgqMessage.messageKey;
const message = event.data.msgqMessage.message;
const messageKey = event.data.msgqKey;
const message = event.data.msgqMessage;

if (debugMessages) {
console.log(`MQ::Message content ${id} - ${messageKey} -> ${JSON.stringify(message)}`);
Expand All @@ -392,28 +385,25 @@ function onMessage(event) {
if (debugSentReceive) {
console.log(`MQ::Sending receipt for ${id}`);
}
const ruuid = CreateUUID();
send(`r_${messageKey}`, { msgqType: "msgq_receipt", qSize: Math.max(0, queueHp.length + queueLp.length - 1), id: id, uuid: ruuid });
send(`r_${messageKey}`, { msgqType: "msgq_receipt", qSize: Math.max(0, queueHp.length + queueLp.length - 1), id: id });
} catch (e) {
console.error(`MQ::${e}`);
}

if (event.data.qTime != null && new Date() - event.data.qTime > 1200000) {
console.warn(`MQ::Message ${id} queued over 20 minutes ago will not process`);
} else {
try {
onMessageHandler(messageKey, message);
} catch (e) {
console.error(`MQ::Error handling ${id} - ${messageKey} -> ${JSON.stringify(message)}. ${e}`);
}
try {
onMessageHandler(messageKey, message);
} catch (e) {
console.error(`MQ::Error handling ${id} - ${messageKey} -> ${JSON.stringify(message)}. ${e}`);
}
} else if (type == "msgq_receipt") {
if (debugSentReceive) {
console.log(`MQ::Got receipt for ${id}`);
}
dequeue(id, null);
waitingForId = null;
delayedProcess(1000);
if (waitingForId == id) {
waitingForId = null;
delayedProcess(1000);
}
}
}

Expand Down

0 comments on commit c291f43

Please sign in to comment.