Skip to content

Commit

Permalink
feat: add support for skynet instances
Browse files Browse the repository at this point in the history
  • Loading branch information
quitrk committed Oct 24, 2023
1 parent 29a8b5c commit b7858a9
Show file tree
Hide file tree
Showing 10 changed files with 189 additions and 34 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@

### High Level

An autoscaler for Jitsi instances (`jibri`, `sip-jibri`, `jigasi`, `JVB`, `nomad`), which are deployed in one of the following ways:
An autoscaler for Jitsi instances (`jibri`, `sip-jibri`, `jigasi`, `JVB`, `nomad`, `skynet`), which are deployed in one of the following ways:
* as a parameterized Nomad batch job
* as an Instance in Oracle Cloud
* as a Droplet in Digital Ocean
* custom deployment model

The autoscaler manages multiple `groups` of instances, each having a `type` (`jibri`, `sip-jibri`, `jigasi`, `JVB`, `nomad`) and being deployed in a specific `cloud` (`oracle`, `digitalocean`, `custom`).
The autoscaler manages multiple `groups` of instances, each having a `type` (`jibri`, `sip-jibri`, `jigasi`, `JVB`, `nomad`, `skynet`) and being deployed in a specific `cloud` (`oracle`, `digitalocean`, `custom`).

The autoscaler knows the Jitsi instances status and communicates with them via the [jitsi-autoscaler-sidecar](https://github.com/jitsi/jitsi-autoscaler-sidecar),
which needs to be co-located on each Jitsi instance. The sidecar periodically checks in with the autoscaler via a REST call and sends its status.
Expand Down
8 changes: 5 additions & 3 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ const lockManager: LockManager = new LockManager(logger, {
});

const instanceGroupManager = new InstanceGroupManager({
audit,
redisClient,
redisScanCount: config.RedisScanCount,
initialGroupList: config.GroupList,
Expand Down Expand Up @@ -216,6 +217,7 @@ const jobManager = new JobManager({
metricsLoop,
autoscalerProcessingTimeoutMs: config.GroupProcessingTimeoutMs,
launcherProcessingTimeoutMs: config.GroupProcessingTimeoutMs,
reportProcessingTimeoutMs: config.GroupProcessingTimeoutMs,
sanityLoopProcessingTimeoutMs: config.SanityProcessingTimoutMs,
});

Expand Down Expand Up @@ -556,13 +558,13 @@ app.put(
body('options.scaleUpPeriodsCount').optional().isInt({ min: 0 }).withMessage('Value must be positive'),
body('options.scaleDownPeriodsCount').optional().isInt({ min: 0 }).withMessage('Value must be positive'),
body('instanceType').custom(async (value) => {
if (!(await validator.supportedInstanceType(value))) {
throw new Error('Instance type not supported. Use jvb, jigasi, nomad, jibri or sip-jibri instead');
if (!validator.supportedInstanceType(value)) {
throw new Error('Instance type not supported. Use jvb, jigasi, nomad, jibri, sip-jibri or skynet instead');
}
return true;
}),
body('direction').custom(async (value) => {
if (!(await validator.supportedScalingDirection(value))) {
if (!validator.supportedScalingDirection(value)) {
throw new Error('Scaling direction not supported. Use up or down instead');
}
return true;
Expand Down
18 changes: 18 additions & 0 deletions src/audit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export interface GroupAudit {
timestamp?: number | string;
autoScalerActionItem?: AutoScalerActionItem;
launcherActionItem?: LauncherActionItem;
groupMetricValue?: number;
}

export interface AutoScalerActionItem {
Expand All @@ -37,6 +38,7 @@ export interface LauncherActionItem {
export interface GroupAuditResponse {
lastLauncherRun: string;
lastAutoScalerRun: string;
lastGroupMetricValue: number;
lastReconfigureRequest: string;
lastScaleMetrics?: Array<number>;
autoScalerActionItems?: AutoScalerActionItem[];
Expand Down Expand Up @@ -222,6 +224,18 @@ export default class Audit {
return updateResponse;
}

async updateLastGroupMetricValue(ctx: Context, groupName: string, groupMetricValue: number): Promise<boolean> {
const value: GroupAudit = {
groupMetricValue,
groupName,
type: 'last-group-metric-value',
};
const updateResponse = this.setGroupValue(groupName, value);
ctx.logger.info(`Updated last group metric for group ${groupName}`);

return updateResponse;
}

async updateLastAutoScalerRun(ctx: Context, groupName: string, scaleMetrics: Array<number>): Promise<boolean> {
const updateLastAutoScalerStart = process.hrtime();

Expand Down Expand Up @@ -345,6 +359,7 @@ export default class Audit {
const groupAuditResponse: GroupAuditResponse = {
lastLauncherRun: 'unknown',
lastAutoScalerRun: 'unknown',
lastGroupMetricValue: null,
lastReconfigureRequest: 'unknown',
lastScaleMetrics: [],
};
Expand All @@ -356,6 +371,9 @@ export default class Audit {
case 'last-launcher-run':
groupAuditResponse.lastLauncherRun = new Date(groupAudit.timestamp).toISOString();
break;
case 'last-group-metric-value':
groupAuditResponse.lastGroupMetricValue = groupAudit.groupMetricValue;
break;
case 'last-autoScaler-run':
groupAuditResponse.lastScaleMetrics = groupAudit.autoScalerActionItem
? groupAudit.autoScalerActionItem.scaleMetrics
Expand Down
21 changes: 17 additions & 4 deletions src/autoscaler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { InstanceMetric, InstanceTracker } from './instance_tracker';
import CloudManager from './cloud_manager';
import Redlock from 'redlock';
import Redis from 'ioredis';
import InstanceGroupManager, { InstanceGroup } from './instance_group';
import InstanceGroupManager, { GroupMetric, InstanceGroup } from './instance_group';
import LockManager from './lock_manager';
import { Context } from './context';
import Audit from './audit';
Expand Down Expand Up @@ -75,13 +75,24 @@ export default class AutoscaleProcessor {
group.scalingOptions.scaleUpPeriodsCount,
group.scalingOptions.scaleDownPeriodsCount,
);
const metricInventoryPerPeriod: Array<Array<InstanceMetric>> =
await this.instanceTracker.getMetricInventoryPerPeriod(

let metricInventoryPerPeriod = [];

if (group.metricsUrl) {
metricInventoryPerPeriod = await this.instanceGroupManager.getGroupMetricInventoryPerPeriod(
ctx,
group.name,
maxPeriodCount,
group.scalingOptions.scalePeriod,
);
} else {
metricInventoryPerPeriod = await this.instanceTracker.getMetricInventoryPerPeriod(
ctx,
group.name,
maxPeriodCount,
group.scalingOptions.scalePeriod,
);
}

const scaleMetrics = await this.updateDesiredCountIfNeeded(ctx, group, count, metricInventoryPerPeriod);
await this.audit.updateLastAutoScalerRun(ctx, group.name, scaleMetrics);
Expand All @@ -96,7 +107,7 @@ export default class AutoscaleProcessor {
ctx: Context,
group: InstanceGroup,
count: number,
metricInventoryPerPeriod: Array<Array<InstanceMetric>>,
metricInventoryPerPeriod: Array<Array<InstanceMetric | GroupMetric>>,
): Promise<Array<number>> {
ctx.logger.debug(
`[AutoScaler] Begin desired count adjustments for group ${group.name} with ${count} instances and current desired count ${group.scalingOptions.desiredCount}`,
Expand Down Expand Up @@ -191,6 +202,7 @@ export default class AutoscaleProcessor {
case 'jigasi':
case 'nomad':
case 'JVB':
case 'skynet':
// in the case of JVB scale up only if value (average stress level) is above or equal to threshhold
return (
(count < group.scalingOptions.maxDesired && value >= group.scalingOptions.scaleUpThreshold) ||
Expand All @@ -209,6 +221,7 @@ export default class AutoscaleProcessor {
case 'jigasi':
case 'nomad':
case 'JVB':
case 'skynet':
// in the case of JVB scale down only if value (average stress level) is below threshhold
return count > group.scalingOptions.minDesired && value < group.scalingOptions.scaleDownThreshold;
}
Expand Down
2 changes: 1 addition & 1 deletion src/group_report.ts
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ export default class GroupReportGenerator {
instanceReport.scaleStatus = 'IN USE';
}
if (
instanceState.status.jigasiStatus &&
instanceState.status.nomadStatus &&
!instanceState.status.nomadStatus.eligibleForScheduling
) {
instanceReport.scaleStatus = 'GRACEFUL SHUTDOWN';
Expand Down
85 changes: 85 additions & 0 deletions src/instance_group.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import Redis from 'ioredis';
import { Context } from './context';
import got from 'got';
import Audit from './audit';

export interface ScalingOptions {
minDesired: number;
Expand All @@ -18,6 +20,16 @@ export interface InstanceGroupTags {
[id: string]: string;
}

export interface GroupMetric {
groupName: string;
timestamp: number;
value: number;
}

export const GroupTypeToGroupMetricKey: { [id: string]: string } = {
skynet: 'queueSize',
};

export interface InstanceGroup {
id: string;
name: string;
Expand All @@ -31,6 +43,7 @@ export interface InstanceGroup {
enableScheduler: boolean;
enableUntrackedThrottle: boolean;
enableReconfiguration?: boolean;
metricsUrl: string;
gracePeriodTTLSec: number;
protectedTTLSec: number;
scalingOptions: ScalingOptions;
Expand All @@ -39,6 +52,7 @@ export interface InstanceGroup {
}

export interface InstanceGroupManagerOptions {
audit: Audit;
redisClient: Redis.Redis;
redisScanCount: number;
initialGroupList: Array<InstanceGroup>;
Expand All @@ -48,13 +62,15 @@ export interface InstanceGroupManagerOptions {

export default class InstanceGroupManager {
private readonly GROUPS_HASH_NAME = 'allgroups';
private readonly audit: Audit;
private redisClient: Redis.Redis;
private readonly redisScanCount: number;
private readonly initialGroupList: Array<InstanceGroup>;
private readonly processingIntervalSeconds: number;
private readonly sanityJobsIntervalSeconds: number;

constructor(options: InstanceGroupManagerOptions) {
this.audit = options.audit;
this.redisClient = options.redisClient;
this.redisScanCount = options.redisScanCount;
this.initialGroupList = options.initialGroupList;
Expand Down Expand Up @@ -83,6 +99,10 @@ export default class InstanceGroupManager {
return this.initialGroupList;
}

private getGroupMetricsKey(groupName: string): string {
return `gmetric:${groupName}`;
}

async existsAtLeastOneGroup(): Promise<boolean> {
let cursor = '0';
do {
Expand Down Expand Up @@ -274,6 +294,71 @@ export default class InstanceGroupManager {
return !(result !== null && result.length > 0);
}

async fetchGroupMetrics(ctx: Context, groupName: string): Promise<boolean> {
try {
const group = await this.getInstanceGroup(groupName);
if (!group) {
throw new Error(`Group ${groupName} not found, failed to report on group metrics`);
}

if (!group.metricsUrl) {
ctx.logger.debug(`Group ${groupName} no metrics url, skipping metrics fetching`);
return false;
}

const metrics: { [id: string]: number } = await got(group.metricsUrl).json();

const key: string = Object.keys(metrics).find((key) => {
return GroupTypeToGroupMetricKey[group.type] === key;
});

const metricsObject: GroupMetric = {
groupName,
timestamp: Date.now(),
value: metrics[key],
};

// store the group metrics
await this.redisClient.zadd(
this.getGroupMetricsKey(groupName),
metricsObject.timestamp,
JSON.stringify(metricsObject),
);

await this.audit.updateLastGroupMetricValue(ctx, groupName, metricsObject.value);
} catch (e) {
ctx.logger.error(`Failed to report group metrics for group ${groupName}`, e);
return false;
}
}

async getGroupMetricInventoryPerPeriod(
ctx: Context,
groupName: string,
periodsCount: number,
periodDurationSeconds: number,
): Promise<Array<Array<GroupMetric>>> {
const metricPoints: Array<Array<GroupMetric>> = [];
const metricsKey = this.getGroupMetricsKey(groupName);
const now = Date.now();
const items: string[] = await this.redisClient.zrange(metricsKey, 0, -1);

for (let periodIdx = 0; periodIdx < periodsCount; periodIdx++) {
metricPoints[periodIdx] = [];
}

items.forEach((item) => {
const itemJson: GroupMetric = JSON.parse(item);
const periodIdx = Math.floor((now - itemJson.timestamp) / (periodDurationSeconds * 1000));

if (periodIdx >= 0 && periodIdx < periodsCount) {
metricPoints[periodIdx].push(itemJson);
}
});

return metricPoints;
}

async setGroupJobsCreationGracePeriod(): Promise<boolean> {
return this.setValue(`groupJobsCreationGracePeriod`, this.processingIntervalSeconds);
}
Expand Down
37 changes: 33 additions & 4 deletions src/instance_tracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Context } from './context';
import Redis from 'ioredis';
import ShutdownManager from './shutdown_manager';
import Audit from './audit';
import { InstanceGroup } from './instance_group';
import { GroupMetric, InstanceGroup } from './instance_group';

/* eslint-disable */
function isEmpty(obj: any) {
Expand Down Expand Up @@ -82,6 +82,7 @@ export interface JigasiStatus {
// largest_conference: number;
graceful_shutdown: boolean;
}

export interface InstanceDetails {
instanceId: string;
instanceType: string;
Expand Down Expand Up @@ -352,21 +353,49 @@ export class InstanceTracker {
async getSummaryMetricPerPeriod(
ctx: Context,
group: InstanceGroup,
metricInventoryPerPeriod: Array<Array<InstanceMetric>>,
metricInventoryPerPeriod: Array<Array<InstanceMetric | GroupMetric>>,
periodCount: number,
): Promise<Array<number>> {
switch (group.type) {
case 'jibri':
case 'sip-jibri':
return this.getAvailableMetricPerPeriod(ctx, metricInventoryPerPeriod, periodCount);
return this.getAvailableMetricPerPeriod(
ctx,
metricInventoryPerPeriod as Array<Array<InstanceMetric>>,
periodCount,
);
case 'nomad':
case 'jigasi':
case 'JVB':
return this.getAverageMetricPerPeriod(ctx, metricInventoryPerPeriod, periodCount);
return this.getAverageMetricPerPeriod(
ctx,
metricInventoryPerPeriod as Array<Array<InstanceMetric>>,
periodCount,
);
case 'skynet':
return this.getSkynetGroupMetricPerPeriod(
ctx,
metricInventoryPerPeriod as Array<Array<GroupMetric>>,
periodCount,
);
}
return;
}

async getSkynetGroupMetricPerPeriod(
ctx: Context,
metricInventoryPerPeriod: Array<Array<GroupMetric>>,
periodCount: number,
): Promise<Array<number>> {
ctx.logger.debug(`Getting skynet group metric per period for ${periodCount} periods`, {
metricInventoryPerPeriod,
});

return metricInventoryPerPeriod
.slice(0, periodCount)
.map((groupMetrics: Array<GroupMetric>) => groupMetrics[0]?.value ?? 0);
}

async getAvailableMetricPerPeriod(
ctx: Context,
metricInventoryPerPeriod: Array<Array<InstanceMetric>>,
Expand Down
Loading

0 comments on commit b7858a9

Please sign in to comment.