Skip to content

Commit

Permalink
improved implementation, refactored added better error handling and m…
Browse files Browse the repository at this point in the history
…anually tested it on one node start command

Signed-off-by: instamenta <[email protected]>
  • Loading branch information
instamenta committed Sep 30, 2024
1 parent 263b6be commit 4c12d28
Show file tree
Hide file tree
Showing 4 changed files with 238 additions and 77 deletions.
66 changes: 41 additions & 25 deletions src/commands/node.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import {
LOCAL_HOST
} from '../core/constants.mjs'
import { NodeStatusCodes, NodeStatusEnums } from '../core/enumerations.mjs'
import { LeaseManager } from '../core/lease_manager.mjs'

/**
* Defines the core functionalities of 'node' command
Expand Down Expand Up @@ -1138,14 +1139,20 @@ export class NodeCommand extends BaseCommand {
async start (argv) {
const self = this

/** @type {function(): Promise<void>} */
let releaseLeaseCallback

const tasks = new Listr([
{
title: 'Initialize',
task: async (ctx, task) => {
self.configManager.update(argv)
await prompts.execute(task, self.configManager, [
flags.namespace,
flags.nodeIDs
flags.nodeIDs,

flags.clusterRoleUsername,
flags.clusterRolePassword
])

ctx.config = {
Expand All @@ -1164,6 +1171,10 @@ export class NodeCommand extends BaseCommand {
if (!await self.k8.hasNamespace(ctx.config.namespace)) {
throw new FullstackTestingError(`namespace ${ctx.config.namespace} does not exist`)
}

const { releaseLease } = await (new LeaseManager(this.logger, this.k8, this.configManager)).acquireLease()

releaseLeaseCallback = releaseLease
}
},
{
Expand Down Expand Up @@ -1199,30 +1210,31 @@ export class NodeCommand extends BaseCommand {
},
skip: (ctx, _) => self.configManager.getFlag(flags.app) !== '' && self.configManager.getFlag(flags.app) !== constants.HEDERA_APP_NAME
},
{
title: 'Add node stakes',
task: (ctx, task) => {
if (ctx.config.app === '' || ctx.config.app === constants.HEDERA_APP_NAME) {
const subTasks = []
const accountMap = getNodeAccountMap(ctx.config.nodeIds)
for (const nodeId of ctx.config.nodeIds) {
const accountId = accountMap.get(nodeId)
subTasks.push({
title: `Adding stake for node: ${chalk.yellow(nodeId)}`,
task: async () => await self.addStake(ctx.config.namespace, accountId, nodeId)
})
}

// set up the sub-tasks
return task.newListr(subTasks, {
concurrent: false,
rendererOptions: {
collapseSubtasks: false
}
})
}
}
}], {
// {
// title: 'Add node stakes',
// task: (ctx, task) => {
// if (ctx.config.app === '' || ctx.config.app === constants.HEDERA_APP_NAME) {
// const subTasks = []
// const accountMap = getNodeAccountMap(ctx.config.nodeIds)
// for (const nodeId of ctx.config.nodeIds) {
// const accountId = accountMap.get(nodeId)
// subTasks.push({
// title: `Adding stake for node: ${chalk.yellow(nodeId)}`,
// task: async () => await self.addStake(ctx.config.namespace, accountId, nodeId)
// })
// }
//
// // set up the sub-tasks
// return task.newListr(subTasks, {
// concurrent: false,
// rendererOptions: {
// collapseSubtasks: false
// }
// })
// }
// }
// }
], {
concurrent: false,
rendererOptions: constants.LISTR_DEFAULT_RENDERER_OPTION
})
Expand All @@ -1234,6 +1246,10 @@ export class NodeCommand extends BaseCommand {
throw new FullstackTestingError(`Error starting node: ${e.message}`, e)
} finally {
await self.close()

if (typeof releaseLeaseCallback === 'function') {
await releaseLeaseCallback()
}
}

return true
Expand Down
2 changes: 0 additions & 2 deletions src/commands/roles.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,6 @@ export class RolesCommand extends BaseCommand {
{
title: 'Retrieve User Credentials',
task: async (ctx) => {
console.log(`${ctx.config.clusterRoleUsername}-credentials`)

const secret = await this.k8.getSecret(ctx.config.namespace, `${ctx.config.clusterRoleUsername}-credentials`)

if (!secret || !secret.data) {
Expand Down
147 changes: 97 additions & 50 deletions src/core/k8.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -1132,21 +1132,20 @@ export class K8 {
return resp.response.statusCode === 200.0
}

// --------------------------------------- ROLES --------------------------------------- //
/**
* @param {string} name
* @returns {Promise<k8s.V1ClusterRole|null>}
*/
async getClusterRole (name) {
return this.rbacApiClient.listClusterRole(
undefined,
false,
undefined,
`metadata.name=${name}`
const { response, body } = await this.rbacApiClient.listClusterRole(
null, false, null, `metadata.name=${name}`
)
.then((resp) => resp?.body?.items?.length ? resp.body.items[0] : null)
.catch(e => {
throw new FullstackTestingError(`Error fetching ClusterRole ${name}: ${e.message}`, e)
})
.catch(e => e) // When error occurs the body becomes the error

this._handleKubernetesClientError(response, body, 'Failed to create cluster role')

return body?.items?.[0] ?? null
}

/**
Expand All @@ -1165,14 +1164,15 @@ export class K8 {
}]

/** @type {{response: http.IncomingMessage, body: k8s.V1ClusterRole}} */
const { response, body } = await this.rbacApiClient.createClusterRole(clusterRoleBody).catch(e => e)
const {
response,
body
} = await this.rbacApiClient.createClusterRole(clusterRoleBody)
.catch(e => e) // When error occurs the body becomes the error

if (response.statusCode !== 201) {
this.logger.error('Failed to create cluster role', body)
this._handleKubernetesClientError(response, body, 'Failed to create cluster role')

throw new FullstackTestingError('Failed to create cluster role' +
`role name: ${roleName}, status code: ${response.statusCode}`)
}
return body
}

/**
Expand All @@ -1199,15 +1199,16 @@ export class K8 {
apiGroup: 'rbac.authorization.k8s.io'
}

/** @type {{response: http.IncomingMessage, body: k8s.V1ClusterRole}} */
const { response, body } = await this.rbacApiClient.createClusterRoleBinding(clusterRoleBinding).catch(e => e)
/** @type {{response: http.IncomingMessage, body: k8s.V1ClusterRoleBinding}} */
const {
response,
body
} = await this.rbacApiClient.createClusterRoleBinding(clusterRoleBinding)
.catch(e => e) // When error occurs the body becomes the error

if (response.statusCode !== 201) {
this.logger.error('Failed to create cluster role binding', body)
this._handleKubernetesClientError(response, body, 'Failed to create cluster role binding')

throw new FullstackTestingError('Failed to create cluster role binding' +
`status code: ${response.statusCode}, role name: ${roleName}, username: ${username}`)
}
return body
}

/**
Expand All @@ -1216,62 +1217,108 @@ export class K8 {
*/
async deleteClusterRoleBinding (name) {
/** @type {{response: http.IncomingMessage, body: k8s.V1ClusterRole}} */
const { response, body } = await this.rbacApiClient.deleteClusterRoleBinding(name).catch(e => e)
const {
response,
body
} = await this.rbacApiClient.deleteClusterRoleBinding(name)
.catch(e => e) // When error occurs the body becomes the error

if (response.statusCode !== 200) {
this.logger.error('Failed to delete cluster role binding', body)
this._handleKubernetesClientError(response, body, 'Failed to delete cluster role binding')

throw new FullstackTestingError('Failed to delete cluster role binding' +
`status code: ${response.statusCode}, name: ${name}`)
}
return body
}

// --------------------------------------- LEASES --------------------------------------- //
/**
* @param {string} namespace
* @param {k8s.V1Lease} body
* @returns {Promise<{response: http.IncomingMessage, body: k8s.V1Lease}>}
* @param {string} leaseName
* @param {string} holderName
* @returns {Promise<k8s.V1Lease>}
*/
createNamespacedLease (namespace, body) {
return this.coordinationApiClient.createNamespacedLease(namespace, body)
async createNamespacedLease (namespace, leaseName, holderName) {
const lease = new k8s.V1Lease()
lease.apiVersion = 'coordination.k8s.io/v1'
lease.kind = 'Lease'
lease.metadata = {
name: leaseName,
namespace
}
lease.spec = {
holderIdentity: holderName,
leaseDurationSeconds: 15,
acquireTime: new k8s.V1MicroTime()
}

/** @type {{response: http.IncomingMessage, body: k8s.V1Lease}} */
const { response, body } = await this.coordinationApiClient.createNamespacedLease(namespace, lease)
.catch(e => e) // When error occurs the body becomes the error

this._handleKubernetesClientError(response, body, 'Failed to create namespaced lease')

return body
}

/**
* @param {string} name
* @param {string} leaseName
* @param {string} namespace
* @returns {Promise<{response: http.IncomingMessage, body: k8s.V1Lease}>}
* @returns {k8s.V1Lease}
*/
readNamespacedLease (name, namespace) {
return this.coordinationApiClient.readNamespacedLease(name, namespace)
async readNamespacedLease (leaseName, namespace) {
/** @type {Promise<{response: http.IncomingMessage, body: k8s.V1Lease}>} */
const { response, body } = await this.coordinationApiClient.readNamespacedLease(leaseName, namespace)
.catch(e => e) // When error occurs the body becomes the error

this._handleKubernetesClientError(response, body, 'Failed to read namespaced lease')

return body
}

/**
* @param {string} name
* @param {string} leaseName
* @param {string} namespace
* @param {k8s.V1Lease} body
* @returns {Promise<{response: http.IncomingMessage, body: k8s.V1Lease}>}
* @param {k8s.V1Lease} lease - lease k8s class ( must be in sync with the current one )
* @returns {Promise<k8s.V1Lease>} - the lease after the replacement
*/
replaceNamespacedLease (name, namespace, body) {
return this.coordinationApiClient.replaceNamespacedLease(name, namespace, body)
async renewNamespaceLease (leaseName, namespace, lease) {
lease.spec.renewTime = new k8s.V1MicroTime()

const { response, body } = await this.coordinationApiClient.replaceNamespacedLease(leaseName, namespace, lease)
.catch(e => e) // When error occurs the body becomes the error

this._handleKubernetesClientError(response, body, 'Failed to renew namespaced lease')

return body
}

/**
* @param {string} name
* @param {string} namespace
* @param {string} [fieldSelector]
* @param {string} [labelSelector]
* @returns {Promise<{response: http.IncomingMessage, body: k8s.V1Lease}>}
*/
listNamespacedLease (namespace, fieldSelector, labelSelector) {
return this.coordinationApiClient.listNamespacedLease(namespace, undefined, undefined, undefined, fieldSelector, labelSelector)
async deleteNamespacedLease (name, namespace) {
const { response, body } = await this.coordinationApiClient.deleteNamespacedLease(name, namespace)
.catch(e => e) // When error occurs the body becomes the error

this._handleKubernetesClientError(response, body, 'Failed to delete namespaced lease')

return body
}

/**
* @param {string} name
* @param {string} namespace
* @returns {Promise<{response: http.IncomingMessage, body: k8s.V1Lease}>}
* @param {http.IncomingMessage|*} response
* @param {Error|*} error
* @param {string} errorMessage
* @private
* @throws
*/
deleteNamespacedLease (name, namespace) {
return this.coordinationApiClient.deleteNamespacedLease(name, namespace)
_handleKubernetesClientError (response, error, errorMessage) {
if (response.statusCode > 202) {
errorMessage += `, statusCode: ${response.statusCode}`

this.logger.error(errorMessage, error)

throw new FullstackTestingError(errorMessage, errorMessage)
}
}

/**
Expand Down
Loading

0 comments on commit 4c12d28

Please sign in to comment.