Skip to content

Commit

Permalink
Merge pull request #281 from cardanoapi/feat/apm-integration
Browse files Browse the repository at this point in the history
Add APM tracing for services
  • Loading branch information
mesudip authored Dec 23, 2024
2 parents e566ae1 + 1e30b9c commit b13082f
Show file tree
Hide file tree
Showing 20 changed files with 1,510 additions and 43 deletions.
4 changes: 4 additions & 0 deletions agent-manager/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,7 @@ SERVER_PORT=
NETWORK_NAME=




ELASTIC_APM_SERVER_URL=https://apm.sireto.io
ELASTIC_APM_API_KEY=XXX
11 changes: 7 additions & 4 deletions agent-manager/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
"version": "1.0.0",
"main": "dist/index.js",
"scripts": {
"start": "ts-node src/index.ts",
"start": "cd dist && node index.js",
"build": "tsc",
"dev": "ts-node-dev --respawn --transpile-only src/index.ts",
"dev": "ts-node src/index.ts",
"format:check": "prettier --check .",
"lint:check": "eslint .",
"format:write": "prettier --write .",
"lint:fix": "eslint --fix ."
"lint:fix": "eslint --fix .",
"prisma:generate": "prisma generate"
},
"keywords": [],
"author": "Roshan Gyawali",
Expand All @@ -21,6 +22,8 @@
"axios": "^1.6.8",
"cors": "^2.8.5",
"date-fns": "^3.6.0",
"dotenv": "^16.4.7",
"elastic-apm-node": "^4.9.0",
"express": "^4.19.2",
"kafkajs": "^2.2.4",
"kuber-client": "^3.0.3",
Expand All @@ -36,7 +39,7 @@
"@eslint/js": "^9.4.0",
"@types/cors": "^2.8.17",
"@types/luxon": "^3.4.2",
"@types/node": "^20.12.7",
"@types/node": "^22.10.2",
"@types/swagger-jsdoc": "^6.0.4",
"@types/uuid": "^9.0.8",
"@types/ws": "^8.5.10",
Expand Down
25 changes: 25 additions & 0 deletions agent-manager/src/config/tracer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { ParameterizedMessageObject, Span, Transaction } from "elastic-apm-node";

export const apm = require('elastic-apm-node')


function readConfig(){
if(process.env.ELASTIC_APM_SERVER_URL && process.env.ELASTIC_APM_API_KEY && process.env.ELASTIC_APM_ENVIRONMENT){
return {
ELASTIC_APM_SERVER_URL: process.env.ELASTIC_APM_SERVER_URL,
ELASTIC_APM_API_KEY: process.env.ELASTIC_APM_API_KEY,
ELASTIC_APM_ENVIRONMENT: process.env.ELASTIC_APM_ENVIRONMENT
}
}
}
export const apmConfig=readConfig()

export const startTransaction= (...args: any[]): Transaction=>{
return apm.startTransaction(...args);
}
export const captureError= (err: Error | string | ParameterizedMessageObject)=>{
return apm.captureError(err)
}
export const startSpan = (...args: any[]) : Span | any =>{
return apm.startSpan(...args);
}
21 changes: 20 additions & 1 deletion agent-manager/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
process.env.ELASTIC_APM_LOG_LEVEL='warning'
process.env.ELASTIC_APM_SERVICE_NAME='autonomous-agents-manager'
process.env.ELASTIC_APM_ENVIRONMENT=process.env.ELASTIC_APM_ENVIRONMENT || 'local'
import * as dotenv from 'dotenv'
dotenv.config()
import * as agent from 'elastic-apm-node/start'; agent;
import { startTransaction } from "./config/tracer";
// agent.start({
// serviceName: 'autonomous-agents-manager',
// environment: 'local',
// // Use if APM Server requires a token
//
// // Use if APM Server uses API keys for authentication
// apiKey: process.env.ELASTIC_APM_API_KEY,
//
// // Set custom APM Server URL (default: http://127.0.0.1:8200)
// serverUrl: process.env.ELASTIC_APM_SERVER_URL,
// })
import express from 'express'
import { WebSocket } from 'ws'
import { initKafkaConsumers } from './service/Listeners/KafkaMessageConsumer'
Expand All @@ -7,7 +25,6 @@ import { ManagerWalletService } from './service/Manager/ManagerWallet'
import { TxListener } from './service/Listeners/TxListener'
import { parseRawBlockBody } from 'libcardano/cardano/ledger-serialization/transaction'
import environments from "./config/environments";

const app = express()
const port = environments.serverPort

Expand All @@ -26,12 +43,14 @@ const server = app.listen(port, async () => {

blockchain.start()
blockchain.blockChain.on('extendBlock', (block) => {
const transaction = startTransaction('extendBlock','node',)
console.log(
`[Blockchain] RollForward blockNo=${block.blockNo} hash=${block.headerHash.toString('hex')} slot=${block.slotNo}`
)
const transactions = parseRawBlockBody(block.body)
txListener.onBlock({ ...block, body: transactions })
manager.broadcast('extend_block', block)
transaction.end('success')
})
await initKafkaConsumers(manager)
})
Expand Down
3 changes: 1 addition & 2 deletions agent-manager/src/repository/agent_manager_repository.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { JsonValue } from '@prisma/client/runtime/library'
import { PrismaClient } from '@prisma/client'
import { prisma } from "./dbClient";

const prisma = new PrismaClient()

export async function getAgentIdBySecret(agentSecret: Buffer): Promise<string | null> {
return prisma.agent
Expand Down
24 changes: 24 additions & 0 deletions agent-manager/src/repository/dbClient.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { PrismaClient } from "@prisma/client";
import { apm } from "../config/tracer";

export const prisma = new PrismaClient()

if(process.env.ELASTIC_APM_SERVER_URL && process.env.ELASTIC_APM_API_KEY){
prisma.$use(async (params, next) => {
const spanName=params.model?`prisma.${params.model}.${params.action}`:`prisma.${params.action}`
const span = apm.startSpan(spanName);
if (span) {
span.type = "DB";
span.subtype = "prisma";
span.action = "query";
}
try {
const result = await next(params);
span?.end();
return result;
} catch (e) {
span?.end();
throw e;
}
});
}
2 changes: 1 addition & 1 deletion agent-manager/src/repository/trigger_history_repository.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { PrismaClient } from '@prisma/client'
import { DateTime } from 'luxon'
import { v4 as uuidv4 } from 'uuid'
import { prisma } from "./dbClient";

const prisma = new PrismaClient()

export type TriggerType = 'CRON' | 'MANUAL' | 'EVENT' | 'INTERNAL'

Expand Down
66 changes: 63 additions & 3 deletions agent-manager/src/service/Manager/AgentManagerRPC.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import {ManagerWalletService} from './ManagerWallet'
import {Server} from 'ws'
import {generateRootKey} from '../../utils/cardano'
import {RPCTopicHandler} from "../RPCTopicHandler";
import { apmConfig, captureError, startTransaction } from "../../config/tracer";


export interface ILog {
function_name: string
Expand All @@ -24,7 +26,7 @@ export interface ILog {
parameters?: string
result?: string
}

let _tx: any = undefined
export class AgentManagerRPC extends WsRpcServer {
managerWallet: ManagerWalletService
rpcTopicHandler: RPCTopicHandler
Expand All @@ -36,29 +38,70 @@ export class AgentManagerRPC extends WsRpcServer {
}

protected async validateConnection(req: IncomingMessage): Promise<string> {
let tx = _tx = startTransaction('connect','request')
const agentSecretKey = req.url?.slice(1)
console.log('new connection from', req.socket.remoteAddress)
tx.addLabels({
'request_ip': req.socket.remoteAddress, // Request IP address (note: underscore instead of dot)
});

// Loop through all request headers and add them as labels
for (let [headerKey, headerValue] of Object.entries(req.headers)) {
// Replace dots in header keys with underscores
const labelKey = `request_headers_${headerKey.replace(/\./g, '_')}`;

// Make sure the header value is a string and add as a label
if (typeof headerValue === 'string') {
tx.addLabels({ [labelKey]: headerValue });
} else {
// If the value is not a string (like an array), we can stringify it
tx.addLabels({ [labelKey]: JSON.stringify(headerValue) });
}
}

if (agentSecretKey) {
const exists = await getAgentIdBySecret(Buffer.from(agentSecretKey, 'base64'))
if (exists) {
return exists
} else {
tx.setOutcome('failure')
tx.end(`Agent with secret_key ${agentSecretKey} doesn't exist`)
throw Error(`Agent with secret_key ${agentSecretKey} doesn't exist`)
}
} else {
tx.setOutcome('failure')
tx.end(`Invalid websocket connection`)
throw Error('Invalid websocket connection')
}
}

protected async handleMethodCall(connection_id: string, method: string, args: any[]) {
console.log('Method call from client', connection_id, method, args)
return this.rpcTopicHandler.handleEvent(method, connection_id, args)
const tx=startTransaction("RPC "+method,'request')
const labels: Record<string,string>={
'connectionId': connection_id,
'method': method
}
args.map((x,index)=>{
labels['arg'+index] = x
})
tx.addLabels(labels)
let _tx=tx as any
_tx.context = {'connectionId': connection_id}

return this.rpcTopicHandler.handleEvent(method, connection_id, args).then(v=>{
tx.setOutcome('success')
tx.addLabels({'result': v})
return v
})
.catch(e=>{
captureError(e)
throw e
}).finally(()=>tx.end())
}

protected async validateEventBroadcast(connection_id: string, topic: string, message: any): Promise<boolean> {
// TODO: handle the event emitted
console.log('Event from client', connection_id, topic, message)
if (topic === 'active_connection') {
await updateLastActiveTimestamp(connection_id)
}
Expand All @@ -67,6 +110,8 @@ export class AgentManagerRPC extends WsRpcServer {
}

protected onReady(client: RpcV1): void {
const thisTx=_tx;
_tx=undefined
const agentConfigsPromise = fetchAgentConfiguration(client.getId())
.then(async (config) => {
if (!config) {
Expand All @@ -77,6 +122,10 @@ export class AgentManagerRPC extends WsRpcServer {
const {instanceCount, agentIndex, agentName} = config
const rootKeyBuffer = await generateRootKey(agentIndex || 0)
client.emit('instance_count', {instanceCount, rootKeyBuffer, agentName})
if(apmConfig)
{
(config as any).apm=apmConfig
}

client.emit('initial_config', config)
})
Expand All @@ -86,6 +135,17 @@ export class AgentManagerRPC extends WsRpcServer {
Promise.all([agentConfigsPromise]).catch((err: Error) => {
console.error('AgentManagerRPC onReady Error:', err)
this.disconnect(client.getId(), err.message)
}).then(()=>{
if(thisTx){
thisTx.setOutcome('success')
}
}).catch((e)=>{
captureError(e)
}).finally(()=>{
if(thisTx){
thisTx.end()
}
})

}
}
5 changes: 3 additions & 2 deletions agent-manager/src/service/RPCTopicHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ export class RPCTopicHandler {
eventName: string,
connection_id: string,
args: any,
) {
) : Promise<any> {
const handler = (this as any)[eventName]
if (handler === undefined || eventName === 'constructor') {
console.error('Unknown event type', eventName, 'received')
return Promise.resolve()
} else {
return handler.bind(this)(connection_id, args)
}
Expand All @@ -38,7 +39,7 @@ export class RPCTopicHandler {
logEvent(connection_id: string, args: any[]) {
const params: ILog = args[0]
const txHash = params.txHash ? params.txHash : ''
saveTriggerHistory(
return saveTriggerHistory(
connection_id,
params.function_name,
params.trigger,
Expand Down
Loading

0 comments on commit b13082f

Please sign in to comment.