Skip to content

Commit

Permalink
Merge pull request #251 from cardanoapi/enhancement/event-based-form-…
Browse files Browse the repository at this point in the history
…integration

Enhancement/event based form integration
  • Loading branch information
Sital999 authored Jan 6, 2025
2 parents b72b406 + 0fde493 commit 260cc08
Show file tree
Hide file tree
Showing 259 changed files with 6,149 additions and 5,148 deletions.
2 changes: 1 addition & 1 deletion agent-manager/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
"express": "^4.19.2",
"kafkajs": "^2.2.4",
"kuber-client": "^3.0.3",
"libcardano": "1.4.3",
"libcardano": "1.4.11",
"luxon": "^3.4.4",
"prisma": "^5.13.0",
"swagger-jsdoc": "^6.2.8",
Expand Down
37 changes: 19 additions & 18 deletions agent-manager/src/config/tracer.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
import { ParameterizedMessageObject, Span, Transaction } from "elastic-apm-node";
import { ParameterizedMessageObject, Span, Transaction } from 'elastic-apm-node'

export const apm = require('elastic-apm-node')
import apmAgent from 'elastic-apm-node'

export const apm = apmAgent

function readConfig(){
if(process.env.ELASTIC_APM_SERVER_URL && process.env.ELASTIC_APM_API_KEY && process.env.ELASTIC_APM_ENVIRONMENT){
return {
ELASTIC_APM_SERVER_URL: process.env.ELASTIC_APM_SERVER_URL,
ELASTIC_APM_API_KEY: process.env.ELASTIC_APM_API_KEY,
ELASTIC_APM_ENVIRONMENT: process.env.ELASTIC_APM_ENVIRONMENT
}
}
function readConfig() {
if (process.env.ELASTIC_APM_SERVER_URL && process.env.ELASTIC_APM_API_KEY && process.env.ELASTIC_APM_ENVIRONMENT) {
return {
ELASTIC_APM_SERVER_URL: process.env.ELASTIC_APM_SERVER_URL,
ELASTIC_APM_API_KEY: process.env.ELASTIC_APM_API_KEY,
ELASTIC_APM_ENVIRONMENT: process.env.ELASTIC_APM_ENVIRONMENT,
}
}
}
export const apmConfig=readConfig()
export const apmConfig = readConfig()

export const startTransaction= (...args: any[]): Transaction=>{
return apm.startTransaction(...args);
export const startTransaction = (...args: any[]): Transaction => {
return apm.startTransaction(...args)
}
export const captureError= (err: Error | string | ParameterizedMessageObject)=>{
return apm.captureError(err)
export const captureError = (err: Error | string | ParameterizedMessageObject) => {
return apm.captureError(err)
}
export const startSpan = (...args: any[]): Span | any => {
return apm.startSpan(...args)
}
export const startSpan = (...args: any[]) : Span | any =>{
return apm.startSpan(...args);
}
20 changes: 20 additions & 0 deletions agent-manager/src/controller/blockfrostHealth.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { Router, Request, Response } from 'express'
import { handlerWrapper } from '../utils/asyncWrapper'
import environments from '../config/environments'

const router = Router()

async function blockfrostHealthCheck(req: Request, res: Response) {
const url = `https://cardano-${environments.networkName}.blockfrost.io/api/v0/health`
const response = await fetch(url, {
method: 'GET',
headers: {
project_id: environments.blockFrostApiKey,
},
})
return res.status(response.status).send(await response.json())
}

router.get('/', handlerWrapper(blockfrostHealthCheck))

export default router
32 changes: 32 additions & 0 deletions agent-manager/src/controller/health.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { Router, Request, Response } from 'express'
import { handlerWrapper } from '../utils/asyncWrapper'
import { checkKafkaStatus } from '../service/healthCheck/kafka'
import { cardanoNodeStatus } from '../service/healthCheck/cardanoNode'

const router = Router()

async function healthCheck(req: Request, res: Response) {
try {
const isKafkaHealthy = await checkKafkaStatus()
const nodeStatus = cardanoNodeStatus.checkStatus()
return res.status(isKafkaHealthy && nodeStatus.isHealthy ? 200 : 503).send({
isHealthy: isKafkaHealthy && nodeStatus.isHealthy,
details: {
kafka: {
isHealthy: isKafkaHealthy,
},
cardanoNode: {
isHealthy: nodeStatus.isHealthy,
secsSinceLastBlock: cardanoNodeStatus.lastTimeStamp,
lastBlock: nodeStatus.block,
},
},
})
} catch (err: any) {
return res.status(500).send(err.message ? err.message : err)
}
}

router.get('/', handlerWrapper(healthCheck))

export default router
34 changes: 16 additions & 18 deletions agent-manager/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,14 @@
process.env.ELASTIC_APM_LOG_LEVEL='warning'
process.env.ELASTIC_APM_SERVICE_NAME='autonomous-agents-manager'
process.env.ELASTIC_APM_ENVIRONMENT=process.env.ELASTIC_APM_ENVIRONMENT || 'local'
process.env.ELASTIC_APM_LOG_LEVEL = 'warning'
process.env.ELASTIC_APM_SERVICE_NAME = 'autonomous-agents-manager'
process.env.ELASTIC_APM_ENVIRONMENT = process.env.ELASTIC_APM_ENVIRONMENT || 'local'
import * as dotenv from 'dotenv'
dotenv.config()
import * as agent from 'elastic-apm-node/start'; agent;
import { startTransaction } from "./config/tracer";
// agent.start({
// serviceName: 'autonomous-agents-manager',
// environment: 'local',
// // Use if APM Server requires a token
//
// // Use if APM Server uses API keys for authentication
// apiKey: process.env.ELASTIC_APM_API_KEY,
//
// // Set custom APM Server URL (default: http://127.0.0.1:8200)
// serverUrl: process.env.ELASTIC_APM_SERVER_URL,
// })
// import * as agent from 'elastic-apm-node/start'
// agent
import { startTransaction } from './config/tracer'
import { cardanoNodeStatus } from './service/healthCheck/cardanoNode'
import healthCheck from './controller/health'
import blockfrostHealth from './controller/blockfrostHealth'
import express from 'express'
import { WebSocket } from 'ws'
import { initKafkaConsumers } from './service/Listeners/KafkaMessageConsumer'
Expand All @@ -24,10 +17,13 @@ import { createBlockchainInstance } from './service/Listeners/BlockchainService'
import { ManagerWalletService } from './service/Manager/ManagerWallet'
import { TxListener } from './service/Listeners/TxListener'
import { parseRawBlockBody } from 'libcardano/cardano/ledger-serialization/transaction'
import environments from "./config/environments";
import environments from './config/environments'
const app = express()
const port = environments.serverPort

app.use('/health', healthCheck)
app.use('/blockfrost/health', blockfrostHealth)

const server = app.listen(port, async () => {
console.log(`Server is running on http://localhost:${port}`)

Expand All @@ -43,7 +39,9 @@ const server = app.listen(port, async () => {

blockchain.start()
blockchain.blockChain.on('extendBlock', (block) => {
const transaction = startTransaction('extendBlock','node',)
const transaction = startTransaction('extendBlock', 'node')
cardanoNodeStatus.onBlockTimeStamp(Date.now(), block)

console.log(
`[Blockchain] RollForward blockNo=${block.blockNo} hash=${block.headerHash.toString('hex')} slot=${block.slotNo}`
)
Expand Down
3 changes: 1 addition & 2 deletions agent-manager/src/repository/agent_manager_repository.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { JsonValue } from '@prisma/client/runtime/library'
import { prisma } from "./dbClient";

import { prisma } from './dbClient'

export async function getAgentIdBySecret(agentSecret: Buffer): Promise<string | null> {
return prisma.agent
Expand Down
42 changes: 21 additions & 21 deletions agent-manager/src/repository/dbClient.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
import { PrismaClient } from "@prisma/client";
import { apm } from "../config/tracer";
import { PrismaClient } from '@prisma/client'
import { apm } from '../config/tracer'

export const prisma = new PrismaClient()

if(process.env.ELASTIC_APM_SERVER_URL && process.env.ELASTIC_APM_API_KEY){
prisma.$use(async (params: any, next:any) => {
const spanName=params.model?`prisma.${params.model}.${params.action}`:`prisma.${params.action}`
const span = apm.startSpan(spanName);
if (span) {
span.type = "DB";
span.subtype = "prisma";
span.action = "query";
}
try {
const result = await next(params);
span?.end();
return result;
} catch (e) {
span?.end();
throw e;
}
});
}
if (process.env.ELASTIC_APM_SERVER_URL && process.env.ELASTIC_APM_API_KEY) {
prisma.$use(async (params: any, next: any) => {
const spanName = params.model ? `prisma.${params.model}.${params.action}` : `prisma.${params.action}`
const span = apm.startSpan(spanName)
if (span) {
span.type = 'DB'
span.subtype = 'prisma'
span.action = 'query'
}
try {
const result = await next(params)
span?.end()
return result
} catch (e) {
span?.end()
throw e
}
})
}
4 changes: 1 addition & 3 deletions agent-manager/src/repository/trigger_history_repository.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import { PrismaClient } from '@prisma/client'
import { DateTime } from 'luxon'
import { v4 as uuidv4 } from 'uuid'
import { prisma } from "./dbClient";

import { prisma } from './dbClient'

export type TriggerType = 'CRON' | 'MANUAL' | 'EVENT' | 'INTERNAL'

Expand Down
13 changes: 11 additions & 2 deletions agent-manager/src/service/Listeners/KafkaMessageConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import environments from '../../config/environments'
const config = environments.kafka
const configTopic = `${config.topicPrefix || config.prefix || 'agent'}-updates`
const triggerTopic = `${config.topicPrefix || config.prefix || 'agent'}-triggers`
const topicList = [configTopic, triggerTopic]
export const topicList = [configTopic, triggerTopic]
const brokers = config.brokers
.split(',')
.map((x) => x.trim())
Expand All @@ -19,7 +19,9 @@ const kafka = new Kafka({
brokers, // Update with your Kafka broker address
})

const consumer = kafka.consumer({ groupId })
export const consumer = kafka.consumer({
groupId,
})

export async function initKafkaConsumers(manager: AgentManagerRPC) {
console.log('[Kafka]', `brokers:${brokers}, groupId:${groupId}, topics:${topicList}`)
Expand Down Expand Up @@ -74,3 +76,10 @@ export async function initKafkaConsumers(manager: AgentManagerRPC) {
},
})
}

const { HEARTBEAT } = consumer.events
let lastHeartbeat: number = 0
consumer.on(HEARTBEAT, ({ timestamp }) => {
lastHeartbeat = timestamp
})
export const fetchConsumerLatestHeartbeat = () => lastHeartbeat
Loading

0 comments on commit 260cc08

Please sign in to comment.