Skip to content

Commit

Permalink
fix: Resolve merge conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
Sital999 committed Jan 3, 2025
2 parents a3725a0 + b72b406 commit ebe865c
Show file tree
Hide file tree
Showing 31 changed files with 3,914 additions and 64 deletions.
5 changes: 5 additions & 0 deletions agent-manager/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,15 @@ SANCHONET_FAUCET_API_KEY=

AGENT_MNEMONIC=
METADATA_BASE_URL=
METADATA_FETCH_BASE_URL=

DB_SYNC_BASE_URL=

SERVER_PORT=
NETWORK_NAME=




ELASTIC_APM_SERVER_URL=https://apm.sireto.io
ELASTIC_APM_API_KEY=XXX
11 changes: 7 additions & 4 deletions agent-manager/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
"version": "1.0.0",
"main": "dist/index.js",
"scripts": {
"start": "ts-node src/index.ts",
"start": "cd dist && node index.js",
"build": "tsc",
"dev": "ts-node-dev --respawn --transpile-only src/index.ts",
"dev": "ts-node src/index.ts",
"format:check": "prettier --check .",
"lint:check": "eslint .",
"format:write": "prettier --write .",
"lint:fix": "eslint --fix ."
"lint:fix": "eslint --fix .",
"prisma:generate": "prisma generate"
},
"keywords": [],
"author": "Roshan Gyawali",
Expand All @@ -21,6 +22,8 @@
"axios": "^1.6.8",
"cors": "^2.8.5",
"date-fns": "^3.6.0",
"dotenv": "^16.4.7",
"elastic-apm-node": "^4.9.0",
"express": "^4.19.2",
"kafkajs": "^2.2.4",
"kuber-client": "^3.0.3",
Expand All @@ -36,7 +39,7 @@
"@eslint/js": "^9.4.0",
"@types/cors": "^2.8.17",
"@types/luxon": "^3.4.2",
"@types/node": "^20.12.7",
"@types/node": "^22.10.2",
"@types/swagger-jsdoc": "^6.0.4",
"@types/uuid": "^9.0.8",
"@types/ws": "^8.5.10",
Expand Down
1 change: 1 addition & 0 deletions agent-manager/src/config/environments.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const environments = {
managerWalletSigningKey: process.env.MANAGER_WALLET_SIGNING_KEY || '',
agentMnemonic: process.env.AGENT_MNEMONIC || '',
metaDataBaseURL: process.env.METADATA_BASE_URL || 'https://metadata.cardanoapi.io',
metaDataFetchBaseURL: process.env.METADATA_FETCH_BASE_URL || 'https://metadata.drep.id',
dbSyncBaseUrl: process.env.DB_SYNC_BASE_URL || 'https://dbsyncapi.agents.cardanoapi.io/api/',
serverPort: process.env.SERVER_PORT || '3001',
networkName: process.env.NETWORK_NAME || 'sanchonet',
Expand Down
24 changes: 24 additions & 0 deletions agent-manager/src/config/tracer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { ParameterizedMessageObject, Span, Transaction } from 'elastic-apm-node'

import apm from 'elastic-apm-node'

function readConfig() {
if (process.env.ELASTIC_APM_SERVER_URL && process.env.ELASTIC_APM_API_KEY && process.env.ELASTIC_APM_ENVIRONMENT) {
return {
ELASTIC_APM_SERVER_URL: process.env.ELASTIC_APM_SERVER_URL,
ELASTIC_APM_API_KEY: process.env.ELASTIC_APM_API_KEY,
ELASTIC_APM_ENVIRONMENT: process.env.ELASTIC_APM_ENVIRONMENT,
}
}
}
export const apmConfig = readConfig()

export const startTransaction = (...args: any[]): Transaction => {
return apm.startTransaction(...args)
}
export const captureError = (err: Error | string | ParameterizedMessageObject) => {
return apm.captureError(err)
}
export const startSpan = (...args: any[]): Span | any => {
return apm.startSpan(...args)
}
19 changes: 14 additions & 5 deletions agent-manager/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
process.env.ELASTIC_APM_LOG_LEVEL = 'warning'
process.env.ELASTIC_APM_SERVICE_NAME = 'autonomous-agents-manager'
process.env.ELASTIC_APM_ENVIRONMENT = process.env.ELASTIC_APM_ENVIRONMENT || 'local'
import * as dotenv from 'dotenv'
dotenv.config()
import * as agent from 'elastic-apm-node/start'
agent
import { startTransaction } from './config/tracer'
import { cardanoNodeStatus } from './service/healthCheck/cardanoNode'
import healthCheck from './controller/health'
import blockfrostHealth from './controller/blockfrostHealth'
import express from 'express'
import { WebSocket } from 'ws'
import { initKafkaConsumers } from './service/Listeners/KafkaMessageConsumer'
Expand All @@ -7,10 +18,6 @@ import { ManagerWalletService } from './service/Manager/ManagerWallet'
import { TxListener } from './service/Listeners/TxListener'
import { parseRawBlockBody } from 'libcardano/cardano/ledger-serialization/transaction'
import environments from './config/environments'
import healthCheck from './controller/health'
import { cardanoNodeStatus } from './service/healthCheck/cardanoNode'
import blockfrostHealth from './controller/blockfrostHealth'

const app = express()
const port = environments.serverPort

Expand All @@ -32,17 +39,19 @@ const server = app.listen(port, async () => {

blockchain.start()
blockchain.blockChain.on('extendBlock', (block) => {
const transaction = startTransaction('extendBlock', 'node')
cardanoNodeStatus.onBlockTimeStamp(Date.now(), block)

console.log(
`[Blockchain] RollForward blockNo=${block.blockNo} hash=${block.headerHash.toString('hex')} slot=${block.slotNo}`
)
const transactions = parseRawBlockBody(block.body)
txListener.onBlock({ ...block, body: transactions })
manager.broadcast('extend_block', block)
transaction.end('success')
})
await initKafkaConsumers(manager)
})

server.on('error', (e) => {
console.error('Server error:', e)
process.exit(1)
Expand Down
4 changes: 1 addition & 3 deletions agent-manager/src/repository/agent_manager_repository.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import { JsonValue } from '@prisma/client/runtime/library'
import { PrismaClient } from '@prisma/client'

const prisma = new PrismaClient()
import { prisma } from './dbClient'

export async function getAgentIdBySecret(agentSecret: Buffer): Promise<string | null> {
return prisma.agent
Expand Down
24 changes: 24 additions & 0 deletions agent-manager/src/repository/dbClient.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { PrismaClient } from '@prisma/client'
import { apm } from '../config/tracer'

export const prisma = new PrismaClient()

if (process.env.ELASTIC_APM_SERVER_URL && process.env.ELASTIC_APM_API_KEY) {
prisma.$use(async (params: any, next: any) => {
const spanName = params.model ? `prisma.${params.model}.${params.action}` : `prisma.${params.action}`
const span = apm.startSpan(spanName)
if (span) {
span.type = 'DB'
span.subtype = 'prisma'
span.action = 'query'
}
try {
const result = await next(params)
span?.end()
return result
} catch (e) {
span?.end()
throw e
}
})
}
4 changes: 1 addition & 3 deletions agent-manager/src/repository/trigger_history_repository.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import { PrismaClient } from '@prisma/client'
import { DateTime } from 'luxon'
import { v4 as uuidv4 } from 'uuid'

const prisma = new PrismaClient()
import { prisma } from './dbClient'

export type TriggerType = 'CRON' | 'MANUAL' | 'EVENT' | 'INTERNAL'

Expand Down
79 changes: 72 additions & 7 deletions agent-manager/src/service/Manager/AgentManagerRPC.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { ManagerWalletService } from './ManagerWallet'
import { Server } from 'ws'
import { generateRootKey } from '../../utils/cardano'
import { RPCTopicHandler } from '../RPCTopicHandler'
import { apmConfig, captureError, startTransaction } from '../../config/tracer'

export interface ILog {
function_name: string
Expand All @@ -24,7 +25,7 @@ export interface ILog {
parameters?: string
result?: string
}

let _tx: any = undefined
export class AgentManagerRPC extends WsRpcServer {
managerWallet: ManagerWalletService
rpcTopicHandler: RPCTopicHandler
Expand All @@ -36,29 +37,74 @@ export class AgentManagerRPC extends WsRpcServer {
}

protected async validateConnection(req: IncomingMessage): Promise<string> {
const tx = (_tx = startTransaction('connect', 'request'))
const agentSecretKey = req.url?.slice(1)
console.log('new connection from', req.socket.remoteAddress)
tx.addLabels({
request_ip: req.socket.remoteAddress, // Request IP address (note: underscore instead of dot)
})

// Loop through all request headers and add them as labels
for (const [headerKey, headerValue] of Object.entries(req.headers)) {
// Replace dots in header keys with underscores
const labelKey = `request_headers_${headerKey.replace(/\./g, '_')}`

// Make sure the header value is a string and add as a label
if (typeof headerValue === 'string') {
tx.addLabels({ [labelKey]: headerValue })
} else {
// If the value is not a string (like an array), we can stringify it
tx.addLabels({ [labelKey]: JSON.stringify(headerValue) })
}
}

if (agentSecretKey) {
const exists = await getAgentIdBySecret(Buffer.from(agentSecretKey, 'base64'))
if (exists) {
return exists
} else {
tx.setOutcome('failure')
tx.end(`Agent with secret_key ${agentSecretKey} doesn't exist`)
throw Error(`Agent with secret_key ${agentSecretKey} doesn't exist`)
}
} else {
tx.setOutcome('failure')
tx.end(`Invalid websocket connection`)
throw Error('Invalid websocket connection')
}
}

protected async handleMethodCall(connection_id: string, method: string, args: any[]) {
console.log('Method call from client', connection_id, method, args)
return this.rpcTopicHandler.handleEvent(method, connection_id, args)
const tx = startTransaction('RPC ' + method, 'request')
const labels: Record<string, string> = {
connectionId: connection_id,
method: method,
}
args.map((x, index) => {
labels['arg' + index] = x
})
tx.addLabels(labels)
const _tx = tx as any
_tx.context = { connectionId: connection_id }

return this.rpcTopicHandler
.handleEvent(method, connection_id, args)
.then((v) => {
tx.setOutcome('success')
tx.addLabels({ result: v })
return v
})
.catch((e) => {
captureError(e)
throw e
})
.finally(() => tx.end())
}

protected async validateEventBroadcast(connection_id: string, topic: string, message: any): Promise<boolean> {
console.debug("message : ",message)
// TODO: handle the event emitted
console.log('Event from client', connection_id, topic, message)
if (topic === 'active_connection') {
await updateLastActiveTimestamp(connection_id)
}
Expand All @@ -67,6 +113,8 @@ export class AgentManagerRPC extends WsRpcServer {
}

protected onReady(client: RpcV1): void {
const thisTx = _tx
_tx = undefined
const agentConfigsPromise = fetchAgentConfiguration(client.getId())
.then(async (config) => {
if (!config) {
Expand All @@ -77,15 +125,32 @@ export class AgentManagerRPC extends WsRpcServer {
const { instanceCount, agentIndex, agentName } = config
const rootKeyBuffer = await generateRootKey(agentIndex || 0)
client.emit('instance_count', { instanceCount, rootKeyBuffer, agentName })
if (apmConfig) {
;(config as any).apm = apmConfig
}

client.emit('initial_config', config)
})
.catch((error) => {
throw error
})
Promise.all([agentConfigsPromise]).catch((err: Error) => {
console.error('AgentManagerRPC onReady Error:', err)
this.disconnect(client.getId(), err.message)
})
Promise.all([agentConfigsPromise])
.catch((err: Error) => {
console.error('AgentManagerRPC onReady Error:', err)
this.disconnect(client.getId(), err.message)
})
.then(() => {
if (thisTx) {
thisTx.setOutcome('success')
}
})
.catch((e) => {
captureError(e)
})
.finally(() => {
if (thisTx) {
thisTx.end()
}
})
}
}
22 changes: 19 additions & 3 deletions agent-manager/src/service/MetadataService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,32 @@ import environments from '../config/environments'
import * as blake from 'blakejs'

class MetadataService {
constructor() {}
metadataSaveBaseUrl
metadataFetchBaseUrl
constructor() {
this.metadataSaveBaseUrl = environments.metaDataBaseURL
this.metadataFetchBaseUrl = environments.metaDataFetchBaseURL
}

async saveMetadata(content: string) {
const hash = blake.blake2bHex(content, undefined, 32)
const res = await fetch(`${environments.metaDataBaseURL}/data/${hash}`, {
const res = await fetch(`${this.metadataSaveBaseUrl}/data/${hash}`, {
method: 'PUT',
body: content,
})
if (res.ok) {
return { dataHash: hash, url: `${environments.metaDataBaseURL}/data/${hash}` }
return { dataHash: hash, url: `${this.metadataSaveBaseUrl}/data/${hash}` }
} else {
throw new Error((await res.text()) || (await res.json()))
}
}

async fetchMetadata(url: string, hash: string) {
const res = await fetch(`${this.metadataFetchBaseUrl}/api/metadata?url=${url}&hash=${hash}`, {
method: 'GET',
})
if (res.ok) {
return await res.json()
} else {
throw new Error((await res.text()) || (await res.json()))
}
Expand Down
10 changes: 8 additions & 2 deletions agent-manager/src/service/RPCTopicHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ export class RPCTopicHandler {
this.managerWallet = managerWallet
}

handleEvent(eventName: string, connection_id: string, args: any) {
handleEvent(eventName: string, connection_id: string, args: any): Promise<any> {
const handler = (this as any)[eventName]
if (handler === undefined || eventName === 'constructor') {
console.error('Unknown event type', eventName, 'received')
return Promise.resolve()
} else {
return handler.bind(this)(connection_id, args)
}
Expand All @@ -34,7 +35,7 @@ export class RPCTopicHandler {
logEvent(connection_id: string, args: any[]) {
const params: ILog = args[0]
const txHash = params.txHash ? params.txHash : ''
saveTriggerHistory(
return saveTriggerHistory(
connection_id,
params.function_name,
params.trigger,
Expand Down Expand Up @@ -80,4 +81,9 @@ export class RPCTopicHandler {
throw err
})
}

fetchMetadata(connection_id: string, args: any[]) {
const [url, hash] = args
return metaDataService.fetchMetadata(url, hash)
}
}
Loading

0 comments on commit ebe865c

Please sign in to comment.