From b7d329c7d1e5322589b6a5ae731606143274c3ae Mon Sep 17 00:00:00 2001 From: Oleksandr Dubenko Date: Mon, 22 Jul 2024 11:33:11 +0200 Subject: [PATCH] frontend: Refactor apiProxy, move PortForward type inside api folder This commit refactors apiProxy file into separate multiple files, also improving types for them. PortForward interface which was called PortForwardState was moved from react component into portForward.ts, next to the api functions for port forwarding logic Signed-off-by: Oleksandr Dubenko --- .../common/Resource/PortForward.tsx | 9 +- frontend/src/components/cronjob/Details.tsx | 2 +- .../src/lib/k8s/apiProxy/apiProxy.test.ts | 2 +- frontend/src/lib/k8s/apiProxy/apply.ts | 63 + frontend/src/lib/k8s/apiProxy/clusterApi.ts | 177 ++ .../src/lib/k8s/apiProxy/clusterRequests.ts | 284 +++ frontend/src/lib/k8s/apiProxy/constants.ts | 7 + frontend/src/lib/k8s/apiProxy/drainNode.ts | 77 + frontend/src/lib/k8s/apiProxy/factories.ts | 536 +++++ frontend/src/lib/k8s/apiProxy/formatUrl.ts | 54 + frontend/src/lib/k8s/apiProxy/index.ts | 2048 +---------------- frontend/src/lib/k8s/apiProxy/metricsApi.ts | 48 + frontend/src/lib/k8s/apiProxy/pluginsApi.ts | 32 + frontend/src/lib/k8s/apiProxy/portForward.ts | 133 ++ .../src/lib/k8s/apiProxy/queryParameters.ts | 111 + frontend/src/lib/k8s/apiProxy/scaleApi.ts | 54 + frontend/src/lib/k8s/apiProxy/streamingApi.ts | 480 ++++ frontend/src/lib/k8s/apiProxy/tokenApi.ts | 107 + frontend/src/lib/k8s/cluster.ts | 4 +- frontend/src/lib/k8s/clusterRole.ts | 9 +- frontend/src/lib/k8s/clusterRoleBinding.ts | 13 +- 21 files changed, 2241 insertions(+), 2009 deletions(-) create mode 100644 frontend/src/lib/k8s/apiProxy/apply.ts create mode 100644 frontend/src/lib/k8s/apiProxy/clusterApi.ts create mode 100644 frontend/src/lib/k8s/apiProxy/clusterRequests.ts create mode 100644 frontend/src/lib/k8s/apiProxy/constants.ts create mode 100644 frontend/src/lib/k8s/apiProxy/drainNode.ts create mode 100644 frontend/src/lib/k8s/apiProxy/factories.ts create mode 100644 frontend/src/lib/k8s/apiProxy/formatUrl.ts create mode 100644 frontend/src/lib/k8s/apiProxy/metricsApi.ts create mode 100644 frontend/src/lib/k8s/apiProxy/pluginsApi.ts create mode 100644 frontend/src/lib/k8s/apiProxy/portForward.ts create mode 100644 frontend/src/lib/k8s/apiProxy/queryParameters.ts create mode 100644 frontend/src/lib/k8s/apiProxy/scaleApi.ts create mode 100644 frontend/src/lib/k8s/apiProxy/streamingApi.ts create mode 100644 frontend/src/lib/k8s/apiProxy/tokenApi.ts diff --git a/frontend/src/components/common/Resource/PortForward.tsx b/frontend/src/components/common/Resource/PortForward.tsx index 9e1213ea87..33b554ca80 100644 --- a/frontend/src/components/common/Resource/PortForward.tsx +++ b/frontend/src/components/common/Resource/PortForward.tsx @@ -11,6 +11,7 @@ import { startPortForward, stopOrDeletePortForward, } from '../../../lib/k8s/apiProxy'; +import { PortForward as PortForwardState } from '../../../lib/k8s/apiProxy/portForward'; import { KubeContainer, KubeObject } from '../../../lib/k8s/cluster'; import Pod from '../../../lib/k8s/pod'; import Service from '../../../lib/k8s/service'; @@ -22,14 +23,6 @@ interface PortForwardProps { resource?: KubeObject; } -export interface PortForwardState { - id: string; - namespace: string; - cluster: string; - port: string; - status: string; -} - export const PORT_FORWARDS_STORAGE_KEY = 'portforwards'; export const PORT_FORWARD_STOP_STATUS = 'Stopped'; export const PORT_FORWARD_RUNNING_STATUS = 'Running'; diff --git a/frontend/src/components/cronjob/Details.tsx b/frontend/src/components/cronjob/Details.tsx index 28eed54c2f..3d42c06c8b 100644 --- a/frontend/src/components/cronjob/Details.tsx +++ b/frontend/src/components/cronjob/Details.tsx @@ -27,7 +27,7 @@ import { getLastScheduleTime, getSchedule } from './List'; function SpawnJobDialog(props: { cronJob: CronJob; - applyFunc: (newItem: KubeObjectInterface) => Promise; + applyFunc: (newItem: KubeObjectInterface) => Promise; openJobDialog: boolean; setOpenJobDialog: (open: boolean) => void; }) { diff --git a/frontend/src/lib/k8s/apiProxy/apiProxy.test.ts b/frontend/src/lib/k8s/apiProxy/apiProxy.test.ts index cd6a67e559..ebc5ae6e2f 100644 --- a/frontend/src/lib/k8s/apiProxy/apiProxy.test.ts +++ b/frontend/src/lib/k8s/apiProxy/apiProxy.test.ts @@ -9,7 +9,7 @@ import WS from 'vitest-websocket-mock'; import exportFunctions from '../../../helpers'; import * as auth from '../../auth'; import * as cluster from '../../cluster'; -import * as apiProxy from '.'; +import * as apiProxy from '../apiProxy'; const baseApiUrl = exportFunctions.getAppUrl(); const wsUrl = baseApiUrl.replace('http', 'ws'); diff --git a/frontend/src/lib/k8s/apiProxy/apply.ts b/frontend/src/lib/k8s/apiProxy/apply.ts new file mode 100644 index 0000000000..f123d8de26 --- /dev/null +++ b/frontend/src/lib/k8s/apiProxy/apply.ts @@ -0,0 +1,63 @@ +import _ from 'lodash'; +import { getCluster } from '../../cluster'; +import { KubeObjectInterface } from '../cluster'; +import { getClusterDefaultNamespace } from './clusterApi'; +import { ApiError } from './clusterRequests'; +import { resourceDefToApiFactory } from './factories'; + +/** + * Applies the provided body to the Kubernetes API. + * + * Tries to POST, and if there's a conflict it does a PUT to the api endpoint. + * + * @param body - The kubernetes object body to apply. + * @param clusterName - The cluster to apply the body to. By default uses the current cluster (URL defined). + * + * @returns The response from the kubernetes API server. + */ +export async function apply( + body: T, + clusterName?: string +): Promise { + const bodyToApply = _.cloneDeep(body); + + let apiEndpoint; + try { + apiEndpoint = await resourceDefToApiFactory(bodyToApply, clusterName); + } catch (err) { + console.error(`Error getting api endpoint when applying the resource ${bodyToApply}: ${err}`); + throw err; + } + + const cluster = clusterName || getCluster(); + + // Check if the default namespace is needed. And we need to do this before + // getting the apiEndpoint because it will affect the endpoint itself. + const isNamespaced = apiEndpoint.isNamespaced; + const { namespace } = body.metadata; + if (!namespace && isNamespaced) { + let defaultNamespace = 'default'; + + if (!!cluster) { + defaultNamespace = getClusterDefaultNamespace(cluster) || 'default'; + } + + bodyToApply.metadata.namespace = defaultNamespace; + } + + const resourceVersion = bodyToApply.metadata.resourceVersion; + + try { + delete bodyToApply.metadata.resourceVersion; + return await apiEndpoint.post(bodyToApply, {}, cluster!); + } catch (err) { + // Check to see if failed because the record already exists. + // If the failure isn't a 409 (i.e. Confilct), just rethrow. + if ((err as ApiError).status !== 409) throw err; + + // Preserve the resourceVersion if its an update request + bodyToApply.metadata.resourceVersion = resourceVersion; + // We had a conflict. Try a PUT + return apiEndpoint.put(bodyToApply, {}, cluster!) as Promise; + } +} diff --git a/frontend/src/lib/k8s/apiProxy/clusterApi.ts b/frontend/src/lib/k8s/apiProxy/clusterApi.ts new file mode 100644 index 0000000000..3dcdc2c779 --- /dev/null +++ b/frontend/src/lib/k8s/apiProxy/clusterApi.ts @@ -0,0 +1,177 @@ +import helpers, { getHeadlampAPIHeaders } from '../../../helpers'; +import { ConfigState } from '../../../redux/configSlice'; +import store from '../../../redux/stores/store'; +import { + deleteClusterKubeconfig, + findKubeconfigByClusterName, + storeStatelessClusterKubeconfig, +} from '../../../stateless'; +import { getCluster } from '../../util'; +import { ClusterRequest, clusterRequest, post, request } from './clusterRequests'; +import { JSON_HEADERS } from './constants'; + +/** + * Test authentication for the given cluster. + * Will throw an error if the user is not authenticated. + */ +export async function testAuth(cluster = '', namespace = 'default') { + const spec = { namespace }; + const clusterName = cluster || getCluster(); + + return post('/apis/authorization.k8s.io/v1/selfsubjectrulesreviews', { spec }, false, { + timeout: 5 * 1000, + cluster: clusterName, + }); +} + +/** + * Checks cluster health + * Will throw an error if the cluster is not healthy. + */ +export async function testClusterHealth(cluster?: string) { + const clusterName = cluster || getCluster() || ''; + return clusterRequest('/healthz', { isJSON: false, cluster: clusterName }); +} + +export async function setCluster(clusterReq: ClusterRequest) { + const kubeconfig = clusterReq.kubeconfig; + + if (kubeconfig) { + await storeStatelessClusterKubeconfig(kubeconfig); + // We just send parsed kubeconfig from the backend to the frontend. + return request( + '/parseKubeConfig', + { + method: 'POST', + body: JSON.stringify(clusterReq), + headers: { + ...JSON_HEADERS, + }, + }, + false, + false + ); + } + + return request( + '/cluster', + { + method: 'POST', + body: JSON.stringify(clusterReq), + headers: { + ...JSON_HEADERS, + ...getHeadlampAPIHeaders(), + }, + }, + false, + false + ); +} + +// @todo: needs documenting. + +export async function deleteCluster( + cluster: string +): Promise<{ clusters: ConfigState['clusters'] }> { + if (cluster) { + const kubeconfig = await findKubeconfigByClusterName(cluster); + if (kubeconfig !== null) { + await deleteClusterKubeconfig(cluster); + window.location.reload(); + return { clusters: {} }; + } + } + + return request( + `/cluster/${cluster}`, + { method: 'DELETE', headers: { ...getHeadlampAPIHeaders() } }, + false, + false + ); +} + +/** + * getClusterDefaultNamespace gives the default namespace for the given cluster. + * + * If the checkSettings parameter is true (default), it will check the cluster settings first. + * Otherwise it will just check the cluster config. This means that if one needs the default + * namespace that may come from the kubeconfig, call this function with the checkSettings parameter as false. + * + * @param cluster The cluster name. + * @param checkSettings Whether to check the settings for the default namespace (otherwise it just checks the cluster config). Defaults to true. + * + * @returns The default namespace for the given cluster. + */ +export function getClusterDefaultNamespace(cluster: string, checkSettings?: boolean): string { + const includeSettings = checkSettings ?? true; + let defaultNamespace = ''; + + if (!!cluster) { + if (includeSettings) { + const clusterSettings = helpers.loadClusterSettings(cluster); + defaultNamespace = clusterSettings?.defaultNamespace || ''; + } + + if (!defaultNamespace) { + const state = store.getState(); + const clusterDefaultNs: string = + state.config?.clusters?.[cluster]?.meta_data?.namespace || ''; + defaultNamespace = clusterDefaultNs; + } + } + + return defaultNamespace; +} + +/** + * renameCluster sends call to backend to update a field in kubeconfig which + * is the custom name of the cluster used by the user. + * @param cluster + */ +export async function renameCluster(cluster: string, newClusterName: string, source: string) { + let stateless = false; + if (cluster) { + const kubeconfig = await findKubeconfigByClusterName(cluster); + if (kubeconfig !== null) { + stateless = true; + } + } + + return request( + `/cluster/${cluster}`, + { + method: 'PUT', + headers: { ...getHeadlampAPIHeaders() }, + body: JSON.stringify({ newClusterName, source, stateless }), + }, + false, + false + ); +} + +/** + * parseKubeConfig sends call to backend to parse kubeconfig and send back + * the parsed clusters and contexts. + * @param clusterReq - The cluster request object. + */ +export async function parseKubeConfig(clusterReq: ClusterRequest) { + const kubeconfig = clusterReq.kubeconfig; + + if (kubeconfig) { + return request( + '/parseKubeConfig', + { + method: 'POST', + body: JSON.stringify(clusterReq), + headers: { + ...JSON_HEADERS, + ...getHeadlampAPIHeaders(), + }, + }, + false, + false + ); + } + + return null; +} diff --git a/frontend/src/lib/k8s/apiProxy/clusterRequests.ts b/frontend/src/lib/k8s/apiProxy/clusterRequests.ts new file mode 100644 index 0000000000..32e4516888 --- /dev/null +++ b/frontend/src/lib/k8s/apiProxy/clusterRequests.ts @@ -0,0 +1,284 @@ +// @todo: Params is a confusing name for options, because params are also query params. + +import { isDebugVerbose } from '../../../helpers'; +import store from '../../../redux/stores/store'; +import { findKubeconfigByClusterName, getUserIdFromLocalStorage } from '../../../stateless'; +import { getToken, logout, setToken } from '../../auth'; +import { getCluster } from '../../cluster'; +import { KubeObjectInterface } from '../cluster'; +import { BASE_HTTP_URL, CLUSTERS_PREFIX, DEFAULT_TIMEOUT, JSON_HEADERS } from './constants'; +import { asQuery, combinePath } from './formatUrl'; +import { QueryParameters } from './queryParameters'; +import { refreshToken } from './tokenApi'; + +export interface ApiError extends Error { + status: number; +} + +/** + * Options for the request. + */ +export interface RequestParams extends RequestInit { + /** Number of milliseconds to wait for a response. */ + timeout?: number; + /** Is the request expected to receive JSON data? */ + isJSON?: boolean; + /** Cluster context name. */ + cluster?: string | null; + /** Whether to automatically log out the user if there is an authentication error. */ + autoLogoutOnAuthError?: boolean; +} + +export interface ClusterRequest { + /** The name of the cluster (has to be unique, or it will override an existing cluster) */ + name?: string; + /** The cluster URL */ + server?: string; + /** Whether the server's certificate should not be checked for validity */ + insecureTLSVerify?: boolean; + /** The certificate authority data */ + certificateAuthorityData?: string; + /** KubeConfig (base64 encoded)*/ + kubeconfig?: string; +} + +/** + * The options for `clusterRequest`. + */ +export interface ClusterRequestParams extends RequestParams { + cluster?: string | null; + autoLogoutOnAuthError?: boolean; +} + +/** + * @returns Auth type of the cluster, or an empty string if the cluster is not found. + * It could return 'oidc' or '' for example. + * + * @param cluster - Name of the cluster. + */ +export function getClusterAuthType(cluster: string): string { + const state = store.getState(); + const authType: string = state.config?.clusters?.[cluster]?.['auth_type'] || ''; + return authType; +} + +/** + * Sends a request to the backend. If the useCluster parameter is true (which it is, by default), it will be + * treated as a request to the Kubernetes server of the currently defined (in the URL) cluster. + * + * @param path - The path to the API endpoint. + * @param params - Optional parameters for the request. + * @param autoLogoutOnAuthError - Whether to automatically log out the user if there is an authentication error. + * @param useCluster - Whether to use the current cluster for the request. + * @param queryParams - Optional query parameters for the request. + * + * @returns A Promise that resolves to the JSON response from the API server. + * @throws An ApiError if the response status is not ok. + */ +export async function request( + path: string, + params: RequestParams = {}, + autoLogoutOnAuthError: boolean = true, + useCluster: boolean = true, + queryParams?: QueryParameters +): Promise { + // @todo: This is a temporary way of getting the current cluster. We should improve it later. + const cluster = (useCluster && getCluster()) || ''; + + if (isDebugVerbose('k8s/apiProxy@request')) { + console.debug('k8s/apiProxy@request', { path, params, useCluster, queryParams }); + } + + return clusterRequest(path, { cluster, autoLogoutOnAuthError, ...params }, queryParams); +} + +/** + * Sends a request to the backend. If the cluster is required in the params parameter, it will + * be used as a request to the respective Kubernetes server. + * + * @param path - The path to the API endpoint. + * @param params - Optional parameters for the request. + * @param queryParams - Optional query parameters for the k8s request. + * + * @returns A Promise that resolves to the JSON response from the API server. + * @throws An ApiError if the response status is not ok. + */ +export async function clusterRequest( + path: string, + params: ClusterRequestParams = {}, + queryParams?: QueryParameters +): Promise { + interface RequestHeaders { + Authorization?: string; + cluster?: string; + autoLogoutOnAuthError?: boolean; + [otherHeader: string]: any; + } + + const { + timeout = DEFAULT_TIMEOUT, + cluster: paramsCluster, + autoLogoutOnAuthError = true, + isJSON = true, + ...otherParams + } = params; + + const userID = getUserIdFromLocalStorage(); + const opts: { headers: RequestHeaders } = Object.assign({ headers: {} }, otherParams); + const cluster = paramsCluster || ''; + + let fullPath = path; + if (cluster) { + const token = getToken(cluster); + const kubeconfig = await findKubeconfigByClusterName(cluster); + if (kubeconfig !== null) { + opts.headers['KUBECONFIG'] = kubeconfig; + opts.headers['X-HEADLAMP-USER-ID'] = userID; + } + + // Refresh service account token only if the cluster auth type is not OIDC + if (getClusterAuthType(cluster) !== 'oidc') { + await refreshToken(token); + } + + if (!!token) { + opts.headers.Authorization = `Bearer ${token}`; + } + + fullPath = combinePath(`/${CLUSTERS_PREFIX}/${cluster}`, path); + } + + const controller = new AbortController(); + const id = setTimeout(() => controller.abort(), timeout); + + let url = combinePath(BASE_HTTP_URL, fullPath); + url += asQuery(queryParams); + const requestData = { signal: controller.signal, ...opts }; + let response: Response = new Response(undefined, { status: 502, statusText: 'Unreachable' }); + try { + response = await fetch(url, requestData); + } catch (err) { + if (err instanceof Error) { + if (err.name === 'AbortError') { + response = new Response(undefined, { status: 408, statusText: 'Request timed-out' }); + } + } + } finally { + clearTimeout(id); + } + + // The backend signals through this header that it wants a reload. + // See plugins.go + const headerVal = response.headers.get('X-Reload'); + if (headerVal && headerVal.indexOf('reload') !== -1) { + window.location.reload(); + } + + // In case of OIDC auth if the token is about to expire the backend + // sends a refreshed token in the response header. + const newToken = response.headers.get('X-Authorization'); + if (newToken) { + setToken(cluster, newToken); + } + + if (!response.ok) { + const { status, statusText } = response; + if (autoLogoutOnAuthError && status === 401 && opts.headers.Authorization) { + console.error('Logging out due to auth error', { status, statusText, path }); + logout(); + } + + let message = statusText; + try { + if (isJSON) { + const json = await response.json(); + message += ` - ${json.message}`; + } + } catch (err) { + console.error( + 'Unable to parse error json at url:', + url, + { err }, + 'with request data:', + requestData + ); + } + + const error = new Error(message) as ApiError; + error.status = status; + return Promise.reject(error); + } + + if (!isJSON) { + return Promise.resolve(response); + } + + return response.json(); +} + +export function post( + url: string, + json: JSON | object | KubeObjectInterface, + autoLogoutOnAuthError: boolean = true, + options: ClusterRequestParams = {} +) { + const { cluster: clusterName, ...requestOptions } = options; + const body = JSON.stringify(json); + const cluster = clusterName || getCluster() || ''; + return clusterRequest(url, { + method: 'POST', + body, + headers: JSON_HEADERS, + cluster, + autoLogoutOnAuthError, + ...requestOptions, + }); +} + +export function patch( + url: string, + json: any, + autoLogoutOnAuthError = true, + options: ClusterRequestParams = {} +) { + const { cluster: clusterName, ...requestOptions } = options; + const body = JSON.stringify(json); + const cluster = clusterName || getCluster() || ''; + const opts = { + method: 'PATCH', + body, + headers: { ...JSON_HEADERS, 'Content-Type': 'application/merge-patch+json' }, + autoLogoutOnAuthError, + cluster, + ...requestOptions, + }; + return clusterRequest(url, opts); +} + +export function put( + url: string, + json: Partial, + autoLogoutOnAuthError = true, + requestOptions: ClusterRequestParams = {} +) { + const body = JSON.stringify(json); + const { cluster: clusterName, ...restOptions } = requestOptions; + const opts = { + method: 'PUT', + body, + headers: JSON_HEADERS, + autoLogoutOnAuthError, + cluster: clusterName || getCluster() || '', + ...restOptions, + }; + return clusterRequest(url, opts); +} + +export function remove(url: string, requestOptions: ClusterRequestParams = {}) { + const { cluster: clusterName, ...restOptions } = requestOptions; + const cluster = clusterName || getCluster() || ''; + const opts = { method: 'DELETE', headers: JSON_HEADERS, cluster, ...restOptions }; + return clusterRequest(url, opts); +} + +// @todo: apiEndpoint.put has a type of any, which needs improving. diff --git a/frontend/src/lib/k8s/apiProxy/constants.ts b/frontend/src/lib/k8s/apiProxy/constants.ts new file mode 100644 index 0000000000..7d9ab6047f --- /dev/null +++ b/frontend/src/lib/k8s/apiProxy/constants.ts @@ -0,0 +1,7 @@ +import helpers from '../../../helpers'; + +export const BASE_HTTP_URL = helpers.getAppUrl(); +export const CLUSTERS_PREFIX = 'clusters'; +export const JSON_HEADERS = { Accept: 'application/json', 'Content-Type': 'application/json' }; +export const DEFAULT_TIMEOUT = 2 * 60 * 1000; // ms +export const MIN_LIFESPAN_FOR_TOKEN_REFRESH = 10; // sec diff --git a/frontend/src/lib/k8s/apiProxy/drainNode.ts b/frontend/src/lib/k8s/apiProxy/drainNode.ts new file mode 100644 index 0000000000..be782498a5 --- /dev/null +++ b/frontend/src/lib/k8s/apiProxy/drainNode.ts @@ -0,0 +1,77 @@ +import helpers from '../../../helpers'; +import { getToken } from '../../auth'; +import { JSON_HEADERS } from './constants'; + +/** + * Drain a node + * + * @param cluster - The cluster to drain the node + * @param nodeName - The node name to drain + * + * @returns {Promise} + * @throws {Error} if the request fails + * @throws {Error} if the response is not ok + * + * This function is used to drain a node. It is used in the node detail page. + * As draining a node is a long running process, we get the request received + * message if the request is successful. And then we poll the drain node status endpoint + * to get the status of the drain node process. + */ +export function drainNode(cluster: string, nodeName: string) { + return fetch(`${helpers.getAppUrl()}drain-node`, { + method: 'POST', + headers: new Headers({ + Authorization: `Bearer ${getToken(cluster)}`, + ...JSON_HEADERS, + }), + body: JSON.stringify({ + cluster, + nodeName, + }), + }).then(response => { + return response.json().then(data => { + if (!response.ok) { + throw new Error('Something went wrong'); + } + return data; + }); + }); +} + +// @todo: needs documenting. + +interface DrainNodeStatus { + id: string; //@todo: what is this and what is it for? + cluster: string; +} + +/** + * Get the status of the drain node process. + * + * It is used in the node detail page. + * As draining a node is a long running process, we poll this endpoint to get + * the status of the drain node process. + * + * @param cluster - The cluster to get the status of the drain node process for. + * @param nodeName - The node name to get the status of the drain node process for. + * + * @returns - The response from the API. @todo: what response? + * @throws {Error} if the request fails + * @throws {Error} if the response is not ok + */ +export function drainNodeStatus(cluster: string, nodeName: string): Promise { + return fetch(`${helpers.getAppUrl()}drain-node-status?cluster=${cluster}&nodeName=${nodeName}`, { + method: 'GET', + headers: new Headers({ + Authorization: `Bearer ${getToken(cluster)}`, + ...JSON_HEADERS, + }), + }).then(response => { + return response.json().then((data: DrainNodeStatus) => { + if (!response.ok) { + throw new Error('Something went wrong'); + } + return data; + }); + }); +} diff --git a/frontend/src/lib/k8s/apiProxy/factories.ts b/frontend/src/lib/k8s/apiProxy/factories.ts new file mode 100644 index 0000000000..f6338ab420 --- /dev/null +++ b/frontend/src/lib/k8s/apiProxy/factories.ts @@ -0,0 +1,536 @@ +// @todo: repeatStreamFunc could be improved for performance by remembering when a URL +// is 404 and not trying it again... and again. + +import { OpPatch } from 'json-patch'; +import { isDebugVerbose } from '../../../helpers'; +import { getCluster } from '../../cluster'; +import { KubeObjectInterface } from '../cluster'; +import { ApiError, clusterRequest, patch, post, put, remove } from './clusterRequests'; +import { asQuery, getApiRoot } from './formatUrl'; +import { QueryParameters } from './queryParameters'; +import { apiScaleFactory, ScaleApi } from './scaleApi'; +import { + StreamErrCb, + streamResult, + StreamResultsCb, + streamResultsForCluster, +} from './streamingApi'; + +export type CancelFunction = () => void; +export type SingleApiFactoryArguments = [group: string, version: string, resource: string]; +export type MultipleApiFactoryArguments = SingleApiFactoryArguments[]; +export type ApiFactoryArguments = SingleApiFactoryArguments | MultipleApiFactoryArguments; + +export type SimpleApiFactoryWithNamespaceArguments = [ + group: string, + version: string, + resource: string, + includeScale?: boolean +]; +export type MultipleApiFactoryWithNamespaceArguments = SimpleApiFactoryWithNamespaceArguments[]; +export type ApiFactoryWithNamespaceArguments = + | SimpleApiFactoryWithNamespaceArguments + | MultipleApiFactoryWithNamespaceArguments; + +type RecursivePartial = { + [P in keyof T]?: T[P] extends (infer U)[] + ? RecursivePartial[] + : T[P] extends object | undefined + ? RecursivePartial + : T[P]; +}; + +export interface ApiClient { + list: ( + cb: StreamResultsCb, + errCb: StreamErrCb, + queryParams?: QueryParameters, + cluster?: string + ) => Promise; + get: ( + name: string, + cb: StreamResultsCb, + errCb: StreamErrCb, + queryParams?: QueryParameters, + cluster?: string + ) => Promise; + post: ( + body: RecursivePartial, + queryParams?: QueryParameters, + cluster?: string + ) => Promise; + put: ( + body: ResourceType, + queryParams?: QueryParameters, + cluster?: string + ) => Promise; + patch: ( + body: OpPatch[], + name: string, + queryParams?: QueryParameters, + cluster?: string + ) => Promise; + delete: (name: string, queryParams?: QueryParameters, cluster?: string) => Promise; + isNamespaced: boolean; + apiInfo: { + group: string; + version: string; + resource: string; + }[]; +} + +export interface ApiWithNamespaceClient { + list: ( + namespace: string, + cb: StreamResultsCb, + errCb: StreamErrCb, + queryParams?: QueryParameters, + cluster?: string + ) => Promise; + get: ( + namespace: string, + name: string, + cb: StreamResultsCb, + errCb: StreamErrCb, + queryParams?: QueryParameters, + cluster?: string + ) => Promise; + post: ( + body: RecursivePartial, + queryParams?: QueryParameters, + cluster?: string + ) => Promise; + put: ( + body: KubeObjectInterface, + queryParams?: QueryParameters, + cluster?: string + ) => Promise; + patch: ( + body: OpPatch[], + namespace: string, + name: string, + queryParams?: QueryParameters, + cluster?: string + ) => Promise; + delete: ( + namespace: string, + name: string, + queryParams?: QueryParameters, + cluster?: string + ) => Promise; + isNamespaced: boolean; + apiInfo: { + group: string; + version: string; + resource: string; + }[]; + scale?: ScaleApi; +} + +/** + * Returns list of object keys, where the value is a function. + */ +type FunctionKeys = { + [K in keyof T]: T[K] extends (...args: any[]) => any ? K : never; +}[keyof T]; + +/** + * Repeats a streaming function call across multiple API endpoints until a + * successful response is received or all endpoints have been exhausted. + * + * This is especially useful for Kubernetes beta APIs that then stabalize. + * So the APIs are available at different endpoints on different versions of Kubernetes. + * + * @param apiEndpoints - An array of API endpoint objects returned by the `apiFactory` function. + * @param funcName - The name of the streaming function to call on each endpoint. + * @param errCb - A callback function to handle errors that occur during the streaming function call. + * @param args - Additional arguments to pass to the streaming function. + * + * @returns A function that cancels the streaming function call. + */ +async function repeatStreamFunc< + ResourceType extends KubeObjectInterface, + FuncName extends FunctionKeys> +>( + apiEndpoints: (ApiClient | ApiWithNamespaceClient)[], + funcName: FuncName, + errCb: StreamErrCb, + ...args: any[] +) { + let isCancelled = false; + let streamCancel = () => {}; + + if (isDebugVerbose('k8s/apiProxy@repeatStreamFunc')) { + console.debug('k8s/apiProxy@repeatStreamFunc', { apiEndpoints, funcName, args }); + } + + function runStreamFunc( + endpointIndex: number, + funcName: FuncName, + errCb: StreamErrCb, + ...args: any[] + ) { + const endpoint = apiEndpoints[endpointIndex]; + const fullArgs = [...args]; + let errCbIndex = funcName === 'get' ? 2 : 1; + if (endpoint.isNamespaced) { + ++errCbIndex; + } + fullArgs.splice(errCbIndex, 0, errCb); + + const func: any = endpoint[funcName]; + + if (typeof func !== 'function') { + throw new Error(`The function ${funcName} does not exist on the endpoint`); + } + + return func(...fullArgs); + } + + let endpointIndex = 0; + const cancel: StreamErrCb = async (err, cancelStream) => { + if (isCancelled) { + return; + } + if (err.status === 404 && endpointIndex < apiEndpoints.length) { + // Cancel current stream + if (cancelStream) { + cancelStream(); + } + + streamCancel = await runStreamFunc(endpointIndex++, funcName, cancel, ...args); + } else if (!!errCb) { + errCb(err, streamCancel); + } + }; + + streamCancel = await runStreamFunc(endpointIndex++, funcName, cancel, ...args); + + return () => { + isCancelled = true; + streamCancel(); + }; +} + +type ObjectMethodParameters = Object[Key] extends ( + ...args: infer P +) => any + ? P + : never; + +/** + * Repeats a factory method call across multiple API endpoints until a + * successful response is received or all endpoints have been exhausted. + * + * This is especially useful for Kubernetes beta APIs that then stabalize. + * @param apiEndpoints - An array of API endpoint objects returned by the `apiFactory` function. + * @param funcName - The name of the factory method to call on each endpoint. + * + * @returns A function that cancels the factory method call. + */ +function repeatFactoryMethod< + Client extends ApiClient | ApiWithNamespaceClient, + ResourceType extends KubeObjectInterface, + FuncName extends FunctionKeys +>( + apiEndpoints: Array, + funcName: FuncName +): (...args: ObjectMethodParameters) => any { + return async (...args) => { + for (let i = 0; i < apiEndpoints.length; i++) { + try { + const endpoint = apiEndpoints[i]; + const func: any = endpoint[funcName as keyof typeof endpoint]; + return await func(...args); + } catch (err) { + // If the error is 404 and we still have other endpoints, then try the next one + if ((err as ApiError).status === 404 && i !== apiEndpoints.length - 1) { + continue; + } + + throw err; + } + } + }; +} + +// @todo: in apiFactory, and multipleApiFactory use rather than 'args'... +// `group: string, version: string, resource: string` + +/** + * Creates an API client for a single or multiple Kubernetes resources. + * + * @param args - The arguments to pass to either `singleApiFactory` or `multipleApiFactory`. + * + * @returns An API client for the specified Kubernetes resource(s). + */ +export function apiFactory( + ...args: ApiFactoryArguments +): ApiClient { + if (isDebugVerbose('k8s/apiProxy@apiFactory')) { + console.debug('k8s/apiProxy@apiFactory', { args }); + } + + if (args[0] instanceof Array) { + return multipleApiFactory(...(args as MultipleApiFactoryArguments)); + } + + return singleApiFactory(...(args as SingleApiFactoryArguments)); +} + +/** + * Creates an API endpoint object for multiple API endpoints. + * It first tries the first endpoint, then the second, and so on until it + * gets a successful response. + * + * @param args - An array of arguments to pass to the `singleApiFactory` function. + * + * @returns An API endpoint object. + */ +export function multipleApiFactory( + ...args: MultipleApiFactoryArguments +): ApiClient { + if (isDebugVerbose('k8s/apiProxy@multipleApiFactory')) { + console.debug('k8s/apiProxy@multipleApiFactory', { args }); + } + + const apiEndpoints = args.map(apiArgs => singleApiFactory(...apiArgs)); + + return { + list: (cb, errCb, queryParams, cluster) => { + return repeatStreamFunc(apiEndpoints, 'list', errCb, cb, queryParams, cluster); + }, + get: (name, cb, errCb, queryParams, cluster) => + repeatStreamFunc(apiEndpoints, 'get', errCb, name, cb, queryParams, cluster), + post: repeatFactoryMethod(apiEndpoints, 'post'), + patch: repeatFactoryMethod(apiEndpoints, 'patch'), + put: repeatFactoryMethod(apiEndpoints, 'put'), + delete: repeatFactoryMethod(apiEndpoints, 'delete'), + isNamespaced: false, + apiInfo: args.map(apiArgs => ({ + group: apiArgs[0], + version: apiArgs[1], + resource: apiArgs[2], + })), + }; +} + +/** + * Describes the API for a certain resource. + */ +export interface ApiInfo { + /** The API group. */ + group: string; + /** The API version. */ + version: string; + /** The resource name. */ + resource: string; +} + +// @todo: singleApiFactory should have a return type rather than just what it returns. + +/** + * @returns An object with methods for interacting with a single API endpoint. + * + * @param group - The API group. + * @param version - The API version. + * @param resource - The API resource. + */ +export function singleApiFactory( + ...[group, version, resource]: SingleApiFactoryArguments +): ApiClient { + if (isDebugVerbose('k8s/apiProxy@singleApiFactory')) { + console.debug('k8s/apiProxy@singleApiFactory', { group, version, resource }); + } + + const apiRoot = getApiRoot(group, version); + const url = `${apiRoot}/${resource}`; + return { + list: (cb, errCb, queryParams, cluster) => { + if (isDebugVerbose('k8s/apiProxy@singleApiFactory list')) { + console.debug('k8s/apiProxy@singleApiFactory list', { cluster, queryParams }); + } + + return streamResultsForCluster(url, { cb, errCb, cluster }, queryParams); + }, + get: (name, cb, errCb, queryParams, cluster) => + streamResult(url, name, cb, errCb, queryParams, cluster), + post: (body, queryParams, cluster) => post(url + asQuery(queryParams), body, true, { cluster }), + put: (body, queryParams, cluster) => + put(`${url}/${body.metadata.name}` + asQuery(queryParams), body, true, { cluster }), + patch: (body, name, queryParams, cluster) => + patch(`${url}/${name}` + asQuery({ ...queryParams, ...{ pretty: 'true' } }), body, true, { + cluster, + }), + delete: (name, queryParams, cluster) => + remove(`${url}/${name}` + asQuery(queryParams), { cluster }), + isNamespaced: false, + apiInfo: [{ group, version, resource }], + }; +} + +// @todo: just use args from simpleApiFactoryWithNamespace, rather than `args`? +// group: string, version: string, resource: string, includeScale: boolean = false + +export function apiFactoryWithNamespace( + ...args: ApiFactoryWithNamespaceArguments +) { + if (args[0] instanceof Array) { + return multipleApiFactoryWithNamespace( + ...(args as MultipleApiFactoryWithNamespaceArguments) + ); + } + + return simpleApiFactoryWithNamespace(...(args as SimpleApiFactoryWithNamespaceArguments)); +} + +function multipleApiFactoryWithNamespace( + ...args: MultipleApiFactoryWithNamespaceArguments +): ApiWithNamespaceClient { + const apiEndpoints = args.map(apiArgs => simpleApiFactoryWithNamespace(...apiArgs)); + + return { + list: (namespace, cb, errCb, queryParams, cluster) => { + return repeatStreamFunc(apiEndpoints, 'list', errCb, namespace, cb, queryParams, cluster); + }, + get: (namespace, name, cb, errCb, queryParams, cluster) => + repeatStreamFunc(apiEndpoints, 'get', errCb, namespace, name, cb, queryParams, cluster), + post: repeatFactoryMethod(apiEndpoints, 'post'), + patch: repeatFactoryMethod(apiEndpoints, 'patch'), + put: repeatFactoryMethod(apiEndpoints, 'put'), + delete: repeatFactoryMethod(apiEndpoints, 'delete'), + isNamespaced: true, + apiInfo: args.map(apiArgs => ({ + group: apiArgs[0], + version: apiArgs[1], + resource: apiArgs[2], + })), + }; +} + +function simpleApiFactoryWithNamespace( + ...[group, version, resource, includeScale = false]: SimpleApiFactoryWithNamespaceArguments +): ApiWithNamespaceClient { + if (isDebugVerbose('k8s/apiProxy@simpleApiFactoryWithNamespace')) { + console.debug('k8s/apiProxy@simpleApiFactoryWithNamespace', { + group, + version, + resource, + includeScale, + }); + } + + const apiRoot = getApiRoot(group, version); + const results: ApiWithNamespaceClient = { + list: (namespace, cb, errCb, queryParams, cluster) => { + if (isDebugVerbose('k8s/apiProxy@simpleApiFactoryWithNamespace list')) { + console.debug('k8s/apiProxy@simpleApiFactoryWithNamespace list', { cluster, queryParams }); + } + + return streamResultsForCluster(url(namespace), { cb, errCb, cluster }, queryParams); + }, + get: (namespace, name, cb, errCb, queryParams, cluster) => + streamResult(url(namespace), name, cb, errCb, queryParams, cluster), + post: (body, queryParams, cluster) => + post(url(body.metadata?.namespace!) + asQuery(queryParams), body, true, { cluster }), + patch: (body, namespace, name, queryParams, cluster) => + patch( + `${url(namespace)}/${name}` + asQuery({ ...queryParams, ...{ pretty: 'true' } }), + body, + true, + { cluster } + ), + put: (body, queryParams, cluster) => + put( + `${url(body.metadata.namespace!)}/${body.metadata.name}` + asQuery(queryParams), + body, + true, + { cluster } + ), + delete: (namespace, name, queryParams, cluster) => + remove(`${url(namespace)}/${name}` + asQuery(queryParams), { cluster }), + isNamespaced: true, + apiInfo: [{ group, version, resource }], + }; + + if (includeScale) { + results.scale = apiScaleFactory(apiRoot, resource); + } + + return results; + + function url(namespace: string) { + return namespace ? `${apiRoot}/namespaces/${namespace}/${resource}` : `${apiRoot}/${resource}`; + } +} + +export async function resourceDefToApiFactory( + resourceDef: KubeObjectInterface, + clusterName?: string +): Promise | ApiWithNamespaceClient> { + interface APIResourceList { + resources: { + kind: string; + namespaced: boolean; + singularName: string; + name: string; + }[]; + [other: string]: any; + } + if (isDebugVerbose('k8s/apiProxy@resourceDefToApiFactory')) { + console.debug('k8s/apiProxy@resourceDefToApiFactory', { resourceDef }); + } + + if (!resourceDef.kind) { + throw new Error(`Cannot handle unknown resource kind: ${resourceDef.kind}`); + } + + if (!resourceDef.apiVersion) { + throw new Error(`Definition ${resourceDef.kind} has no apiVersion`); + } + + let [apiGroup, apiVersion] = resourceDef.apiVersion.split('/'); + + // There may not be an API group [1], which means that the apiGroup variable will + // actually hold the apiVersion, so we switch them. + // [1] https://kubernetes.io/docs/reference/using-api/#api-groups + if (!!apiGroup && !apiVersion) { + apiVersion = apiGroup; + apiGroup = ''; + } + + if (!apiVersion) { + throw new Error(`apiVersion has no version string: ${resourceDef.apiVersion}`); + } + + const cluster = clusterName || getCluster() || ''; + + // Get details about this resource. We could avoid this for known resources, but + // this way we always get the right plural name and we also avoid eventually getting + // the wrong "known" resource because e.g. there can be CustomResources with the same + // kind as a known resource. + const apiPrefix = !!apiGroup ? 'apis' : 'api'; + const apiResult: APIResourceList = await clusterRequest( + `/${apiPrefix}/${resourceDef.apiVersion}`, + { + cluster, + autoLogoutOnAuthError: false, + } + ); + if (!apiResult) { + throw new Error(`Unkown apiVersion: ${resourceDef.apiVersion}`); + } + + // Get resource + const resource = apiResult.resources?.find(({ kind }) => kind === resourceDef.kind); + + if (!resource) { + throw new Error(`Unkown resource kind: ${resourceDef.kind}`); + } + + const hasNamespace = !!resource.namespaced; + + const factoryFunc = hasNamespace ? apiFactoryWithNamespace : apiFactory; + + return factoryFunc(apiGroup, apiVersion, resource.name); +} diff --git a/frontend/src/lib/k8s/apiProxy/formatUrl.ts b/frontend/src/lib/k8s/apiProxy/formatUrl.ts new file mode 100644 index 0000000000..d88f2c0345 --- /dev/null +++ b/frontend/src/lib/k8s/apiProxy/formatUrl.ts @@ -0,0 +1,54 @@ +import { omit } from 'lodash'; +import { QueryParameters } from './queryParameters'; + +export function buildUrl(urlOrParts: string | string[], queryParams?: QueryParameters): string { + const url = Array.isArray(urlOrParts) ? urlOrParts.join('/') : urlOrParts; + return url + asQuery(queryParams); +} + +/** + * Combines a base path and a path to create a full path. + * + * Doesn't matter if the start or the end has a single slash, the result will always have a single slash. + * + * @param base - The base path. + * @param path - The path to combine with the base path. + * + * @returns The combined path. + */ +export function combinePath(base: string, path: string): string { + if (base.endsWith('/')) base = base.slice(0, -1); // eslint-disable-line no-param-reassign + if (path.startsWith('/')) path = path.slice(1); // eslint-disable-line no-param-reassign + return `${base}/${path}`; +} + +export function getApiRoot(group: string, version: string) { + return group ? `/apis/${group}/${version}` : `api/${version}`; +} + +/** + * Converts k8s queryParams to a URL query string. + * + * @param queryParams - The k8s API query parameters to convert. + * @returns The query string (starting with '?'), or empty string. + */ +export function asQuery(queryParams?: QueryParameters): string { + if (queryParams === undefined) { + return ''; + } + + let newQueryParams; + if (typeof queryParams.limit === 'number' || typeof queryParams.limit === 'string') { + newQueryParams = { + ...queryParams, + limit: + typeof queryParams.limit === 'number' ? queryParams.limit.toString() : queryParams.limit, + }; + } else { + newQueryParams = { ...omit(queryParams, 'limit') }; + } + + return !!newQueryParams && !!Object.keys(newQueryParams).length + ? '?' + new URLSearchParams(newQueryParams).toString() + : ''; +} diff --git a/frontend/src/lib/k8s/apiProxy/index.ts b/frontend/src/lib/k8s/apiProxy/index.ts index 232c7f986f..df5824a584 100644 --- a/frontend/src/lib/k8s/apiProxy/index.ts +++ b/frontend/src/lib/k8s/apiProxy/index.ts @@ -15,2003 +15,63 @@ * - No docs on some functions and interfaces. * - Async is missing on some functions that need to be marked as so. * - Some of the users of the functions are not handling errors. - * - * Additionally, it needs to be refactored into an apiProxy/ folder, with the - * functionality broken up into smaller files by topic/feature. Keeping the - * apiProxy/index.ts file as the entry point for backwards compatibility - * exporting everything from there. */ -import { OpPatch } from 'json-patch'; -import _ from 'lodash'; -import { decodeToken } from 'react-jwt'; -import helpers, { getHeadlampAPIHeaders, isDebugVerbose } from '../../../helpers'; -import store from '../../../redux/stores/store'; -import { - deleteClusterKubeconfig, - findKubeconfigByClusterName, - getUserIdFromLocalStorage, - storeStatelessClusterKubeconfig, -} from '../../../stateless'; -import { getToken, logout, setToken } from '../../auth'; -import { getCluster } from '../../util'; -import { KubeMetadata, KubeMetrics, KubeObjectInterface } from '../cluster'; -import { KubeToken } from '../token'; - // Uncomment the following lines to enable verbose debug logging in this module. // import { debugVerbose } from '../../helpers'; // debugVerbose('k8s/apiProxy'); -const BASE_HTTP_URL = helpers.getAppUrl(); -const BASE_WS_URL = BASE_HTTP_URL.replace('http', 'ws'); -const CLUSTERS_PREFIX = 'clusters'; -const JSON_HEADERS = { Accept: 'application/json', 'Content-Type': 'application/json' }; -const DEFAULT_TIMEOUT = 2 * 60 * 1000; // ms -const MIN_LIFESPAN_FOR_TOKEN_REFRESH = 10; // sec - -let isTokenRefreshInProgress = false; - -// @todo: Params is a confusing name for options, because params are also query params. -/** - * Options for the request. - */ -export interface RequestParams extends RequestInit { - /** Number of milliseconds to wait for a response. */ - timeout?: number; - /** Is the request expected to receive JSON data? */ - isJSON?: boolean; - /** Cluster context name. */ - cluster?: string | null; - /** Whether to automatically log out the user if there is an authentication error. */ - autoLogoutOnAuthError?: boolean; -} - -export interface ClusterRequest { - /** The name of the cluster (has to be unique, or it will override an existing cluster) */ - name?: string; - /** The cluster URL */ - server?: string; - /** Whether the server's certificate should not be checked for validity */ - insecureTLSVerify?: boolean; - /** The certificate authority data */ - certificateAuthorityData?: string; - /** KubeConfig (base64 encoded)*/ - kubeconfig?: string; -} - -// @todo: QueryParamaters should be specific to different resources. -// Because some only support some paramaters. - -/** - * QueryParamaters is a map of query parameters for the Kubernetes API. - */ -export interface QueryParameters { - /** - * Continue token for paging through large result sets. - * - * The continue option should be set when retrieving more results from the server. - * Since this value is server defined, clients may only use the continue value - * from a previous query result with identical query parameters - * (except for the value of continue) and the server may reject a continue value - * it does not recognize. If the specified continue value is no longer valid - * whether due to expiration (generally five to fifteen minutes) or a - * configuration change on the server, the server will respond with a - * 410 ResourceExpired error together with a continue token. If the client - * needs a consistent list, it must restart their list without the continue field. - * Otherwise, the client may send another list request with the token received - * with the 410 error, the server will respond with a list starting from the next - * key, but from the latest snapshot, which is inconsistent from the previous - * list results - objects that are created, modified, or deleted after the first - * list request will be included in the response, as long as their keys are after - * the "next key". - * - * This field is not supported when watch is true. Clients may start a watch from - * the last resourceVersion value returned by the server and not miss any modifications. - * @see https://kubernetes.io/docs/reference/using-api/api-concepts/#retrieving-large-results-sets-in-chunks - */ - continue?: string; - /** - * dryRun causes apiserver to simulate the request, and report whether the object would be modified. - * Can be '' or 'All' - * - * @see https://kubernetes.io/docs/reference/using-api/api-concepts/#dry-run - */ - dryRun?: string; - /** - * fieldSeletor restricts the list of returned objects by their fields. Defaults to everything. - * - * @see https://kubernetes.io/docs/concepts/overview/working-with-objects/field-selectors/ - */ - fieldSelector?: string; - /** - * labelSelector restricts the list of returned objects by their labels. Defaults to everything. - * - * @see https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#api - * @see https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors - */ - labelSelector?: string; - /** - * limit is a maximum number of responses to return for a list call. - * - * If more items exist, the server will set the continue field on the list - * metadata to a value that can be used with the same initial query to retrieve - * the next set of results. Setting a limit may return fewer than the requested - * amount of items (up to zero items) in the event all requested objects are - * filtered out and clients should only use the presence of the continue field - * to determine whether more results are available. Servers may choose not to - * support the limit argument and will return all of the available results. - * If limit is specified and the continue field is empty, clients may assume - * that no more results are available. - * - * This field is not supported if watch is true. - * @see https://kubernetes.io/docs/reference/using-api/api-concepts/#retrieving-large-results-sets-in-chunks - */ - limit?: string | number; - /** - * resourceVersion sets a constraint on what resource versions a request may be served from. - * Defaults to unset - * - * @see https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes - * @see https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions - */ - resourceVersion?: string; - /** - * allowWatchBookmarks means watch events with type "BOOKMARK" will also be sent. - * - * Can be 'true' - * @see https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks - */ - allowWatchBookmarks?: string; - /** - * sendInitialEvents controls whether the server will send the events - * for a watch before sending the current list state. - * - * Can be 'true'. - * @see https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists - */ - sendInitialEvents?: string; - /** - * The resource version to match. - * - * @see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list - */ - resourceVersionMatch?: string; - /** - * If 'true', then the output is pretty printed. - * Can be '' or 'true' - * @see https://kubernetes.io/docs/reference/using-api/api-concepts/#output-options - */ - pretty?: string; - /** - * watch instead of a list or get, watch for changes to the requested object(s). - * - * Can be 1. - * @see https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes - */ - watch?: string; -} - -/** - * Refreshes the token if it is about to expire. - * - * @param token - The token to refresh. For null token it just does nothing. - * - * @note Sets the token with `setToken` if the token is refreshed. - * @note Uses global `isTokenRefreshInProgress` to prevent multiple token - * refreshes at the same time. - */ -async function refreshToken(token: string | null): Promise { - if (!token || isTokenRefreshInProgress) { - return; - } - // decode token - const decodedToken: any = decodeToken(token); - - // return if the token doesn't have an expiry time - if (!decodedToken?.exp) { - return; - } - // convert expiry seconds to date object - const expiry = decodedToken.exp; - const now = new Date().valueOf(); - const expDate = new Date(0); - expDate.setUTCSeconds(expiry); - - // calculate time to expiry in seconds - const diff = (expDate.valueOf() - now) / 1000; - // If the token is not about to expire return - // comparing the time to expiry with the minimum lifespan for a token both in seconds - if (diff > MIN_LIFESPAN_FOR_TOKEN_REFRESH) { - return; - } - const namespace = - (decodedToken && decodedToken['kubernetes.io'] && decodedToken['kubernetes.io']['namespace']) || - ''; - const serviceAccountName = - (decodedToken && - decodedToken['kubernetes.io'] && - decodedToken['kubernetes.io']['serviceaccount'] && - decodedToken['kubernetes.io']['serviceaccount']['name']) || - {}; - const cluster = getCluster(); - if (!cluster || namespace === '' || serviceAccountName === '') { - return; - } - - if (isDebugVerbose('k8s/apiProxy@refreshToken')) { - console.debug('k8s/apiProxy@refreshToken', 'Refreshing token'); - } - - isTokenRefreshInProgress = true; - - let tokenUrl = combinePath(BASE_HTTP_URL, `/${CLUSTERS_PREFIX}/${cluster}`); - tokenUrl = combinePath( - tokenUrl, - `api/v1/namespaces/${namespace}/serviceaccounts/${serviceAccountName}/token` - ); - const tokenData = { - kind: 'TokenRequest', - apiVersion: 'authentication.k8s.io/v1', - metadata: { creationTimestamp: null }, - spec: { expirationSeconds: 86400 }, - }; - - try { - const headers = new Headers({ - ...JSON_HEADERS, - }); - - const token = getToken(cluster); - if (!!token) { - headers.set('Authorization', `Bearer ${token}`); - } - - const response = await fetch(tokenUrl, { - method: 'POST', - headers, - body: JSON.stringify(tokenData), - }); - - if (response.status === 201) { - const token: KubeToken = await response.json(); - setToken(cluster, token.status.token); - } - - isTokenRefreshInProgress = false; - } catch (err) { - console.error('Error refreshing token', err); - isTokenRefreshInProgress = false; - } -} - -/** - * @returns Auth type of the cluster, or an empty string if the cluster is not found. - * It could return 'oidc' or '' for example. - * - * @param cluster - Name of the cluster. - */ -function getClusterAuthType(cluster: string): string { - const state = store.getState(); - const authType: string = state.config?.clusters?.[cluster]?.['auth_type'] || ''; - return authType; -} - -/** - * Sends a request to the backend. If the useCluster parameter is true (which it is, by default), it will be - * treated as a request to the Kubernetes server of the currently defined (in the URL) cluster. - * - * @param path - The path to the API endpoint. - * @param params - Optional parameters for the request. - * @param autoLogoutOnAuthError - Whether to automatically log out the user if there is an authentication error. - * @param useCluster - Whether to use the current cluster for the request. - * @param queryParams - Optional query parameters for the request. - * - * @returns A Promise that resolves to the JSON response from the API server. - * @throws An ApiError if the response status is not ok. - */ -export async function request( - path: string, - params: RequestParams = {}, - autoLogoutOnAuthError: boolean = true, - useCluster: boolean = true, - queryParams?: QueryParameters -): Promise { - // @todo: This is a temporary way of getting the current cluster. We should improve it later. - const cluster = (useCluster && getCluster()) || ''; - - if (isDebugVerbose('k8s/apiProxy@request')) { - console.debug('k8s/apiProxy@request', { path, params, useCluster, queryParams }); - } - - return clusterRequest(path, { cluster, autoLogoutOnAuthError, ...params }, queryParams); -} - -/** - * The options for `clusterRequest`. - */ -export interface ClusterRequestParams extends RequestParams { - cluster?: string | null; - autoLogoutOnAuthError?: boolean; -} - -/** - * Sends a request to the backend. If the cluster is required in the params parameter, it will - * be used as a request to the respective Kubernetes server. - * - * @param path - The path to the API endpoint. - * @param params - Optional parameters for the request. - * @param queryParams - Optional query parameters for the k8s request. - * - * @returns A Promise that resolves to the JSON response from the API server. - * @throws An ApiError if the response status is not ok. - */ -export async function clusterRequest( - path: string, - params: ClusterRequestParams = {}, - queryParams?: QueryParameters -): Promise { - interface RequestHeaders { - Authorization?: string; - cluster?: string; - autoLogoutOnAuthError?: boolean; - [otherHeader: string]: any; - } - - const { - timeout = DEFAULT_TIMEOUT, - cluster: paramsCluster, - autoLogoutOnAuthError = true, - isJSON = true, - ...otherParams - } = params; - - const userID = getUserIdFromLocalStorage(); - const opts: { headers: RequestHeaders } = Object.assign({ headers: {} }, otherParams); - const cluster = paramsCluster || ''; - - let fullPath = path; - if (cluster) { - const token = getToken(cluster); - const kubeconfig = await findKubeconfigByClusterName(cluster); - if (kubeconfig !== null) { - opts.headers['KUBECONFIG'] = kubeconfig; - opts.headers['X-HEADLAMP-USER-ID'] = userID; - } - - // Refresh service account token only if the cluster auth type is not OIDC - if (getClusterAuthType(cluster) !== 'oidc') { - await refreshToken(token); - } - - if (!!token) { - opts.headers.Authorization = `Bearer ${token}`; - } - - fullPath = combinePath(`/${CLUSTERS_PREFIX}/${cluster}`, path); - } - - const controller = new AbortController(); - const id = setTimeout(() => controller.abort(), timeout); - - let url = combinePath(BASE_HTTP_URL, fullPath); - url += asQuery(queryParams); - const requestData = { signal: controller.signal, ...opts }; - let response: Response = new Response(undefined, { status: 502, statusText: 'Unreachable' }); - try { - response = await fetch(url, requestData); - } catch (err) { - if (err instanceof Error) { - if (err.name === 'AbortError') { - response = new Response(undefined, { status: 408, statusText: 'Request timed-out' }); - } - } - } finally { - clearTimeout(id); - } - - // The backend signals through this header that it wants a reload. - // See plugins.go - const headerVal = response.headers.get('X-Reload'); - if (headerVal && headerVal.indexOf('reload') !== -1) { - window.location.reload(); - } - - // In case of OIDC auth if the token is about to expire the backend - // sends a refreshed token in the response header. - const newToken = response.headers.get('X-Authorization'); - if (newToken) { - setToken(cluster, newToken); - } - - if (!response.ok) { - const { status, statusText } = response; - if (autoLogoutOnAuthError && status === 401 && opts.headers.Authorization) { - console.error('Logging out due to auth error', { status, statusText, path }); - logout(); - } - - let message = statusText; - try { - if (isJSON) { - const json = await response.json(); - message += ` - ${json.message}`; - } - } catch (err) { - console.error( - 'Unable to parse error json at url:', - url, - { err }, - 'with request data:', - requestData - ); - } - - const error = new Error(message) as ApiError; - error.status = status; - return Promise.reject(error); - } - - if (!isJSON) { - return Promise.resolve(response); - } - - return response.json(); -} - -// @todo: there should be more specific args and types on StreamResultsCb than '...args: any'. - -/** The callback that's called when some results are streamed in. */ -export type StreamResultsCb = (...args: any[]) => void; -/** The callback that's called when there's an error streaming the results. */ -export type StreamErrCb = (err: Error & { status?: number }, cancelStreamFunc?: () => void) => void; - -type ApiFactoryReturn = ReturnType | ReturnType; - -// @todo: repeatStreamFunc could be improved for performance by remembering when a URL -// is 404 and not trying it again... and again. - -/** - * Repeats a streaming function call across multiple API endpoints until a - * successful response is received or all endpoints have been exhausted. - * - * This is especially useful for Kubernetes beta APIs that then stabalize. - * So the APIs are available at different endpoints on different versions of Kubernetes. - * - * @param apiEndpoints - An array of API endpoint objects returned by the `apiFactory` function. - * @param funcName - The name of the streaming function to call on each endpoint. - * @param errCb - A callback function to handle errors that occur during the streaming function call. - * @param args - Additional arguments to pass to the streaming function. - * - * @returns A function that cancels the streaming function call. - */ -async function repeatStreamFunc( - apiEndpoints: ApiFactoryReturn[], - funcName: keyof ApiFactoryReturn, - errCb: StreamErrCb, - ...args: any[] -) { - let isCancelled = false; - let streamCancel = () => {}; - - if (isDebugVerbose('k8s/apiProxy@repeatStreamFunc')) { - console.debug('k8s/apiProxy@repeatStreamFunc', { apiEndpoints, funcName, args }); - } - - function runStreamFunc( - endpointIndex: number, - funcName: string, - errCb: StreamErrCb, - ...args: any[] - ) { - const endpoint = apiEndpoints[endpointIndex]; - const fullArgs = [...args]; - let errCbIndex = funcName === 'get' ? 2 : 1; - if (endpoint.isNamespaced) { - ++errCbIndex; - } - fullArgs.splice(errCbIndex, 0, errCb); - - return endpoint[funcName as keyof ApiFactoryReturn](...fullArgs); - } - - let endpointIndex = 0; - const cancel: StreamErrCb = async (err, cancelStream) => { - if (isCancelled) { - return; - } - if (err.status === 404 && endpointIndex < apiEndpoints.length) { - // Cancel current stream - if (cancelStream) { - cancelStream(); - } - - streamCancel = await runStreamFunc(endpointIndex++, funcName, cancel, ...args); - } else if (!!errCb) { - errCb(err, streamCancel); - } - }; - - streamCancel = await runStreamFunc(endpointIndex++, funcName, cancel, ...args); - - return () => { - isCancelled = true; - streamCancel(); - }; -} - -/** - * Repeats a factory method call across multiple API endpoints until a - * successful response is received or all endpoints have been exhausted. - * - * This is especially useful for Kubernetes beta APIs that then stabalize. - * @param apiEndpoints - An array of API endpoint objects returned by the `apiFactory` function. - * @param funcName - The name of the factory method to call on each endpoint. - * - * @returns A function that cancels the factory method call. - */ -function repeatFactoryMethod(apiEndpoints: ApiFactoryReturn[], funcName: keyof ApiFactoryReturn) { - return async (...args: Parameters) => { - for (let i = 0; i < apiEndpoints.length; i++) { - try { - const endpoint = apiEndpoints[i]; - return await endpoint[funcName](...args); - } catch (err) { - // If the error is 404 and we still have other endpoints, then try the next one - if ((err as ApiError).status === 404 && i !== apiEndpoints.length - 1) { - continue; - } - - throw err; - } - } - }; -} - -// @todo: in apiFactory, and multipleApiFactory use rather than 'args'... -// `group: string, version: string, resource: string` - -/** - * Creates an API client for a single or multiple Kubernetes resources. - * - * @param args - The arguments to pass to either `singleApiFactory` or `multipleApiFactory`. - * - * @returns An API client for the specified Kubernetes resource(s). - */ -export function apiFactory( - ...args: Parameters | Parameters -) { - if (isDebugVerbose('k8s/apiProxy@apiFactory')) { - console.debug('k8s/apiProxy@apiFactory', { args }); - } - - if (args[0] instanceof Array) { - return multipleApiFactory(...(args as Parameters)); - } - - return singleApiFactory(...(args as Parameters)); -} - -/** - * Creates an API endpoint object for multiple API endpoints. - * It first tries the first endpoint, then the second, and so on until it - * gets a successful response. - * - * @param args - An array of arguments to pass to the `singleApiFactory` function. - * - * @returns An API endpoint object. - */ -function multipleApiFactory( - ...args: Parameters[] -): ReturnType { - if (isDebugVerbose('k8s/apiProxy@multipleApiFactory')) { - console.debug('k8s/apiProxy@multipleApiFactory', { args }); - } - - const apiEndpoints: ReturnType[] = args.map(apiArgs => - singleApiFactory(...apiArgs) - ); - - return { - list: ( - cb: StreamResultsCb, - errCb: StreamErrCb, - queryParams?: QueryParameters, - cluster?: string - ) => { - return repeatStreamFunc(apiEndpoints, 'list', errCb, cb, queryParams, cluster); - }, - get: ( - name: string, - cb: StreamResultsCb, - errCb: StreamErrCb, - queryParams?: QueryParameters, - cluster?: string - ) => repeatStreamFunc(apiEndpoints, 'get', errCb, name, cb, queryParams, cluster), - post: repeatFactoryMethod(apiEndpoints, 'post'), - patch: repeatFactoryMethod(apiEndpoints, 'patch'), - put: repeatFactoryMethod(apiEndpoints, 'put'), - delete: repeatFactoryMethod(apiEndpoints, 'delete'), - isNamespaced: false, - apiInfo: args.map(apiArgs => ({ - group: apiArgs[0], - version: apiArgs[1], - resource: apiArgs[2], - })), - }; -} - -/** - * Describes the API for a certain resource. - */ -export interface ApiInfo { - /** The API group. */ - group: string; - /** The API version. */ - version: string; - /** The resource name. */ - resource: string; -} - -// @todo: singleApiFactory should have a return type rather than just what it returns. - -/** - * @returns An object with methods for interacting with a single API endpoint. - * - * @param group - The API group. - * @param version - The API version. - * @param resource - The API resource. - */ -function singleApiFactory(group: string, version: string, resource: string) { - if (isDebugVerbose('k8s/apiProxy@singleApiFactory')) { - console.debug('k8s/apiProxy@singleApiFactory', { group, version, resource }); - } - - const apiRoot = getApiRoot(group, version); - const url = `${apiRoot}/${resource}`; - return { - list: ( - cb: StreamResultsCb, - errCb: StreamErrCb, - queryParams?: QueryParameters, - cluster?: string - ) => { - if (isDebugVerbose('k8s/apiProxy@singleApiFactory list')) { - console.debug('k8s/apiProxy@singleApiFactory list', { cluster, queryParams }); - } - - return streamResultsForCluster(url, { cb, errCb, cluster }, queryParams); - }, - get: ( - name: string, - cb: StreamResultsCb, - errCb: StreamErrCb, - queryParams?: QueryParameters, - cluster?: string - ) => streamResult(url, name, cb, errCb, queryParams, cluster), - post: ( - body: JSON | object | KubeObjectInterface, - queryParams?: QueryParameters, - cluster?: string - ) => post(url + asQuery(queryParams), body, true, { cluster }), - put: (body: KubeObjectInterface, queryParams?: QueryParameters, cluster?: string) => - put(`${url}/${body.metadata.name}` + asQuery(queryParams), body, true, { cluster }), - patch: (body: OpPatch[], name: string, queryParams?: QueryParameters, cluster?: string) => - patch(`${url}/${name}` + asQuery({ ...queryParams, ...{ pretty: 'true' } }), body, true, { - cluster, - }), - delete: (name: string, queryParams?: QueryParameters, cluster?: string) => - remove(`${url}/${name}` + asQuery(queryParams), { cluster }), - isNamespaced: false, - apiInfo: [{ group, version, resource }], - }; -} - -// @todo: just use args from simpleApiFactoryWithNamespace, rather than `args`? -// group: string, version: string, resource: string, includeScale: boolean = false - -export function apiFactoryWithNamespace( - ...args: - | Parameters - | Parameters -) { - if (args[0] instanceof Array) { - return multipleApiFactoryWithNamespace( - ...(args as Parameters) - ); - } - - return simpleApiFactoryWithNamespace( - ...(args as Parameters) - ); -} - -function multipleApiFactoryWithNamespace( - ...args: Parameters[] -): ReturnType { - const apiEndpoints: ReturnType[] = args.map(apiArgs => - simpleApiFactoryWithNamespace(...apiArgs) - ); - - return { - list: ( - namespace: string, - cb: StreamResultsCb, - errCb: StreamErrCb, - queryParams?: QueryParameters, - cluster?: string - ) => { - return repeatStreamFunc(apiEndpoints, 'list', errCb, namespace, cb, queryParams, cluster); - }, - get: ( - namespace: string, - name: string, - cb: StreamResultsCb, - errCb: StreamErrCb, - queryParams?: QueryParameters, - cluster?: string - ) => repeatStreamFunc(apiEndpoints, 'get', errCb, namespace, name, cb, queryParams, cluster), - post: repeatFactoryMethod(apiEndpoints, 'post'), - patch: repeatFactoryMethod(apiEndpoints, 'patch'), - put: repeatFactoryMethod(apiEndpoints, 'put'), - delete: repeatFactoryMethod(apiEndpoints, 'delete'), - isNamespaced: true, - apiInfo: args.map(apiArgs => ({ - group: apiArgs[0], - version: apiArgs[1], - resource: apiArgs[2], - })), - }; -} - -function simpleApiFactoryWithNamespace( - group: string, - version: string, - resource: string, - includeScale: boolean = false -) { - if (isDebugVerbose('k8s/apiProxy@simpleApiFactoryWithNamespace')) { - console.debug('k8s/apiProxy@simpleApiFactoryWithNamespace', { - group, - version, - resource, - includeScale, - }); - } - - const apiRoot = getApiRoot(group, version); - const results: { - scale?: ReturnType; - // @todo: Need to write types for these properties instead. - [other: string]: any; - } = { - list: ( - namespace: string, - cb: StreamResultsCb, - errCb: StreamErrCb, - queryParams?: QueryParameters, - cluster?: string - ) => { - if (isDebugVerbose('k8s/apiProxy@simpleApiFactoryWithNamespace list')) { - console.debug('k8s/apiProxy@simpleApiFactoryWithNamespace list', { cluster, queryParams }); - } - - return streamResultsForCluster(url(namespace), { cb, errCb, cluster }, queryParams); - }, - get: ( - namespace: string, - name: string, - cb: StreamResultsCb, - errCb: StreamErrCb, - queryParams?: QueryParameters, - cluster?: string - ) => streamResult(url(namespace), name, cb, errCb, queryParams, cluster), - post: (body: KubeObjectInterface, queryParams?: QueryParameters, cluster?: string) => - post(url(body.metadata.namespace!) + asQuery(queryParams), body, true, { cluster }), - patch: ( - body: OpPatch[], - namespace: string, - name: string, - queryParams?: QueryParameters, - cluster?: string - ) => - patch( - `${url(namespace)}/${name}` + asQuery({ ...queryParams, ...{ pretty: 'true' } }), - body, - true, - { cluster } - ), - put: (body: KubeObjectInterface, queryParams?: QueryParameters, cluster?: string) => - put( - `${url(body.metadata.namespace!)}/${body.metadata.name}` + asQuery(queryParams), - body, - true, - { cluster } - ), - delete: (namespace: string, name: string, queryParams?: QueryParameters, cluster?: string) => - remove(`${url(namespace)}/${name}` + asQuery(queryParams), { cluster }), - isNamespaced: true, - apiInfo: [{ group, version, resource }], - }; - - if (includeScale) { - results.scale = apiScaleFactory(apiRoot, resource); - } - - return results; - - function url(namespace: string) { - return namespace ? `${apiRoot}/namespaces/${namespace}/${resource}` : `${apiRoot}/${resource}`; - } -} - -/** - * Converts k8s queryParams to a URL query string. - * - * @param queryParams - The k8s API query parameters to convert. - * @returns The query string (starting with '?'), or empty string. - */ -function asQuery(queryParams?: QueryParameters): string { - if (queryParams === undefined) { - return ''; - } - - let newQueryParams; - if (typeof queryParams.limit === 'number' || typeof queryParams.limit === 'string') { - newQueryParams = { - ...queryParams, - limit: - typeof queryParams.limit === 'number' ? queryParams.limit.toString() : queryParams.limit, - }; - } else { - newQueryParams = { ..._.omit(queryParams, 'limit') }; - } - - return !!newQueryParams && !!Object.keys(newQueryParams).length - ? '?' + new URLSearchParams(newQueryParams).toString() - : ''; -} - -async function resourceDefToApiFactory( - resourceDef: KubeObjectInterface, - clusterName?: string -): Promise { - interface APIResourceList { - resources: { - kind: string; - namespaced: boolean; - singularName: string; - name: string; - }[]; - [other: string]: any; - } - if (isDebugVerbose('k8s/apiProxy@resourceDefToApiFactory')) { - console.debug('k8s/apiProxy@resourceDefToApiFactory', { resourceDef }); - } - - if (!resourceDef.kind) { - throw new Error(`Cannot handle unknown resource kind: ${resourceDef.kind}`); - } - - if (!resourceDef.apiVersion) { - throw new Error(`Definition ${resourceDef.kind} has no apiVersion`); - } - - let [apiGroup, apiVersion] = resourceDef.apiVersion.split('/'); - - // There may not be an API group [1], which means that the apiGroup variable will - // actually hold the apiVersion, so we switch them. - // [1] https://kubernetes.io/docs/reference/using-api/#api-groups - if (!!apiGroup && !apiVersion) { - apiVersion = apiGroup; - apiGroup = ''; - } - - if (!apiVersion) { - throw new Error(`apiVersion has no version string: ${resourceDef.apiVersion}`); - } - - const cluster = clusterName || getCluster() || ''; - - // Get details about this resource. We could avoid this for known resources, but - // this way we always get the right plural name and we also avoid eventually getting - // the wrong "known" resource because e.g. there can be CustomResources with the same - // kind as a known resource. - const apiPrefix = !!apiGroup ? 'apis' : 'api'; - const apiResult: APIResourceList = await clusterRequest( - `/${apiPrefix}/${resourceDef.apiVersion}`, - { - cluster, - autoLogoutOnAuthError: false, - } - ); - if (!apiResult) { - throw new Error(`Unkown apiVersion: ${resourceDef.apiVersion}`); - } - - // Get resource - const resource = apiResult.resources?.find(({ kind }) => kind === resourceDef.kind); - - if (!resource) { - throw new Error(`Unkown resource kind: ${resourceDef.kind}`); - } - - const hasNamespace = !!resource.namespaced; - - let factoryFunc: typeof apiFactory | typeof apiFactoryWithNamespace = apiFactory; - if (!!hasNamespace) { - factoryFunc = apiFactoryWithNamespace; - } - - return factoryFunc(apiGroup, apiVersion, resource.name); -} - -function getApiRoot(group: string, version: string) { - return group ? `/apis/${group}/${version}` : `api/${version}`; -} - -function apiScaleFactory(apiRoot: string, resource: string) { - return { - get: (namespace: string, name: string, clusterName?: string) => { - const cluster = clusterName || getCluster() || ''; - return clusterRequest(url(namespace, name), { cluster }); - }, - put: (body: { metadata: KubeMetadata; spec: { replicas: number } }, clusterName?: string) => { - const cluster = clusterName || getCluster() || ''; - return put(url(body.metadata.namespace!, body.metadata.name), body, undefined, { cluster }); - }, - patch: ( - body: { - spec: { - replicas: number; - }; - }, - metadata: KubeMetadata, - clusterName?: string - ) => { - const cluster = clusterName || getCluster() || ''; - return patch(url(metadata.namespace!, metadata.name), body, false, { cluster }); - }, - }; - - function url(namespace: string, name: string) { - return `${apiRoot}/namespaces/${namespace}/${resource}/${name}/scale`; - } -} - -export function post( - url: string, - json: JSON | object | KubeObjectInterface, - autoLogoutOnAuthError: boolean = true, - options: ClusterRequestParams = {} -) { - const { cluster: clusterName, ...requestOptions } = options; - const body = JSON.stringify(json); - const cluster = clusterName || getCluster() || ''; - return clusterRequest(url, { - method: 'POST', - body, - headers: JSON_HEADERS, - cluster, - autoLogoutOnAuthError, - ...requestOptions, - }); -} - -export function patch( - url: string, - json: any, - autoLogoutOnAuthError = true, - options: ClusterRequestParams = {} -) { - const { cluster: clusterName, ...requestOptions } = options; - const body = JSON.stringify(json); - const cluster = clusterName || getCluster() || ''; - const opts = { - method: 'PATCH', - body, - headers: { ...JSON_HEADERS, 'Content-Type': 'application/merge-patch+json' }, - autoLogoutOnAuthError, - cluster, - ...requestOptions, - }; - return clusterRequest(url, opts); -} - -export function put( - url: string, - json: Partial, - autoLogoutOnAuthError = true, - requestOptions: ClusterRequestParams = {} -) { - const body = JSON.stringify(json); - const { cluster: clusterName, ...restOptions } = requestOptions; - const opts = { - method: 'PUT', - body, - headers: JSON_HEADERS, - autoLogoutOnAuthError, - cluster: clusterName || getCluster() || '', - ...restOptions, - }; - return clusterRequest(url, opts); -} - -export function remove(url: string, requestOptions: ClusterRequestParams = {}) { - const { cluster: clusterName, ...restOptions } = requestOptions; - const cluster = clusterName || getCluster() || ''; - const opts = { method: 'DELETE', headers: JSON_HEADERS, cluster, ...restOptions }; - return clusterRequest(url, opts); -} - -/** - * Streams the results of a Kubernetes API request into a 'cb' callback. - * - * @param url - The URL of the Kubernetes API endpoint. - * @param name - The name of the Kubernetes API resource. - * @param cb - The callback function to execute when the stream receives data. - * @param errCb - The callback function to execute when an error occurs. - * @param queryParams - The query parameters to include in the API request. - * - * @returns A function to cancel the stream. - */ -export async function streamResult( - url: string, - name: string, - cb: StreamResultsCb, - errCb: StreamErrCb, - queryParams?: QueryParameters, - cluster?: string -) { - let isCancelled = false; - let socket: ReturnType; - const clusterName = cluster || getCluster() || ''; - - if (isDebugVerbose('k8s/apiProxy@streamResult')) { - console.debug('k8s/apiProxy@streamResult', { url, name, queryParams }); - } - - run(); - - return cancel; - - async function run() { - try { - const item = await clusterRequest(`${url}/${name}` + asQuery(queryParams), { - cluster: clusterName, - }); - - if (isCancelled) return; - - if (isDebugVerbose('k8s/apiProxy@streamResult run cb(item)')) { - console.debug('k8s/apiProxy@streamResult run cb(item)', { item }); - } - - cb(item); - - const watchUrl = - url + - asQuery({ ...queryParams, ...{ watch: '1', fieldSelector: `metadata.name=${name}` } }); - - socket = stream(watchUrl, x => cb(x.object), { isJson: true, cluster: clusterName }); - } catch (err) { - console.error('Error in api request', { err, url }); - // @todo: sometimes errCb is {}, the typing for apiProxy needs improving. - // See https://github.com/kinvolk/headlamp/pull/833 - if (errCb && typeof errCb === 'function') errCb(err as ApiError, cancel); - } - } - - function cancel() { - if (isCancelled) return; - isCancelled = true; - - if (socket) socket.cancel(); - } -} - -// @todo: needs a return type. - -/** - * Streams the results of a Kubernetes API request. - * - * @param url - The URL of the Kubernetes API endpoint. - * @param cb - The callback function to execute when the stream receives data. - * @param errCb - The callback function to execute when an error occurs. - * @param queryParams - The query parameters to include in the API request. - * - * @returns A function to cancel the stream. - */ -export async function streamResults( - url: string, - cb: StreamResultsCb, - errCb: StreamErrCb, - queryParams: QueryParameters | undefined -) { - const cluster = getCluster() || ''; - return streamResultsForCluster(url, { cb, errCb, cluster }, queryParams); -} - -// @todo: this interface needs documenting. - -export interface StreamResultsParams { - cb: StreamResultsCb; - errCb: StreamErrCb; - cluster?: string; -} - -// @todo: needs a return type. -// @todo: needs documenting - -export async function streamResultsForCluster( - url: string, - params: StreamResultsParams, - queryParams: QueryParameters | undefined -) { - const { cb, errCb, cluster = '' } = params; - const clusterName = cluster || getCluster() || ''; - - const results: { - [uid: string]: KubeObjectInterface; - } = {}; - let isCancelled = false; - let socket: ReturnType; - - if (isDebugVerbose('k8s/apiProxy@streamResults')) { - console.debug('k8s/apiProxy@streamResults', { url, queryParams }); - } - - // -1 means unlimited. - const maxResources = - typeof queryParams?.limit === 'number' - ? queryParams.limit - : parseInt(queryParams?.limit ?? '-1'); - - run(); - - return cancel; - - async function run() { - try { - const { kind, items, metadata } = await clusterRequest(url + asQuery(queryParams), { - cluster: clusterName, - }); - - if (isCancelled) return; - - add(items, kind); - - const watchUrl = - url + - asQuery({ ...queryParams, ...{ watch: '1', resourceVersion: metadata.resourceVersion } }); - socket = stream(watchUrl, update, { isJson: true, cluster: clusterName }); - } catch (err) { - console.error('Error in api request', { err, url }); - if (errCb && typeof errCb === 'function') { - errCb(err as ApiError, cancel); - } - } - } - - function cancel() { - if (isCancelled) return; - isCancelled = true; - - if (socket) socket.cancel(); - } - - function add(items: KubeObjectInterface[], kind: string) { - const fixedKind = kind.slice(0, -4); // Trim off the word "List" from the end of the string - for (const item of items) { - item.kind = fixedKind; - results[item.metadata.uid] = item; - } - - push(); - } - - function update({ - type, - object, - }: { - type: 'ADDED' | 'MODIFIED' | 'DELETED' | 'ERROR'; - object: KubeObjectInterface; - }) { - object.actionType = type; // eslint-disable-line no-param-reassign - - switch (type) { - case 'ADDED': - results[object.metadata.uid] = object; - break; - case 'MODIFIED': { - const existing = results[object.metadata.uid]; - - if (existing) { - if (!existing.metadata.resourceVersion || !object.metadata.resourceVersion) { - console.error('Missing resourceVersion in object', object); - break; - } - const currentVersion = parseInt(existing.metadata.resourceVersion, 10); - const newVersion = parseInt(object.metadata.resourceVersion, 10); - if (currentVersion < newVersion) { - Object.assign(existing, object); - } - } else { - results[object.metadata.uid] = object; - } - - break; - } - case 'DELETED': - delete results[object.metadata.uid]; - break; - case 'ERROR': - console.error('Error in update', { type, object }); - break; - default: - console.error('Unknown update type', type); - } - - push(); - } - - function push() { - const values = Object.values(results); - // Limit the number of resources to maxResources. We do this because when we're streaming, the - // API server will send us all the resources that match the query, without limitting, even if the - // API params wanted to limit it. So we do the limitting here. - if (maxResources > 0 && values.length > maxResources) { - values.sort((a, b) => { - const aTime = new Date(a.lastTimestamp || a.metadata.creationTimestamp!).getTime(); - const bTime = new Date(b.lastTimestamp || b.metadata.creationTimestamp!).getTime(); - // Reverse sort, so we have the most recent resources at the beginning of the array. - return 0 - (aTime - bTime); - }); - values.splice(0, values.length - maxResources); - } - - if (isDebugVerbose('k8s/apiProxy@push cb(values)')) { - console.debug('k8s/apiProxy@push cb(values)', { values }); - } - cb(values); - } -} - -/** - * Configure a stream with... StreamArgs. - */ -export interface StreamArgs { - /** Whether the stream is expected to receive JSON data. */ - isJson?: boolean; - /** Additional WebSocket protocols to use when connecting. */ - additionalProtocols?: string[]; - /** A callback function to execute when the WebSocket connection is established. */ - connectCb?: () => void; - /** Whether to attempt to reconnect the WebSocket connection if it fails. */ - reconnectOnFailure?: boolean; - /** A callback function to execute when the WebSocket connection fails. */ - failCb?: () => void; - tty?: boolean; - stdin?: boolean; - stdout?: boolean; - stderr?: boolean; - cluster?: string; -} - -/** - * Establishes a WebSocket connection to the specified URL and streams the results - * to the provided callback function. - * - * @param url - The URL to connect to. - * @param cb - The callback function to receive the streamed results. - * @param args - Additional arguments to configure the stream. - * - * @returns An object with two functions: `cancel`, which can be called to cancel - * the stream, and `getSocket`, which returns the WebSocket object. - */ -export function stream(url: string, cb: StreamResultsCb, args: StreamArgs) { - let connection: { close: () => void; socket: WebSocket | null } | null = null; - let isCancelled = false; - const { failCb, cluster = '' } = args; - // We only set reconnectOnFailure as true by default if the failCb has not been provided. - const { isJson = false, additionalProtocols, connectCb, reconnectOnFailure = !failCb } = args; - - if (isDebugVerbose('k8s/apiProxy@stream')) { - console.debug('k8s/apiProxy@stream', { url, args }); - } - - connect(); - - return { cancel, getSocket }; - - function getSocket() { - return connection ? connection.socket : null; - } - - function cancel() { - if (connection) connection.close(); - isCancelled = true; - } - - async function connect() { - if (connectCb) connectCb(); - try { - connection = await connectStream(url, cb, onFail, isJson, additionalProtocols, cluster); - } catch (error) { - console.error('Error connecting stream:', error); - onFail(); - } - } - - function retryOnFail() { - if (isCancelled) return; - - if (reconnectOnFailure) { - if (isDebugVerbose('k8s/apiProxy@stream retryOnFail')) { - console.debug('k8s/apiProxy@stream retryOnFail', 'Reconnecting in 3 seconds', { url }); - } - - setTimeout(connect, 3000); - } - } - - function onFail() { - if (!!failCb) { - failCb(); - } - - if (reconnectOnFailure) { - retryOnFail(); - } - } -} - -// @todo: needs a return type. - -/** - * Connects to a WebSocket stream at the specified path and returns an object - * with a `close` function and a `socket` property. Sends messages to `cb` callback. - * - * @param path - The path of the WebSocket stream to connect to. - * @param cb - The function to call with each message received from the stream. - * @param onFail - The function to call if the stream is closed unexpectedly. - * @param isJson - Whether the messages should be parsed as JSON. - * @param additionalProtocols - An optional array of additional WebSocket protocols to use. - * - * @returns An object with a `close` function and a `socket` property. - */ -async function connectStream( - path: string, - cb: StreamResultsCb, - onFail: () => void, - isJson: boolean, - additionalProtocols: string[] = [], - cluster = '' -) { - return connectStreamWithParams(path, cb, onFail, { - isJson, - cluster: cluster || getCluster() || '', - additionalProtocols, - }); -} - -// @todo: needs documenting. - -interface StreamParams { - cluster?: string; - isJson?: boolean; - additionalProtocols?: string[]; -} - -/** - * connectStreamWithParams is a wrapper around connectStream that allows for more - * flexibility in the parameters that can be passed to the WebSocket connection. - * - * This is an async function because it may need to fetch the kubeconfig for the - * cluster if the cluster is specified in the params. If kubeconfig is found, it - * sends the X-HEADLAMP-USER-ID header with the user ID from the localStorage. - * It is sent as a base64url encoded string in protocal format: - * `base64url.headlamp.authorization.k8s.io.${userID}`. - * - * @param path - The path of the WebSocket stream to connect to. - * @param cb - The function to call with each message received from the stream. - * @param onFail - The function to call if the stream is closed unexpectedly. - * @param params - Stream parameters to configure the connection. - * - * @returns A promise that resolves to an object with a `close` function and a `socket` property. - */ -async function connectStreamWithParams( - path: string, - cb: StreamResultsCb, - onFail: () => void, - params?: StreamParams -): Promise<{ - close: () => void; - socket: WebSocket | null; -}> { - const { isJson = false, additionalProtocols = [], cluster = '' } = params || {}; - let isClosing = false; - - const token = getToken(cluster || ''); - const userID = getUserIdFromLocalStorage(); - - const protocols = ['base64.binary.k8s.io', ...additionalProtocols]; - if (token) { - const encodedToken = btoa(token).replace(/=/g, ''); - protocols.push(`base64url.bearer.authorization.k8s.io.${encodedToken}`); - } - - let fullPath = path; - let url = ''; - if (cluster) { - fullPath = combinePath(`/${CLUSTERS_PREFIX}/${cluster}`, path); - try { - const kubeconfig = await findKubeconfigByClusterName(cluster); - - if (kubeconfig !== null) { - protocols.push(`base64url.headlamp.authorization.k8s.io.${userID}`); - } - - url = combinePath(BASE_WS_URL, fullPath); - } catch (error) { - console.error('Error while finding kubeconfig:', error); - // If we can't find the kubeconfig, we'll just use the base URL. - url = combinePath(BASE_WS_URL, fullPath); - } - } - - let socket: WebSocket | null = null; - try { - socket = new WebSocket(url, protocols); - socket.binaryType = 'arraybuffer'; - socket.addEventListener('message', onMessage); - socket.addEventListener('close', onClose); - socket.addEventListener('error', onError); - } catch (error) { - console.error(error); - } - - return { close, socket }; - - function close() { - isClosing = true; - if (!socket) { - return; - } - - socket.close(); - } - - function onMessage(body: MessageEvent) { - if (isClosing) return; - - const item = isJson ? JSON.parse(body.data) : body.data; - if (isDebugVerbose('k8s/apiProxy@connectStream onMessage cb(item)')) { - console.debug('k8s/apiProxy@connectStream onMessage cb(item)', { item }); - } - - cb(item); - } - - function onClose(...args: any[]) { - if (isClosing) return; - isClosing = true; - if (!socket) { - return; - } - - if (socket) { - socket.removeEventListener('message', onMessage); - socket.removeEventListener('close', onClose); - socket.removeEventListener('error', onError); - } - - console.warn('Socket closed unexpectedly', { path, args }); - onFail(); - } - - function onError(err: any) { - console.error('Error in api stream', { err, path }); - } -} - -/** - * Combines a base path and a path to create a full path. - * - * Doesn't matter if the start or the end has a single slash, the result will always have a single slash. - * - * @param base - The base path. - * @param path - The path to combine with the base path. - * - * @returns The combined path. - */ -function combinePath(base: string, path: string): string { - if (base.endsWith('/')) base = base.slice(0, -1); // eslint-disable-line no-param-reassign - if (path.startsWith('/')) path = path.slice(1); // eslint-disable-line no-param-reassign - return `${base}/${path}`; -} - -// @todo: apply() and other requests return Promise Can we get it to return a better type? -// @todo: Promise doesn't make any sense as a type. -/** - * Applies the provided body to the Kubernetes API. - * - * Tries to POST, and if there's a conflict it does a PUT to the api endpoint. - * - * @param body - The kubernetes object body to apply. - * @param clusterName - The cluster to apply the body to. By default uses the current cluster (URL defined). - * - * @returns The response from the kubernetes API server. - */ -export async function apply(body: KubeObjectInterface, clusterName?: string): Promise { - const bodyToApply = _.cloneDeep(body); - - let apiEndpoint; - try { - apiEndpoint = await resourceDefToApiFactory(bodyToApply, clusterName); - } catch (err) { - console.error(`Error getting api endpoint when applying the resource ${bodyToApply}: ${err}`); - throw err; - } - - const cluster = clusterName || getCluster(); - - // Check if the default namespace is needed. And we need to do this before - // getting the apiEndpoint because it will affect the endpoint itself. - const isNamespaced = apiEndpoint.isNamespaced; - const { namespace } = body.metadata; - if (!namespace && isNamespaced) { - let defaultNamespace = 'default'; - - if (!!cluster) { - defaultNamespace = getClusterDefaultNamespace(cluster) || 'default'; - } - - bodyToApply.metadata.namespace = defaultNamespace; - } - - const resourceVersion = bodyToApply.metadata.resourceVersion; - - try { - delete bodyToApply.metadata.resourceVersion; - return await apiEndpoint.post(bodyToApply, {}, cluster); - } catch (err) { - // Check to see if failed because the record already exists. - // If the failure isn't a 409 (i.e. Confilct), just rethrow. - if ((err as ApiError).status !== 409) throw err; - - // Preserve the resourceVersion if its an update request - bodyToApply.metadata.resourceVersion = resourceVersion; - // We had a conflict. Try a PUT - return apiEndpoint.put(bodyToApply, {}, cluster); - } -} - -// @todo: apiEndpoint.put has a type of any, which needs improving. - -export interface ApiError extends Error { - status: number; -} - -// @todo: is metrics() used anywhere? I can't find so, maybe in a plugin? - -/** - * Gets the metrics for the specified resource. Gets new metrics every 10 seconds. - * - * @param url - The url of the resource to get metrics for. - * @param onMetrics - The function to call with the metrics. - * @param onError - The function to call if there's an error. - * @param cluster - The cluster to get metrics for. By default uses the current cluster (URL defined). - * - * @returns A function to cancel the metrics request. - */ -export async function metrics( - url: string, - onMetrics: (arg: KubeMetrics[]) => void, - onError?: (err: ApiError) => void, - cluster?: string -) { - const handle = setInterval(getMetrics, 10000); - - const clusterName = cluster || getCluster(); - - async function getMetrics() { - try { - const metric = await clusterRequest(url, { cluster: clusterName }); - onMetrics(metric.items || metric); - } catch (err) { - if (isDebugVerbose('k8s/apiProxy@metrics')) { - console.debug('k8s/apiProxy@metrics', { err, url }); - } - - if (onError) { - onError(err as ApiError); - } - } - } - - function cancel() { - clearInterval(handle); - } - - getMetrics(); - - return cancel; -} - -//@todo: these need documenting. -//@todo: these need return types. - -export async function testAuth(cluster = '', namespace = 'default') { - const spec = { namespace }; - const clusterName = cluster || getCluster(); - - return post('/apis/authorization.k8s.io/v1/selfsubjectrulesreviews', { spec }, false, { - timeout: 5 * 1000, - cluster: clusterName, - }); -} - -export async function testClusterHealth(cluster?: string) { - const clusterName = cluster || getCluster() || ''; - return clusterRequest('/healthz', { isJSON: false, cluster: clusterName }); -} - -export async function setCluster(clusterReq: ClusterRequest) { - const kubeconfig = clusterReq.kubeconfig; - let requestURL = '/cluster'; - - if (kubeconfig) { - await storeStatelessClusterKubeconfig(kubeconfig); - // We just send parsed kubeconfig from the backend to the frontend. - requestURL = '/parseKubeConfig'; - } - - return request( - requestURL, - { - method: 'POST', - body: JSON.stringify(clusterReq), - headers: { - ...JSON_HEADERS, - ...getHeadlampAPIHeaders(), - }, - }, - false, - false - ); -} - -// @todo return type is configSlice Promise -// @todo: needs documenting. - -export async function deleteCluster(cluster: string) { - if (cluster) { - const kubeconfig = await findKubeconfigByClusterName(cluster); - if (kubeconfig !== null) { - await deleteClusterKubeconfig(cluster); - return window.location.reload(); - } - } - - return request( - `/cluster/${cluster}`, - { method: 'DELETE', headers: { ...getHeadlampAPIHeaders() } }, - false, - false - ); -} - -/** - * renameCluster sends call to backend to update a field in kubeconfig which - * is the custom name of the cluster used by the user. - * @param cluster - */ -export async function renameCluster(cluster: string, newClusterName: string, source: string) { - let stateless = false; - if (cluster) { - const kubeconfig = await findKubeconfigByClusterName(cluster); - if (kubeconfig !== null) { - stateless = true; - } - } - - return request( - `/cluster/${cluster}`, - { - method: 'PUT', - headers: { ...getHeadlampAPIHeaders() }, - body: JSON.stringify({ newClusterName, source, stateless }), - }, - false, - false - ); -} - -/** - * parseKubeConfig sends call to backend to parse kubeconfig and send back - * the parsed clusters and contexts. - * @param clusterReq - The cluster request object. - */ -export async function parseKubeConfig(clusterReq: ClusterRequest) { - const kubeconfig = clusterReq.kubeconfig; - - if (kubeconfig) { - return request( - '/parseKubeConfig', - { - method: 'POST', - body: JSON.stringify(clusterReq), - headers: { - ...JSON_HEADERS, - ...getHeadlampAPIHeaders(), - }, - }, - false, - false - ); - } - - return null; -} - -// @todo: Move startPortForward, stopPortForward, and getPortForwardStatus to a portForward.ts - -// @todo: the return type is missing for the following functions. -// See PortForwardState in PortForward.tsx - -/** - * Starts a portforward with the given details. - * - * @param cluster - The cluster to portforward for. - * @param namespace - The namespace to portforward for. - * @param podname - The pod to portforward for. - * @param containerPort - The container port to portforward for. - * @param service - The service to portforward for. - * @param serviceNamespace - The service namespace to portforward for. - * @param port - The port to portforward for. - * @param id - The id to portforward for. - * - * @returns The response from the API. - * @throws {Error} if the request fails. - */ -export function startPortForward( - cluster: string, - namespace: string, - podname: string, - containerPort: number | string, - service: string, - serviceNamespace: string, - port?: string, - address: string = '', - id: string = '' -) { - const headers = new Headers({ - ...JSON_HEADERS, - }); - - const token = getToken(cluster); - if (!!token) { - headers.set('Authorization', `Bearer ${token}`); - } - - return fetch(`${helpers.getAppUrl()}portforward`, { - method: 'POST', - headers, - body: JSON.stringify({ - cluster, - namespace, - pod: podname, - service, - targetPort: containerPort.toString(), - serviceNamespace, - id: id, - address, - port, - }), - }).then((response: Response) => { - return response.json().then(data => { - if (!response.ok) { - throw new Error(data.message); - } - return data; - }); - }); -} - -// @todo: stopOrDelete true is confusing, rename this param to justStop? -// @todo: needs a return type. -/** - * Stops or deletes a portforward with the specified details. - * - * @param cluster - The cluster to portforward for. - * @param id - The id to portforward for. - * @param stopOrDelete - Whether to stop or delete the portforward. True for stop, false for delete. - * - * @returns The response from the API. - * @throws {Error} if the request fails. - */ -export function stopOrDeletePortForward(cluster: string, id: string, stopOrDelete: boolean = true) { - return fetch(`${helpers.getAppUrl()}portforward`, { - method: 'DELETE', - body: JSON.stringify({ - cluster, - id, - stopOrDelete, - }), - }).then(response => - response.text().then(data => { - if (!response.ok) { - throw new Error('Error deleting port forward'); - } - return data; - }) - ); -} - -// @todo: needs a return type. - -/** - * Lists the port forwards for the specified cluster. - * - * @param cluster - The cluster to list the port forwards. - * - * @returns the list of port forwards for the cluster. - */ -export function listPortForward(cluster: string) { - return fetch(`${helpers.getAppUrl()}portforward/list?cluster=${cluster}`).then(response => - response.json() - ); -} - -// @todo: Move drainNode and drainNodeStatus to a drainNode.ts - -/** - * Drain a node - * - * @param cluster - The cluster to drain the node - * @param nodeName - The node name to drain - * - * @returns {Promise} - * @throws {Error} if the request fails - * @throws {Error} if the response is not ok - * - * This function is used to drain a node. It is used in the node detail page. - * As draining a node is a long running process, we get the request received - * message if the request is successful. And then we poll the drain node status endpoint - * to get the status of the drain node process. - */ -export function drainNode(cluster: string, nodeName: string) { - const headers = new Headers({ - ...JSON_HEADERS, - }); - - const token = getToken(cluster); - if (!!token) { - headers.set('Authorization', `Bearer ${token}`); - } - - return fetch(`${helpers.getAppUrl()}drain-node`, { - method: 'POST', - headers, - body: JSON.stringify({ - cluster, - nodeName, - }), - }).then(response => { - return response.json().then(data => { - if (!response.ok) { - throw new Error('Something went wrong'); - } - return data; - }); - }); -} - -// @todo: needs documenting. - -interface DrainNodeStatus { - id: string; //@todo: what is this and what is it for? - cluster: string; -} - -/** - * Get the status of the drain node process. - * - * It is used in the node detail page. - * As draining a node is a long running process, we poll this endpoint to get - * the status of the drain node process. - * - * @param cluster - The cluster to get the status of the drain node process for. - * @param nodeName - The node name to get the status of the drain node process for. - * - * @returns - The response from the API. @todo: what response? - * @throws {Error} if the request fails - * @throws {Error} if the response is not ok - */ -export function drainNodeStatus(cluster: string, nodeName: string): Promise { - const headers = new Headers({ - ...JSON_HEADERS, - }); - - const token = getToken(cluster); - if (!!token) { - headers.set('Authorization', `Bearer ${token}`); - } - - return fetch(`${helpers.getAppUrl()}drain-node-status?cluster=${cluster}&nodeName=${nodeName}`, { - method: 'GET', - headers, - }).then(response => { - return response.json().then((data: DrainNodeStatus) => { - if (!response.ok) { - throw new Error('Something went wrong'); - } - return data; - }); - }); -} - -/** - * getClusterDefaultNamespace gives the default namespace for the given cluster. - * - * If the checkSettings parameter is true (default), it will check the cluster settings first. - * Otherwise it will just check the cluster config. This means that if one needs the default - * namespace that may come from the kubeconfig, call this function with the checkSettings parameter as false. - * - * @param cluster The cluster name. - * @param checkSettings Whether to check the settings for the default namespace (otherwise it just checks the cluster config). Defaults to true. - * - * @returns The default namespace for the given cluster. - */ -function getClusterDefaultNamespace(cluster: string, checkSettings?: boolean): string { - const includeSettings = checkSettings ?? true; - let defaultNamespace = ''; - - if (!!cluster) { - if (includeSettings) { - const clusterSettings = helpers.loadClusterSettings(cluster); - defaultNamespace = clusterSettings?.defaultNamespace || ''; - } - - if (!defaultNamespace) { - const state = store.getState(); - const clusterDefaultNs: string = - state.config?.clusters?.[cluster]?.meta_data?.namespace || ''; - defaultNamespace = clusterDefaultNs; - } - } - - return defaultNamespace; -} - -// @todo: needs a return type. - -//@todo: what is DELETE /plugins/name response type? It's not used by headlamp in PLuginSettingsDetail. -/** - * Deletes the plugin with the specified name from the system. - * - * This function sends a DELETE request to the server's plugin management - * endpoint, targeting the plugin identified by its name. - * The function handles the request asynchronously and returns a promise that - * resolves with the server's response to the DELETE operation. - * - * @param {string} name - The unique name of the plugin to delete. - * This identifier is used to construct the URL for the DELETE request. - * - * @returns — A Promise that resolves to the JSON response from the API server. - * @throws — An ApiError if the response status is not ok. - * - * @example - * // Call to delete a plugin named 'examplePlugin' - * deletePlugin('examplePlugin') - * .then(response => console.log('Plugin deleted successfully', response)) - * .catch(error => console.error('Failed to delete plugin', error)); - */ -export async function deletePlugin(name: string) { - return request( - `/plugins/${name}`, - { method: 'DELETE', headers: { ...getHeadlampAPIHeaders() } }, - false, - false - ); -} +export type { QueryParameters } from './queryParameters'; + +// Basic cluster API functions +export { + clusterRequest, + patch, + post, + put, + remove, + request, + type ApiError, + type ClusterRequest, + type ClusterRequestParams, + type RequestParams, +} from './clusterRequests'; + +// Streaming API functions +export { + stream, + streamResult, + streamResults, + streamResultsForCluster, + type StreamArgs, + type StreamResultsParams, + type StreamResultsCb, + type StreamErrCb, +} from './streamingApi'; + +// API factory functions +export { + apiFactory, + apiFactoryWithNamespace, + type ApiInfo, + type ApiClient, + type ApiWithNamespaceClient, +} from './factories'; + +// Port forward functions +export { listPortForward, startPortForward, stopOrDeletePortForward } from './portForward'; + +export { + deleteCluster, + setCluster, + testAuth, + testClusterHealth, + parseKubeConfig, + renameCluster, +} from './clusterApi'; +export { metrics } from './metricsApi'; +export { deletePlugin } from './pluginsApi'; + +export { drainNodeStatus, drainNode } from './drainNode'; + +export { apply } from './apply'; diff --git a/frontend/src/lib/k8s/apiProxy/metricsApi.ts b/frontend/src/lib/k8s/apiProxy/metricsApi.ts new file mode 100644 index 0000000000..b20a900f61 --- /dev/null +++ b/frontend/src/lib/k8s/apiProxy/metricsApi.ts @@ -0,0 +1,48 @@ +import { isDebugVerbose } from '../../../helpers'; +import { getCluster } from '../../cluster'; +import { KubeMetrics } from '../cluster'; +import { ApiError, clusterRequest } from './clusterRequests'; + +/** + * Gets the metrics for the specified resource. Gets new metrics every 10 seconds. + * + * @param url - The url of the resource to get metrics for. + * @param onMetrics - The function to call with the metrics. + * @param onError - The function to call if there's an error. + * @param cluster - The cluster to get metrics for. By default uses the current cluster (URL defined). + * + * @returns A function to cancel the metrics request. + */ +export async function metrics( + url: string, + onMetrics: (arg: KubeMetrics[]) => void, + onError?: (err: ApiError) => void, + cluster?: string +) { + const handle = setInterval(getMetrics, 10000); + + const clusterName = cluster || getCluster(); + + async function getMetrics() { + try { + const metric = await clusterRequest(url, { cluster: clusterName }); + onMetrics(metric.items || metric); + } catch (err) { + if (isDebugVerbose('k8s/apiProxy@metrics')) { + console.debug('k8s/apiProxy@metrics', { err, url }); + } + + if (onError) { + onError(err as ApiError); + } + } + } + + function cancel() { + clearInterval(handle); + } + + getMetrics(); + + return cancel; +} diff --git a/frontend/src/lib/k8s/apiProxy/pluginsApi.ts b/frontend/src/lib/k8s/apiProxy/pluginsApi.ts new file mode 100644 index 0000000000..08b6bc23a6 --- /dev/null +++ b/frontend/src/lib/k8s/apiProxy/pluginsApi.ts @@ -0,0 +1,32 @@ +import { getHeadlampAPIHeaders } from '../../../helpers'; +import { request } from './clusterRequests'; + +//@todo: what is DELETE /plugins/name response type? It's not used by headlamp in PLuginSettingsDetail. +/** + * Deletes the plugin with the specified name from the system. + * + * This function sends a DELETE request to the server's plugin management + * endpoint, targeting the plugin identified by its name. + * The function handles the request asynchronously and returns a promise that + * resolves with the server's response to the DELETE operation. + * + * @param {string} name - The unique name of the plugin to delete. + * This identifier is used to construct the URL for the DELETE request. + * + * @returns — A Promise that resolves to the JSON response from the API server. + * @throws — An ApiError if the response status is not ok. + * + * @example + * // Call to delete a plugin named 'examplePlugin' + * deletePlugin('examplePlugin') + * .then(response => console.log('Plugin deleted successfully', response)) + * .catch(error => console.error('Failed to delete plugin', error)); + */ +export async function deletePlugin(name: string) { + return request( + `/plugins/${name}`, + { method: 'DELETE', headers: { ...getHeadlampAPIHeaders() } }, + false, + false + ); +} diff --git a/frontend/src/lib/k8s/apiProxy/portForward.ts b/frontend/src/lib/k8s/apiProxy/portForward.ts new file mode 100644 index 0000000000..2082cb1a2e --- /dev/null +++ b/frontend/src/lib/k8s/apiProxy/portForward.ts @@ -0,0 +1,133 @@ +import helpers from '../../../helpers'; +import { getToken } from '../../auth'; +import { JSON_HEADERS } from './constants'; + +// @todo: the return type is missing for the following functions. +// See PortForwardState in PortForward.tsx + +export interface PortForward { + id: string; + pod: string; + service: string; + serviceNamespace: string; + namespace: string; + cluster: string; + port: string; + targetPort: string; + status?: string; + error?: string; +} + +export interface PortForwardRequest { + id: string; + namespace: string; + pod: string; + service: string; + serviceNamespace: string; + targetPort: string; + cluster: string; + port?: string; + address?: string; +} + +/** + * Starts a portforward with the given details. + * + * @param cluster - The cluster to portforward for. + * @param namespace - The namespace to portforward for. + * @param podname - The pod to portforward for. + * @param containerPort - The container port to portforward for. + * @param service - The service to portforward for. + * @param serviceNamespace - The service namespace to portforward for. + * @param port - The port to portforward for. + * @param id - The id to portforward for. + * + * @returns The response from the API. + * @throws {Error} if the request fails. + */ +export function startPortForward( + cluster: string, + namespace: string, + podname: string, + containerPort: number | string, + service: string, + serviceNamespace: string, + port?: string, + address: string = '', + id: string = '' +): Promise { + const request: PortForwardRequest = { + cluster, + namespace, + pod: podname, + service, + targetPort: containerPort.toString(), + serviceNamespace, + id: id, + address, + port, + }; + return fetch(`${helpers.getAppUrl()}portforward`, { + method: 'POST', + headers: new Headers({ + Authorization: `Bearer ${getToken(cluster)}`, + ...JSON_HEADERS, + }), + body: JSON.stringify(request), + }).then((response: Response) => { + return response.json().then(data => { + if (!response.ok) { + throw new Error(data.message); + } + return data; + }); + }); +} + +// @todo: stopOrDelete true is confusing, rename this param to justStop? +/** + * Stops or deletes a portforward with the specified details. + * + * @param cluster - The cluster to portforward for. + * @param id - The id to portforward for. + * @param stopOrDelete - Whether to stop or delete the portforward. True for stop, false for delete. + * + * @returns The response from the API. + * @throws {Error} if the request fails. + */ +export function stopOrDeletePortForward( + cluster: string, + id: string, + stopOrDelete: boolean = true +): Promise { + return fetch(`${helpers.getAppUrl()}portforward`, { + method: 'DELETE', + body: JSON.stringify({ + cluster, + id, + stopOrDelete, + }), + }).then(response => + response.text().then(data => { + if (!response.ok) { + throw new Error('Error deleting port forward'); + } + return data; + }) + ); +} + +// @todo: needs a return type. + +/** + * Lists the port forwards for the specified cluster. + * + * @param cluster - The cluster to list the port forwards. + * + * @returns the list of port forwards for the cluster. + */ +export function listPortForward(cluster: string): Promise { + return fetch(`${helpers.getAppUrl()}portforward/list?cluster=${cluster}`).then(response => + response.json() + ); +} diff --git a/frontend/src/lib/k8s/apiProxy/queryParameters.ts b/frontend/src/lib/k8s/apiProxy/queryParameters.ts new file mode 100644 index 0000000000..a04016a2bb --- /dev/null +++ b/frontend/src/lib/k8s/apiProxy/queryParameters.ts @@ -0,0 +1,111 @@ +// @todo: QueryParamaters should be specific to different resources. +// Because some only support some paramaters. + +/** + * QueryParamaters is a map of query parameters for the Kubernetes API. + */ +export interface QueryParameters { + /** + * Continue token for paging through large result sets. + * + * The continue option should be set when retrieving more results from the server. + * Since this value is server defined, clients may only use the continue value + * from a previous query result with identical query parameters + * (except for the value of continue) and the server may reject a continue value + * it does not recognize. If the specified continue value is no longer valid + * whether due to expiration (generally five to fifteen minutes) or a + * configuration change on the server, the server will respond with a + * 410 ResourceExpired error together with a continue token. If the client + * needs a consistent list, it must restart their list without the continue field. + * Otherwise, the client may send another list request with the token received + * with the 410 error, the server will respond with a list starting from the next + * key, but from the latest snapshot, which is inconsistent from the previous + * list results - objects that are created, modified, or deleted after the first + * list request will be included in the response, as long as their keys are after + * the "next key". + * + * This field is not supported when watch is true. Clients may start a watch from + * the last resourceVersion value returned by the server and not miss any modifications. + * @see https://kubernetes.io/docs/reference/using-api/api-concepts/#retrieving-large-results-sets-in-chunks + */ + continue?: string; + /** + * dryRun causes apiserver to simulate the request, and report whether the object would be modified. + * Can be '' or 'All' + * + * @see https://kubernetes.io/docs/reference/using-api/api-concepts/#dry-run + */ + dryRun?: string; + /** + * fieldSeletor restricts the list of returned objects by their fields. Defaults to everything. + * + * @see https://kubernetes.io/docs/concepts/overview/working-with-objects/field-selectors/ + */ + fieldSelector?: string; + /** + * labelSelector restricts the list of returned objects by their labels. Defaults to everything. + * + * @see https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#api + * @see https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors + */ + labelSelector?: string; + /** + * limit is a maximum number of responses to return for a list call. + * + * If more items exist, the server will set the continue field on the list + * metadata to a value that can be used with the same initial query to retrieve + * the next set of results. Setting a limit may return fewer than the requested + * amount of items (up to zero items) in the event all requested objects are + * filtered out and clients should only use the presence of the continue field + * to determine whether more results are available. Servers may choose not to + * support the limit argument and will return all of the available results. + * If limit is specified and the continue field is empty, clients may assume + * that no more results are available. + * + * This field is not supported if watch is true. + * @see https://kubernetes.io/docs/reference/using-api/api-concepts/#retrieving-large-results-sets-in-chunks + */ + limit?: string | number; + /** + * resourceVersion sets a constraint on what resource versions a request may be served from. + * Defaults to unset + * + * @see https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes + * @see https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions + */ + resourceVersion?: string; + /** + * allowWatchBookmarks means watch events with type "BOOKMARK" will also be sent. + * + * Can be 'true' + * @see https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks + */ + allowWatchBookmarks?: string; + /** + * sendInitialEvents controls whether the server will send the events + * for a watch before sending the current list state. + * + * Can be 'true'. + * @see https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists + */ + sendInitialEvents?: string; + /** + * The resource version to match. + * + * @see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list + */ + resourceVersionMatch?: string; + /** + * If 'true', then the output is pretty printed. + * Can be '' or 'true' + * @see https://kubernetes.io/docs/reference/using-api/api-concepts/#output-options + */ + pretty?: string; + /** + * watch instead of a list or get, watch for changes to the requested object(s). + * + * Can be 1. + * @see https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes + */ + watch?: string; +} diff --git a/frontend/src/lib/k8s/apiProxy/scaleApi.ts b/frontend/src/lib/k8s/apiProxy/scaleApi.ts new file mode 100644 index 0000000000..ac6f87d3b3 --- /dev/null +++ b/frontend/src/lib/k8s/apiProxy/scaleApi.ts @@ -0,0 +1,54 @@ +import { getCluster } from '../../cluster'; +import { KubeMetadata } from '../cluster'; +import { clusterRequest, patch, put } from './clusterRequests'; + +export interface ScaleApi { + get: (namespace: string, name: string, clusterName?: string) => Promise; + put: ( + body: { + metadata: KubeMetadata; + spec: { + replicas: number; + }; + }, + clusterName?: string + ) => Promise; + patch: ( + body: { + spec: { + replicas: number; + }; + }, + metadata: KubeMetadata, + clusterName?: string + ) => Promise; +} + +export function apiScaleFactory(apiRoot: string, resource: string): ScaleApi { + return { + get: (namespace: string, name: string, clusterName?: string) => { + const cluster = clusterName || getCluster() || ''; + return clusterRequest(url(namespace, name), { cluster }); + }, + put: (body: { metadata: KubeMetadata; spec: { replicas: number } }, clusterName?: string) => { + const cluster = clusterName || getCluster() || ''; + return put(url(body.metadata.namespace!, body.metadata.name), body, undefined, { cluster }); + }, + patch: ( + body: { + spec: { + replicas: number; + }; + }, + metadata: KubeMetadata, + clusterName?: string + ) => { + const cluster = clusterName || getCluster() || ''; + return patch(url(metadata.namespace!, metadata.name), body, false, { cluster }); + }, + }; + + function url(namespace: string, name: string) { + return `${apiRoot}/namespaces/${namespace}/${resource}/${name}/scale`; + } +} diff --git a/frontend/src/lib/k8s/apiProxy/streamingApi.ts b/frontend/src/lib/k8s/apiProxy/streamingApi.ts new file mode 100644 index 0000000000..25b72a5f19 --- /dev/null +++ b/frontend/src/lib/k8s/apiProxy/streamingApi.ts @@ -0,0 +1,480 @@ +import { isDebugVerbose } from '../../../helpers'; +import { findKubeconfigByClusterName, getUserIdFromLocalStorage } from '../../../stateless'; +import { getToken } from '../../auth'; +import { getCluster } from '../../cluster'; +import { KubeObjectInterface } from '../cluster'; +import { ApiError, clusterRequest } from './clusterRequests'; +import { BASE_HTTP_URL, CLUSTERS_PREFIX } from './constants'; +import { asQuery, combinePath } from './formatUrl'; +import { QueryParameters } from './queryParameters'; + +export type StreamUpdate = { + type: 'ADDED' | 'MODIFIED' | 'DELETED' | 'ERROR'; + object: T; +}; + +export type StreamResultsCb = (data: T) => void; +export type StreamUpdatesCb = (data: T | StreamUpdate) => void; +export type StreamErrCb = (err: Error & { status?: number }, cancelStreamFunc?: () => void) => void; + +const BASE_WS_URL = BASE_HTTP_URL.replace('http', 'ws'); + +/** + * Fetches the data and watches for changes to the data. + * + * @param url - The URL of the Kubernetes API endpoint. + * @param name - The name of the Kubernetes API resource. + * @param cb - The callback function to execute when the stream receives data. + * @param errCb - The callback function to execute when an error occurs. + * @param queryParams - The query parameters to include in the API request. + * + * @returns A function to cancel the stream. + */ +export function streamResult( + url: string, + name: string, + cb: StreamResultsCb, + errCb: StreamErrCb, + queryParams?: QueryParameters, + cluster?: string +) { + let isCancelled = false; + let socket: ReturnType; + const clusterName = cluster || getCluster() || ''; + + if (isDebugVerbose('k8s/apiProxy@streamResult')) { + console.debug('k8s/apiProxy@streamResult', { url, name, queryParams }); + } + + run(); + + return Promise.resolve(cancel); + + async function run() { + try { + const item = await clusterRequest(`${url}/${name}` + asQuery(queryParams), { + cluster: clusterName, + }); + + if (isCancelled) return; + + if (isDebugVerbose('k8s/apiProxy@streamResult run cb(item)')) { + console.debug('k8s/apiProxy@streamResult run cb(item)', { item }); + } + + cb(item); + + const watchUrl = + url + + asQuery({ ...queryParams, ...{ watch: '1', fieldSelector: `metadata.name=${name}` } }); + + socket = stream(watchUrl, (x: any) => cb(x.object), { isJson: true, cluster: clusterName }); + } catch (err) { + console.error('Error in api request', { err, url }); + // @todo: sometimes errCb is {}, the typing for apiProxy needs improving. + // See https://github.com/kinvolk/headlamp/pull/833 + if (errCb && typeof errCb === 'function') errCb(err as ApiError, cancel); + } + } + + function cancel() { + if (isCancelled) return; + isCancelled = true; + + if (socket) socket.cancel(); + } +} + +/** + * Streams the results of a Kubernetes API request. + * + * @param url - The URL of the Kubernetes API endpoint. + * @param cb - The callback function to execute when the stream receives data. + * @param errCb - The callback function to execute when an error occurs. + * @param queryParams - The query parameters to include in the API request. + * + * @returns A function to cancel the stream. + */ +export function streamResults( + url: string, + cb: StreamResultsCb, + errCb: StreamErrCb, + queryParams: QueryParameters | undefined +) { + const cluster = getCluster() || ''; + return streamResultsForCluster(url, { cb, errCb, cluster }, queryParams); +} + +// @todo: this interface needs documenting. + +export interface StreamResultsParams { + cb: StreamResultsCb; + errCb: StreamErrCb; + cluster?: string; +} + +// @todo: needs documenting + +export function streamResultsForCluster( + url: string, + params: StreamResultsParams, + queryParams?: QueryParameters +): Promise<() => void> { + const { cb, errCb, cluster = '' } = params; + const clusterName = cluster || getCluster() || ''; + + const results: Record = {}; + let isCancelled = false; + let socket: ReturnType; + + if (isDebugVerbose('k8s/apiProxy@streamResults')) { + console.debug('k8s/apiProxy@streamResults', { url, queryParams }); + } + + // -1 means unlimited. + const maxResources = + typeof queryParams?.limit === 'number' + ? queryParams.limit + : parseInt(queryParams?.limit ?? '-1'); + + run(); + + return Promise.resolve(cancel); + + async function run() { + try { + const { kind, items, metadata } = await clusterRequest(url + asQuery(queryParams), { + cluster: clusterName, + }); + + if (isCancelled) return; + + add(items, kind); + + const watchUrl = + url + + asQuery({ ...queryParams, ...{ watch: '1', resourceVersion: metadata.resourceVersion } }); + socket = stream(watchUrl, update, { isJson: true, cluster: clusterName }); + } catch (err) { + console.error('Error in api request', { err, url }); + if (errCb && typeof errCb === 'function') { + errCb(err as ApiError, cancel); + } + } + } + + function cancel() { + if (isCancelled) return; + isCancelled = true; + + if (socket) socket.cancel(); + } + + function add(items: any[], kind: string) { + const fixedKind = kind.slice(0, -4); // Trim off the word "List" from the end of the string + for (const item of items) { + item.kind = fixedKind; + results[item.metadata.uid] = item; + } + + push(); + } + + function update({ type, object }: StreamUpdate) { + (object as KubeObjectInterface).actionType = type; // eslint-disable-line no-param-reassign + + switch (type) { + case 'ADDED': + results[object.metadata.uid] = object; + break; + case 'MODIFIED': { + const existing = results[object.metadata.uid]; + + if (existing) { + if (!existing.metadata.resourceVersion || !object.metadata.resourceVersion) { + console.error('Missing resourceVersion in object', object); + break; + } + const currentVersion = parseInt(existing.metadata.resourceVersion, 10); + const newVersion = parseInt(object.metadata.resourceVersion, 10); + if (currentVersion < newVersion) { + Object.assign(existing, object); + } + } else { + results[object.metadata.uid] = object; + } + + break; + } + case 'DELETED': + delete results[object.metadata.uid]; + break; + case 'ERROR': + console.error('Error in update', { type, object }); + break; + default: + console.error('Unknown update type', type); + } + + push(); + } + + function push() { + const values = Object.values(results); + // Limit the number of resources to maxResources. We do this because when we're streaming, the + // API server will send us all the resources that match the query, without limitting, even if the + // API params wanted to limit it. So we do the limitting here. + if (maxResources > 0 && values.length > maxResources) { + values.sort((a, b) => { + const aTime = new Date(a.lastTimestamp || a.metadata.creationTimestamp!).getTime(); + const bTime = new Date(b.lastTimestamp || b.metadata.creationTimestamp!).getTime(); + // Reverse sort, so we have the most recent resources at the beginning of the array. + return 0 - (aTime - bTime); + }); + values.splice(0, values.length - maxResources); + } + + if (isDebugVerbose('k8s/apiProxy@push cb(values)')) { + console.debug('k8s/apiProxy@push cb(values)', { values }); + } + cb(values); + } +} + +/** + * Configure a stream with... StreamArgs. + */ +export interface StreamArgs { + /** Whether the stream is expected to receive JSON data. */ + isJson?: boolean; + /** Additional WebSocket protocols to use when connecting. */ + additionalProtocols?: string[]; + /** A callback function to execute when the WebSocket connection is established. */ + connectCb?: () => void; + /** Whether to attempt to reconnect the WebSocket connection if it fails. */ + reconnectOnFailure?: boolean; + /** A callback function to execute when the WebSocket connection fails. */ + failCb?: () => void; + tty?: boolean; + stdin?: boolean; + stdout?: boolean; + stderr?: boolean; + cluster?: string; +} + +/** + * Establishes a WebSocket connection to the specified URL and streams the results + * to the provided callback function. + * + * @param url - The URL to connect to. + * @param cb - The callback function to receive the streamed results. + * @param args - Additional arguments to configure the stream. + * + * @returns An object with two functions: `cancel`, which can be called to cancel + * the stream, and `getSocket`, which returns the WebSocket object. + */ +export function stream(url: string, cb: StreamResultsCb, args: StreamArgs) { + let connection: { close: () => void; socket: WebSocket | null } | null = null; + let isCancelled = false; + const { failCb, cluster = '' } = args; + // We only set reconnectOnFailure as true by default if the failCb has not been provided. + const { isJson = false, additionalProtocols, connectCb, reconnectOnFailure = !failCb } = args; + + if (isDebugVerbose('k8s/apiProxy@stream')) { + console.debug('k8s/apiProxy@stream', { url, args }); + } + + connect(); + + return { cancel, getSocket }; + + function getSocket() { + return connection ? connection.socket : null; + } + + function cancel() { + if (connection) connection.close(); + isCancelled = true; + } + + async function connect() { + if (connectCb) connectCb(); + try { + connection = await connectStream(url, cb, onFail, isJson, additionalProtocols, cluster); + } catch (error) { + console.error('Error connecting stream:', error); + onFail(); + } + } + + function retryOnFail() { + if (isCancelled) return; + + if (reconnectOnFailure) { + if (isDebugVerbose('k8s/apiProxy@stream retryOnFail')) { + console.debug('k8s/apiProxy@stream retryOnFail', 'Reconnecting in 3 seconds', { url }); + } + + setTimeout(connect, 3000); + } + } + + function onFail() { + if (!!failCb) { + failCb(); + } + + if (reconnectOnFailure) { + retryOnFail(); + } + } +} + +// @todo: needs a return type. + +/** + * Connects to a WebSocket stream at the specified path and returns an object + * with a `close` function and a `socket` property. Sends messages to `cb` callback. + * + * @param path - The path of the WebSocket stream to connect to. + * @param cb - The function to call with each message received from the stream. + * @param onFail - The function to call if the stream is closed unexpectedly. + * @param isJson - Whether the messages should be parsed as JSON. + * @param additionalProtocols - An optional array of additional WebSocket protocols to use. + * + * @returns An object with a `close` function and a `socket` property. + */ +export async function connectStream( + path: string, + cb: StreamResultsCb, + onFail: () => void, + isJson: boolean, + additionalProtocols: string[] = [], + cluster = '' +) { + return connectStreamWithParams(path, cb, onFail, { + isJson, + cluster: cluster || getCluster() || '', + additionalProtocols, + }); +} + +// @todo: needs documenting. + +interface StreamParams { + cluster?: string; + isJson?: boolean; + additionalProtocols?: string[]; +} + +/** + * connectStreamWithParams is a wrapper around connectStream that allows for more + * flexibility in the parameters that can be passed to the WebSocket connection. + * + * This is an async function because it may need to fetch the kubeconfig for the + * cluster if the cluster is specified in the params. If kubeconfig is found, it + * sends the X-HEADLAMP-USER-ID header with the user ID from the localStorage. + * It is sent as a base64url encoded string in protocal format: + * `base64url.headlamp.authorization.k8s.io.${userID}`. + * + * @param path - The path of the WebSocket stream to connect to. + * @param cb - The function to call with each message received from the stream. + * @param onFail - The function to call if the stream is closed unexpectedly. + * @param params - Stream parameters to configure the connection. + * + * @returns A promise that resolves to an object with a `close` function and a `socket` property. + */ +export async function connectStreamWithParams( + path: string, + cb: StreamResultsCb, + onFail: () => void, + params?: StreamParams +): Promise<{ + close: () => void; + socket: WebSocket | null; +}> { + const { isJson = false, additionalProtocols = [], cluster = '' } = params || {}; + let isClosing = false; + + const token = getToken(cluster || ''); + const userID = getUserIdFromLocalStorage(); + + const protocols = ['base64.binary.k8s.io', ...additionalProtocols]; + if (token) { + const encodedToken = btoa(token).replace(/=/g, ''); + protocols.push(`base64url.bearer.authorization.k8s.io.${encodedToken}`); + } + + let fullPath = path; + let url = ''; + if (cluster) { + fullPath = combinePath(`/${CLUSTERS_PREFIX}/${cluster}`, path); + try { + const kubeconfig = await findKubeconfigByClusterName(cluster); + + if (kubeconfig !== null) { + protocols.push(`base64url.headlamp.authorization.k8s.io.${userID}`); + } + + url = combinePath(BASE_WS_URL, fullPath); + } catch (error) { + console.error('Error while finding kubeconfig:', error); + // If we can't find the kubeconfig, we'll just use the base URL. + url = combinePath(BASE_WS_URL, fullPath); + } + } + + let socket: WebSocket | null = null; + try { + socket = new WebSocket(url, protocols); + socket.binaryType = 'arraybuffer'; + socket.addEventListener('message', onMessage); + socket.addEventListener('close', onClose); + socket.addEventListener('error', onError); + } catch (error) { + console.error(error); + } + + return { close, socket }; + + function close() { + isClosing = true; + if (!socket) { + return; + } + + socket.close(); + } + + function onMessage(body: MessageEvent) { + if (isClosing) return; + + const item = isJson ? JSON.parse(body.data) : body.data; + if (isDebugVerbose('k8s/apiProxy@connectStream onMessage cb(item)')) { + console.debug('k8s/apiProxy@connectStream onMessage cb(item)', { item }); + } + + cb(item); + } + + function onClose(...args: any[]) { + if (isClosing) return; + isClosing = true; + if (!socket) { + return; + } + + if (socket) { + socket.removeEventListener('message', onMessage); + socket.removeEventListener('close', onClose); + socket.removeEventListener('error', onError); + } + + console.warn('Socket closed unexpectedly', { path, args }); + onFail(); + } + + function onError(err: any) { + console.error('Error in api stream', { err, path }); + } +} diff --git a/frontend/src/lib/k8s/apiProxy/tokenApi.ts b/frontend/src/lib/k8s/apiProxy/tokenApi.ts new file mode 100644 index 0000000000..9fba69d93d --- /dev/null +++ b/frontend/src/lib/k8s/apiProxy/tokenApi.ts @@ -0,0 +1,107 @@ +import { decodeToken } from 'react-jwt'; +import { isDebugVerbose } from '../../../helpers'; +import { getToken, setToken } from '../../auth'; +import { getCluster } from '../../cluster'; +import { KubeToken } from '../token'; +import { + BASE_HTTP_URL, + CLUSTERS_PREFIX, + JSON_HEADERS, + MIN_LIFESPAN_FOR_TOKEN_REFRESH, +} from './constants'; +import { combinePath } from './formatUrl'; + +let isTokenRefreshInProgress = false; + +/** + * Refreshes the token if it is about to expire. + * + * @param token - The token to refresh. For null token it just does nothing. + * + * @note Sets the token with `setToken` if the token is refreshed. + * @note Uses global `isTokenRefreshInProgress` to prevent multiple token + * refreshes at the same time. + */ +export async function refreshToken(token: string | null): Promise { + if (!token || isTokenRefreshInProgress) { + return; + } + // decode token + const decodedToken: any = decodeToken(token); + + // return if the token doesn't have an expiry time + if (!decodedToken?.exp) { + return; + } + // convert expiry seconds to date object + const expiry = decodedToken.exp; + const now = new Date().valueOf(); + const expDate = new Date(0); + expDate.setUTCSeconds(expiry); + + // calculate time to expiry in seconds + const diff = (expDate.valueOf() - now) / 1000; + // If the token is not about to expire return + // comparing the time to expiry with the minimum lifespan for a token both in seconds + if (diff > MIN_LIFESPAN_FOR_TOKEN_REFRESH) { + return; + } + const namespace = + (decodedToken && decodedToken['kubernetes.io'] && decodedToken['kubernetes.io']['namespace']) || + ''; + const serviceAccountName = + (decodedToken && + decodedToken['kubernetes.io'] && + decodedToken['kubernetes.io']['serviceaccount'] && + decodedToken['kubernetes.io']['serviceaccount']['name']) || + {}; + const cluster = getCluster(); + if (!cluster || namespace === '' || serviceAccountName === '') { + return; + } + + if (isDebugVerbose('k8s/apiProxy@refreshToken')) { + console.debug('k8s/apiProxy@refreshToken', 'Refreshing token'); + } + + isTokenRefreshInProgress = true; + + let tokenUrl = combinePath(BASE_HTTP_URL, `/${CLUSTERS_PREFIX}/${cluster}`); + tokenUrl = combinePath( + tokenUrl, + `api/v1/namespaces/${namespace}/serviceaccounts/${serviceAccountName}/token` + ); + const tokenData = { + kind: 'TokenRequest', + apiVersion: 'authentication.k8s.io/v1', + metadata: { creationTimestamp: null }, + spec: { expirationSeconds: 86400 }, + }; + + try { + const headers = new Headers({ + ...JSON_HEADERS, + }); + + const token = getToken(cluster); + if (!!token) { + headers.set('Authorization', `Bearer ${token}`); + } + + const response = await fetch(tokenUrl, { + method: 'POST', + headers, + body: JSON.stringify(tokenData), + }); + + if (response.status === 201) { + const token: KubeToken = await response.json(); + setToken(cluster, token.status.token); + } + + isTokenRefreshInProgress = false; + } catch (err) { + console.error('Error refreshing token', err); + isTokenRefreshInProgress = false; + } +} diff --git a/frontend/src/lib/k8s/cluster.ts b/frontend/src/lib/k8s/cluster.ts index 67d698273d..45bfa0f7bb 100644 --- a/frontend/src/lib/k8s/cluster.ts +++ b/frontend/src/lib/k8s/cluster.ts @@ -706,6 +706,7 @@ export function makeKubeObject( args.unshift(this.getNamespace()!); } + // @ts-ignore return this._class().apiEndpoint.delete(...args, {}, this._clusterName); } @@ -748,13 +749,14 @@ export function makeKubeObject( patch(body: OpPatch[]) { const patchMethod = this._class().apiEndpoint.patch; - const args: Parameters = [body]; + const args: Partial> = [body]; if (this.isNamespaced) { args.push(this.getNamespace()); } args.push(this.getName()); + // @ts-ignore return this._class().apiEndpoint.patch(...args, {}, this._clusterName); } diff --git a/frontend/src/lib/k8s/clusterRole.ts b/frontend/src/lib/k8s/clusterRole.ts index 72adcbea95..b49b5a272a 100644 --- a/frontend/src/lib/k8s/clusterRole.ts +++ b/frontend/src/lib/k8s/clusterRole.ts @@ -1,7 +1,8 @@ import { apiFactory } from './apiProxy'; -import Role from './role'; +import { makeKubeObject } from './cluster'; +import { KubeRole } from './role'; -class ClusterRole extends Role { +class ClusterRole extends makeKubeObject('role') { static apiEndpoint = apiFactory('rbac.authorization.k8s.io', 'v1', 'clusterroles'); static get className() { @@ -11,6 +12,10 @@ class ClusterRole extends Role { get detailsRoute() { return 'clusterRole'; } + + get rules() { + return this.jsonData!.rules; + } } export default ClusterRole; diff --git a/frontend/src/lib/k8s/clusterRoleBinding.ts b/frontend/src/lib/k8s/clusterRoleBinding.ts index cb723e7270..354c08fa3c 100644 --- a/frontend/src/lib/k8s/clusterRoleBinding.ts +++ b/frontend/src/lib/k8s/clusterRoleBinding.ts @@ -1,7 +1,8 @@ import { apiFactory } from './apiProxy'; -import RoleBinding from './roleBinding'; +import { makeKubeObject } from './cluster'; +import { KubeRoleBinding } from './roleBinding'; -class ClusterRoleBinding extends RoleBinding { +class ClusterRoleBinding extends makeKubeObject('roleBinding') { static apiEndpoint = apiFactory('rbac.authorization.k8s.io', 'v1', 'clusterrolebindings'); static get className(): string { @@ -11,6 +12,14 @@ class ClusterRoleBinding extends RoleBinding { get detailsRoute() { return 'clusterRoleBinding'; } + + get roleRef() { + return this.jsonData!.roleRef; + } + + get subjects(): KubeRoleBinding['subjects'] { + return this.jsonData!.subjects; + } } export default ClusterRoleBinding;