From 4c12d28cd602eaca2da2905519d93e43f45820c9 Mon Sep 17 00:00:00 2001 From: instamenta Date: Mon, 30 Sep 2024 10:59:44 +0300 Subject: [PATCH] improved implementation, refactored added better error handling and manually tested it on one node start command Signed-off-by: instamenta --- src/commands/node.mjs | 66 ++++++++++------- src/commands/roles.mjs | 2 - src/core/k8.mjs | 147 ++++++++++++++++++++++++------------- src/core/lease_manager.mjs | 100 +++++++++++++++++++++++++ 4 files changed, 238 insertions(+), 77 deletions(-) create mode 100644 src/core/lease_manager.mjs diff --git a/src/commands/node.mjs b/src/commands/node.mjs index 4216c8716..423c57d14 100644 --- a/src/commands/node.mjs +++ b/src/commands/node.mjs @@ -59,6 +59,7 @@ import { LOCAL_HOST } from '../core/constants.mjs' import { NodeStatusCodes, NodeStatusEnums } from '../core/enumerations.mjs' +import { LeaseManager } from '../core/lease_manager.mjs' /** * Defines the core functionalities of 'node' command @@ -1138,6 +1139,9 @@ export class NodeCommand extends BaseCommand { async start (argv) { const self = this + /** @type {function(): Promise} */ + let releaseLeaseCallback + const tasks = new Listr([ { title: 'Initialize', @@ -1145,7 +1149,10 @@ export class NodeCommand extends BaseCommand { self.configManager.update(argv) await prompts.execute(task, self.configManager, [ flags.namespace, - flags.nodeIDs + flags.nodeIDs, + + flags.clusterRoleUsername, + flags.clusterRolePassword ]) ctx.config = { @@ -1164,6 +1171,10 @@ export class NodeCommand extends BaseCommand { if (!await self.k8.hasNamespace(ctx.config.namespace)) { throw new FullstackTestingError(`namespace ${ctx.config.namespace} does not exist`) } + + const { releaseLease } = await (new LeaseManager(this.logger, this.k8, this.configManager)).acquireLease() + + releaseLeaseCallback = releaseLease } }, { @@ -1199,30 +1210,31 @@ export class NodeCommand extends BaseCommand { }, skip: (ctx, _) => self.configManager.getFlag(flags.app) !== '' && self.configManager.getFlag(flags.app) !== constants.HEDERA_APP_NAME }, - { - title: 'Add node stakes', - task: (ctx, task) => { - if (ctx.config.app === '' || ctx.config.app === constants.HEDERA_APP_NAME) { - const subTasks = [] - const accountMap = getNodeAccountMap(ctx.config.nodeIds) - for (const nodeId of ctx.config.nodeIds) { - const accountId = accountMap.get(nodeId) - subTasks.push({ - title: `Adding stake for node: ${chalk.yellow(nodeId)}`, - task: async () => await self.addStake(ctx.config.namespace, accountId, nodeId) - }) - } - - // set up the sub-tasks - return task.newListr(subTasks, { - concurrent: false, - rendererOptions: { - collapseSubtasks: false - } - }) - } - } - }], { + // { + // title: 'Add node stakes', + // task: (ctx, task) => { + // if (ctx.config.app === '' || ctx.config.app === constants.HEDERA_APP_NAME) { + // const subTasks = [] + // const accountMap = getNodeAccountMap(ctx.config.nodeIds) + // for (const nodeId of ctx.config.nodeIds) { + // const accountId = accountMap.get(nodeId) + // subTasks.push({ + // title: `Adding stake for node: ${chalk.yellow(nodeId)}`, + // task: async () => await self.addStake(ctx.config.namespace, accountId, nodeId) + // }) + // } + // + // // set up the sub-tasks + // return task.newListr(subTasks, { + // concurrent: false, + // rendererOptions: { + // collapseSubtasks: false + // } + // }) + // } + // } + // } + ], { concurrent: false, rendererOptions: constants.LISTR_DEFAULT_RENDERER_OPTION }) @@ -1234,6 +1246,10 @@ export class NodeCommand extends BaseCommand { throw new FullstackTestingError(`Error starting node: ${e.message}`, e) } finally { await self.close() + + if (typeof releaseLeaseCallback === 'function') { + await releaseLeaseCallback() + } } return true diff --git a/src/commands/roles.mjs b/src/commands/roles.mjs index 9224703bf..a591394cd 100644 --- a/src/commands/roles.mjs +++ b/src/commands/roles.mjs @@ -146,8 +146,6 @@ export class RolesCommand extends BaseCommand { { title: 'Retrieve User Credentials', task: async (ctx) => { - console.log(`${ctx.config.clusterRoleUsername}-credentials`) - const secret = await this.k8.getSecret(ctx.config.namespace, `${ctx.config.clusterRoleUsername}-credentials`) if (!secret || !secret.data) { diff --git a/src/core/k8.mjs b/src/core/k8.mjs index 001a8ffc6..f44ed80a2 100644 --- a/src/core/k8.mjs +++ b/src/core/k8.mjs @@ -1132,21 +1132,20 @@ export class K8 { return resp.response.statusCode === 200.0 } + // --------------------------------------- ROLES --------------------------------------- // /** * @param {string} name * @returns {Promise} */ async getClusterRole (name) { - return this.rbacApiClient.listClusterRole( - undefined, - false, - undefined, - `metadata.name=${name}` + const { response, body } = await this.rbacApiClient.listClusterRole( + null, false, null, `metadata.name=${name}` ) - .then((resp) => resp?.body?.items?.length ? resp.body.items[0] : null) - .catch(e => { - throw new FullstackTestingError(`Error fetching ClusterRole ${name}: ${e.message}`, e) - }) + .catch(e => e) // When error occurs the body becomes the error + + this._handleKubernetesClientError(response, body, 'Failed to create cluster role') + + return body?.items?.[0] ?? null } /** @@ -1165,14 +1164,15 @@ export class K8 { }] /** @type {{response: http.IncomingMessage, body: k8s.V1ClusterRole}} */ - const { response, body } = await this.rbacApiClient.createClusterRole(clusterRoleBody).catch(e => e) + const { + response, + body + } = await this.rbacApiClient.createClusterRole(clusterRoleBody) + .catch(e => e) // When error occurs the body becomes the error - if (response.statusCode !== 201) { - this.logger.error('Failed to create cluster role', body) + this._handleKubernetesClientError(response, body, 'Failed to create cluster role') - throw new FullstackTestingError('Failed to create cluster role' + - `role name: ${roleName}, status code: ${response.statusCode}`) - } + return body } /** @@ -1199,15 +1199,16 @@ export class K8 { apiGroup: 'rbac.authorization.k8s.io' } - /** @type {{response: http.IncomingMessage, body: k8s.V1ClusterRole}} */ - const { response, body } = await this.rbacApiClient.createClusterRoleBinding(clusterRoleBinding).catch(e => e) + /** @type {{response: http.IncomingMessage, body: k8s.V1ClusterRoleBinding}} */ + const { + response, + body + } = await this.rbacApiClient.createClusterRoleBinding(clusterRoleBinding) + .catch(e => e) // When error occurs the body becomes the error - if (response.statusCode !== 201) { - this.logger.error('Failed to create cluster role binding', body) + this._handleKubernetesClientError(response, body, 'Failed to create cluster role binding') - throw new FullstackTestingError('Failed to create cluster role binding' + - `status code: ${response.statusCode}, role name: ${roleName}, username: ${username}`) - } + return body } /** @@ -1216,62 +1217,108 @@ export class K8 { */ async deleteClusterRoleBinding (name) { /** @type {{response: http.IncomingMessage, body: k8s.V1ClusterRole}} */ - const { response, body } = await this.rbacApiClient.deleteClusterRoleBinding(name).catch(e => e) + const { + response, + body + } = await this.rbacApiClient.deleteClusterRoleBinding(name) + .catch(e => e) // When error occurs the body becomes the error - if (response.statusCode !== 200) { - this.logger.error('Failed to delete cluster role binding', body) + this._handleKubernetesClientError(response, body, 'Failed to delete cluster role binding') - throw new FullstackTestingError('Failed to delete cluster role binding' + - `status code: ${response.statusCode}, name: ${name}`) - } + return body } // --------------------------------------- LEASES --------------------------------------- // /** * @param {string} namespace - * @param {k8s.V1Lease} body - * @returns {Promise<{response: http.IncomingMessage, body: k8s.V1Lease}>} + * @param {string} leaseName + * @param {string} holderName + * @returns {Promise} */ - createNamespacedLease (namespace, body) { - return this.coordinationApiClient.createNamespacedLease(namespace, body) + async createNamespacedLease (namespace, leaseName, holderName) { + const lease = new k8s.V1Lease() + lease.apiVersion = 'coordination.k8s.io/v1' + lease.kind = 'Lease' + lease.metadata = { + name: leaseName, + namespace + } + lease.spec = { + holderIdentity: holderName, + leaseDurationSeconds: 15, + acquireTime: new k8s.V1MicroTime() + } + + /** @type {{response: http.IncomingMessage, body: k8s.V1Lease}} */ + const { response, body } = await this.coordinationApiClient.createNamespacedLease(namespace, lease) + .catch(e => e) // When error occurs the body becomes the error + + this._handleKubernetesClientError(response, body, 'Failed to create namespaced lease') + + return body } /** - * @param {string} name + * @param {string} leaseName * @param {string} namespace - * @returns {Promise<{response: http.IncomingMessage, body: k8s.V1Lease}>} + * @returns {k8s.V1Lease} */ - readNamespacedLease (name, namespace) { - return this.coordinationApiClient.readNamespacedLease(name, namespace) + async readNamespacedLease (leaseName, namespace) { + /** @type {Promise<{response: http.IncomingMessage, body: k8s.V1Lease}>} */ + const { response, body } = await this.coordinationApiClient.readNamespacedLease(leaseName, namespace) + .catch(e => e) // When error occurs the body becomes the error + + this._handleKubernetesClientError(response, body, 'Failed to read namespaced lease') + + return body } /** - * @param {string} name + * @param {string} leaseName * @param {string} namespace - * @param {k8s.V1Lease} body - * @returns {Promise<{response: http.IncomingMessage, body: k8s.V1Lease}>} + * @param {k8s.V1Lease} lease - lease k8s class ( must be in sync with the current one ) + * @returns {Promise} - the lease after the replacement */ - replaceNamespacedLease (name, namespace, body) { - return this.coordinationApiClient.replaceNamespacedLease(name, namespace, body) + async renewNamespaceLease (leaseName, namespace, lease) { + lease.spec.renewTime = new k8s.V1MicroTime() + + const { response, body } = await this.coordinationApiClient.replaceNamespacedLease(leaseName, namespace, lease) + .catch(e => e) // When error occurs the body becomes the error + + this._handleKubernetesClientError(response, body, 'Failed to renew namespaced lease') + + return body } /** + * @param {string} name * @param {string} namespace - * @param {string} [fieldSelector] - * @param {string} [labelSelector] * @returns {Promise<{response: http.IncomingMessage, body: k8s.V1Lease}>} */ - listNamespacedLease (namespace, fieldSelector, labelSelector) { - return this.coordinationApiClient.listNamespacedLease(namespace, undefined, undefined, undefined, fieldSelector, labelSelector) + async deleteNamespacedLease (name, namespace) { + const { response, body } = await this.coordinationApiClient.deleteNamespacedLease(name, namespace) + .catch(e => e) // When error occurs the body becomes the error + + this._handleKubernetesClientError(response, body, 'Failed to delete namespaced lease') + + return body } /** - * @param {string} name - * @param {string} namespace - * @returns {Promise<{response: http.IncomingMessage, body: k8s.V1Lease}>} + * @param {http.IncomingMessage|*} response + * @param {Error|*} error + * @param {string} errorMessage + * @private + * @throws */ - deleteNamespacedLease (name, namespace) { - return this.coordinationApiClient.deleteNamespacedLease(name, namespace) + _handleKubernetesClientError (response, error, errorMessage) { + if (response.statusCode > 202) { + errorMessage += `, statusCode: ${response.statusCode}` + + this.logger.error(errorMessage, error) + + throw new FullstackTestingError(errorMessage, errorMessage) + } } /** diff --git a/src/core/lease_manager.mjs b/src/core/lease_manager.mjs new file mode 100644 index 000000000..017d01a5e --- /dev/null +++ b/src/core/lease_manager.mjs @@ -0,0 +1,100 @@ +/** + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the ""License""); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an ""AS IS"" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +'use strict' +import { FullstackTestingError, MissingArgumentError } from './errors.mjs' +import { flags } from '../commands/index.mjs' + +/** + * Handles interacting with the kubernetes lease + */ +export class LeaseManager { + /** + * @param {Logger} logger + * @param {K8} k8 + * @param {ConfigManager} configManager + */ + constructor (logger, k8, configManager) { + if (!k8) throw new MissingArgumentError('an instance of core/K8 is required') + if (!logger) throw new MissingArgumentError('an instance of core/Logger is required') + if (!configManager) throw new MissingArgumentError('an instance of core/ConfigManager is required') + + this.k8 = k8 + this.logger = logger + this.configManager = configManager + } + + /** + * @returns {Promise<{releaseLease: (function: Promise)}>} + */ + async acquireLease () { + const namespace = this._getNamespace() + const username = this._getUsername() + const leaseName = `${username}-lease` + + try { + await this.k8.readNamespacedLease(leaseName, namespace) + } catch (error) { + if (!(error.message.includes('404'))) { + throw new FullstackTestingError(`Failed to acquire lease: ${error.message}`) + } + + await this.k8.createNamespacedLease(namespace, leaseName, username) + } + + /** @returns {Promise} */ + const renewLease = async () => { + try { + const lease = await this.k8.readNamespacedLease(leaseName, namespace) + await this.k8.renewNamespaceLease(leaseName, namespace, lease) + } catch (error) { + throw new FullstackTestingError(`Failed to renew lease: ${error.message}`, error) + } + } + + const intervalId = setInterval(renewLease, 10_000) + + /** @returns {Promise} */ + const releaseLease = async () => { + clearInterval(intervalId) + this.k8.deleteNamespacedLease(leaseName, namespace) + .then(() => this.logger.info(`Lease released by ${username}`)) + .catch(e => this.logger.error(`Failed to release lease: ${e.message}`)) + } + + return { releaseLease } + } + + /** + * @returns {string} + * @private + */ + _getNamespace () { + const ns = this.configManager.getFlag(flags.namespace) + if (!ns) throw new MissingArgumentError('namespace is not set') + return ns + } + + /** + * @returns {string} + * @private + */ + _getUsername () { + const username = this.configManager.getFlag(flags.clusterRoleUsername) + if (!username) throw new MissingArgumentError('clusterRoleUsername is not set') + return username + } +}