Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add APM tracing for services #281

Merged
merged 2 commits into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions agent-manager/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,7 @@ SERVER_PORT=
NETWORK_NAME=




ELASTIC_APM_SERVER_URL=https://apm.sireto.io
ELASTIC_APM_API_KEY=XXX
11 changes: 7 additions & 4 deletions agent-manager/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
"version": "1.0.0",
"main": "dist/index.js",
"scripts": {
"start": "ts-node src/index.ts",
"start": "cd dist && node index.js",
"build": "tsc",
"dev": "ts-node-dev --respawn --transpile-only src/index.ts",
"dev": "ts-node src/index.ts",
"format:check": "prettier --check .",
"lint:check": "eslint .",
"format:write": "prettier --write .",
"lint:fix": "eslint --fix ."
"lint:fix": "eslint --fix .",
"prisma:generate": "prisma generate"
},
"keywords": [],
"author": "Roshan Gyawali",
Expand All @@ -21,6 +22,8 @@
"axios": "^1.6.8",
"cors": "^2.8.5",
"date-fns": "^3.6.0",
"dotenv": "^16.4.7",
"elastic-apm-node": "^4.9.0",
"express": "^4.19.2",
"kafkajs": "^2.2.4",
"kuber-client": "^3.0.3",
Expand All @@ -36,7 +39,7 @@
"@eslint/js": "^9.4.0",
"@types/cors": "^2.8.17",
"@types/luxon": "^3.4.2",
"@types/node": "^20.12.7",
"@types/node": "^22.10.2",
"@types/swagger-jsdoc": "^6.0.4",
"@types/uuid": "^9.0.8",
"@types/ws": "^8.5.10",
Expand Down
25 changes: 25 additions & 0 deletions agent-manager/src/config/tracer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { ParameterizedMessageObject, Span, Transaction } from "elastic-apm-node";

export const apm = require('elastic-apm-node')


function readConfig(){
if(process.env.ELASTIC_APM_SERVER_URL && process.env.ELASTIC_APM_API_KEY && process.env.ELASTIC_APM_ENVIRONMENT){
return {
ELASTIC_APM_SERVER_URL: process.env.ELASTIC_APM_SERVER_URL,
ELASTIC_APM_API_KEY: process.env.ELASTIC_APM_API_KEY,
ELASTIC_APM_ENVIRONMENT: process.env.ELASTIC_APM_ENVIRONMENT
}
}
}
export const apmConfig=readConfig()

export const startTransaction= (...args: any[]): Transaction=>{
return apm.startTransaction(...args);
}
export const captureError= (err: Error | string | ParameterizedMessageObject)=>{
return apm.captureError(err)
}
export const startSpan = (...args: any[]) : Span | any =>{
return apm.startSpan(...args);
}
21 changes: 20 additions & 1 deletion agent-manager/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
process.env.ELASTIC_APM_LOG_LEVEL='warning'
process.env.ELASTIC_APM_SERVICE_NAME='autonomous-agents-manager'
process.env.ELASTIC_APM_ENVIRONMENT=process.env.ELASTIC_APM_ENVIRONMENT || 'local'
import * as dotenv from 'dotenv'
dotenv.config()
import * as agent from 'elastic-apm-node/start'; agent;
import { startTransaction } from "./config/tracer";
// agent.start({
// serviceName: 'autonomous-agents-manager',
// environment: 'local',
// // Use if APM Server requires a token
//
// // Use if APM Server uses API keys for authentication
// apiKey: process.env.ELASTIC_APM_API_KEY,
//
// // Set custom APM Server URL (default: http://127.0.0.1:8200)
// serverUrl: process.env.ELASTIC_APM_SERVER_URL,
// })
import express from 'express'
import { WebSocket } from 'ws'
import { initKafkaConsumers } from './service/Listeners/KafkaMessageConsumer'
Expand All @@ -7,7 +25,6 @@ import { ManagerWalletService } from './service/Manager/ManagerWallet'
import { TxListener } from './service/Listeners/TxListener'
import { parseRawBlockBody } from 'libcardano/cardano/ledger-serialization/transaction'
import environments from "./config/environments";

const app = express()
const port = environments.serverPort

Expand All @@ -26,12 +43,14 @@ const server = app.listen(port, async () => {

blockchain.start()
blockchain.blockChain.on('extendBlock', (block) => {
const transaction = startTransaction('extendBlock','node',)
console.log(
`[Blockchain] RollForward blockNo=${block.blockNo} hash=${block.headerHash.toString('hex')} slot=${block.slotNo}`
)
const transactions = parseRawBlockBody(block.body)
txListener.onBlock({ ...block, body: transactions })
manager.broadcast('extend_block', block)
transaction.end('success')
})
await initKafkaConsumers(manager)
})
Expand Down
3 changes: 1 addition & 2 deletions agent-manager/src/repository/agent_manager_repository.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { JsonValue } from '@prisma/client/runtime/library'
import { PrismaClient } from '@prisma/client'
import { prisma } from "./dbClient";

const prisma = new PrismaClient()

export async function getAgentIdBySecret(agentSecret: Buffer): Promise<string | null> {
return prisma.agent
Expand Down
24 changes: 24 additions & 0 deletions agent-manager/src/repository/dbClient.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { PrismaClient } from "@prisma/client";
import { apm } from "../config/tracer";

export const prisma = new PrismaClient()

if(process.env.ELASTIC_APM_SERVER_URL && process.env.ELASTIC_APM_API_KEY){
prisma.$use(async (params, next) => {
const spanName=params.model?`prisma.${params.model}.${params.action}`:`prisma.${params.action}`
const span = apm.startSpan(spanName);
if (span) {
span.type = "DB";
span.subtype = "prisma";
span.action = "query";
}
try {
const result = await next(params);
span?.end();
return result;
} catch (e) {
span?.end();
throw e;
}
});
}
2 changes: 1 addition & 1 deletion agent-manager/src/repository/trigger_history_repository.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { PrismaClient } from '@prisma/client'
import { DateTime } from 'luxon'
import { v4 as uuidv4 } from 'uuid'
import { prisma } from "./dbClient";

const prisma = new PrismaClient()

export type TriggerType = 'CRON' | 'MANUAL' | 'EVENT' | 'INTERNAL'

Expand Down
66 changes: 63 additions & 3 deletions agent-manager/src/service/Manager/AgentManagerRPC.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import {ManagerWalletService} from './ManagerWallet'
import {Server} from 'ws'
import {generateRootKey} from '../../utils/cardano'
import {RPCTopicHandler} from "../RPCTopicHandler";
import { apmConfig, captureError, startTransaction } from "../../config/tracer";


export interface ILog {
function_name: string
Expand All @@ -24,7 +26,7 @@ export interface ILog {
parameters?: string
result?: string
}

let _tx: any = undefined
export class AgentManagerRPC extends WsRpcServer {
managerWallet: ManagerWalletService
rpcTopicHandler: RPCTopicHandler
Expand All @@ -36,29 +38,70 @@ export class AgentManagerRPC extends WsRpcServer {
}

protected async validateConnection(req: IncomingMessage): Promise<string> {
let tx = _tx = startTransaction('connect','request')
const agentSecretKey = req.url?.slice(1)
console.log('new connection from', req.socket.remoteAddress)
tx.addLabels({
'request_ip': req.socket.remoteAddress, // Request IP address (note: underscore instead of dot)
});

// Loop through all request headers and add them as labels
for (let [headerKey, headerValue] of Object.entries(req.headers)) {
// Replace dots in header keys with underscores
const labelKey = `request_headers_${headerKey.replace(/\./g, '_')}`;

// Make sure the header value is a string and add as a label
if (typeof headerValue === 'string') {
tx.addLabels({ [labelKey]: headerValue });
} else {
// If the value is not a string (like an array), we can stringify it
tx.addLabels({ [labelKey]: JSON.stringify(headerValue) });
}
}

if (agentSecretKey) {
const exists = await getAgentIdBySecret(Buffer.from(agentSecretKey, 'base64'))
if (exists) {
return exists
} else {
tx.setOutcome('failure')
tx.end(`Agent with secret_key ${agentSecretKey} doesn't exist`)
throw Error(`Agent with secret_key ${agentSecretKey} doesn't exist`)
}
} else {
tx.setOutcome('failure')
tx.end(`Invalid websocket connection`)
throw Error('Invalid websocket connection')
}
}

protected async handleMethodCall(connection_id: string, method: string, args: any[]) {
console.log('Method call from client', connection_id, method, args)
return this.rpcTopicHandler.handleEvent(method, connection_id, args)
const tx=startTransaction("RPC "+method,'request')
const labels: Record<string,string>={
'connectionId': connection_id,
'method': method
}
args.map((x,index)=>{
labels['arg'+index] = x
})
tx.addLabels(labels)
let _tx=tx as any
_tx.context = {'connectionId': connection_id}

return this.rpcTopicHandler.handleEvent(method, connection_id, args).then(v=>{
tx.setOutcome('success')
tx.addLabels({'result': v})
return v
})
.catch(e=>{
captureError(e)
throw e
}).finally(()=>tx.end())
}

protected async validateEventBroadcast(connection_id: string, topic: string, message: any): Promise<boolean> {
// TODO: handle the event emitted
console.log('Event from client', connection_id, topic, message)
if (topic === 'active_connection') {
await updateLastActiveTimestamp(connection_id)
}
Expand All @@ -67,6 +110,8 @@ export class AgentManagerRPC extends WsRpcServer {
}

protected onReady(client: RpcV1): void {
const thisTx=_tx;
_tx=undefined
const agentConfigsPromise = fetchAgentConfiguration(client.getId())
.then(async (config) => {
if (!config) {
Expand All @@ -77,6 +122,10 @@ export class AgentManagerRPC extends WsRpcServer {
const {instanceCount, agentIndex, agentName} = config
const rootKeyBuffer = await generateRootKey(agentIndex || 0)
client.emit('instance_count', {instanceCount, rootKeyBuffer, agentName})
if(apmConfig)
{
(config as any).apm=apmConfig
}

client.emit('initial_config', config)
})
Expand All @@ -86,6 +135,17 @@ export class AgentManagerRPC extends WsRpcServer {
Promise.all([agentConfigsPromise]).catch((err: Error) => {
console.error('AgentManagerRPC onReady Error:', err)
this.disconnect(client.getId(), err.message)
}).then(()=>{
if(thisTx){
thisTx.setOutcome('success')
}
}).catch((e)=>{
captureError(e)
}).finally(()=>{
if(thisTx){
thisTx.end()
}
})

}
}
5 changes: 3 additions & 2 deletions agent-manager/src/service/RPCTopicHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ export class RPCTopicHandler {
eventName: string,
connection_id: string,
args: any,
) {
) : Promise<any> {
const handler = (this as any)[eventName]
if (handler === undefined || eventName === 'constructor') {
console.error('Unknown event type', eventName, 'received')
return Promise.resolve()
} else {
return handler.bind(this)(connection_id, args)
}
Expand All @@ -38,7 +39,7 @@ export class RPCTopicHandler {
logEvent(connection_id: string, args: any[]) {
const params: ILog = args[0]
const txHash = params.txHash ? params.txHash : ''
saveTriggerHistory(
return saveTriggerHistory(
connection_id,
params.function_name,
params.trigger,
Expand Down
Loading
Loading