Skip to content

Commit

Permalink
ADD: RPC received over time
Browse files Browse the repository at this point in the history
  • Loading branch information
gbayasgalan committed Oct 16, 2024
1 parent 80b4893 commit bb70564
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 0 deletions.
2 changes: 2 additions & 0 deletions launcher/src/backend/Monitoring.js
Original file line number Diff line number Diff line change
Expand Up @@ -2802,6 +2802,7 @@ export class Monitoring {
// if(p2pstatus.code)
// return p2pstatus;
const subnetSubs = await this.getSubnetSubs();
const rpcReceivedData = await this.nodeConnection.sshService.getRPCReceivedData();
return {
code: 0,
info: "success: data successfully retrieved",
Expand All @@ -2814,6 +2815,7 @@ export class Monitoring {
beaconstatus: beaconstatus,
portstatus: portstatus,
subnetSubs: subnetSubs,
rpcReceivedData: rpcReceivedData,
},
};
} catch (err) {
Expand Down
52 changes: 52 additions & 0 deletions launcher/src/backend/SSHService.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export class SSHService {
this.connectionInfo = null;
this.connected = false;
this.tunnels = [];
this.rpcReceivedDatas = [];
this.addingConnection = false;
this.removeConnectionCount = 0;
this.checkPoolPolling = setInterval(async () => {
Expand Down Expand Up @@ -287,13 +288,64 @@ export class SSHService {
log.error("Tunnel SSH Connection error: ", error);
});

server.on("connection", (connection) => {
// Forward the connection to the destination address and port
conn.forwardOut(forwardOptions.srcAddr, forwardOptions.srcPort, forwardOptions.dstAddr, forwardOptions.dstPort, (err, stream) => {
if (err) {
log.error("Forwarding error: ", err);
return;
}

// Pipe the connection to the stream and vice versa
connection.pipe(stream).pipe(connection);

// Listen for data on the stream
stream.on("data", (data) => {
// Call the rpcReceivedData method to handle the received data
this.handleReceivedData(data.length, forwardOptions.srcPort);
});
});
});

server.on("error", function (error) {
log.error("Tunnel connection error: ", error);
});
});
});
}

/**
* Handles the received data by storing it in the rpcReceivedDatas array.
* @param {number} dataLength - The length of the received data (byte).
* @param {number} dstPort - The destination port.
*/
async handleReceivedData(dataLength, srcPort) {
try {
const receivedData = {
receivedDataLength: dataLength,
dstPort: srcPort,
};
this.rpcReceivedDatas.push(receivedData);
} catch (error) {
console.error("Error handling received data:", error);
}
}

/**
* Retrieves and clears the stored received data.
* @returns {Array} - The array of received data objects.
*/
async getRPCReceivedData() {
try {
const dataToReturn = [...this.rpcReceivedDatas];
this.rpcReceivedDatas = [];
return dataToReturn;
} catch (error) {
console.error("Error retrieving and clearing received data:", error);
return [];
}
}

async closeTunnels(onlySpecificPorts = []) {
return new Promise((resolve, reject) => {
let i = this.tunnels.length;
Expand Down

0 comments on commit bb70564

Please sign in to comment.