Skip to content

Commit

Permalink
Merge pull request #32 from Yolean/last-seen-offsets-at-value-retrieval
Browse files Browse the repository at this point in the history
Update last seen offset metric when getting values and perform leadin…
  • Loading branch information
atamon authored Jun 13, 2023
2 parents 198aacf + 8c1e75c commit 3817224
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 20 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@yolean/kafka-keyvalue",
"version": "1.5.0",
"version": "1.6.1",
"keywords": [],
"author": "Yolean AB",
"license": "Apache-2.0",
Expand Down
76 changes: 69 additions & 7 deletions src/KafkaKeyValue.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const promClientMock = {
this.labels = jest.fn().mockReturnValue(this);
this.reset = jest.fn();
this.setToCurrentTime = jest.fn();
this.startTimer = jest.fn();
this.startTimer = jest.fn().mockReturnValue(() => jest.fn());
this.remove = jest.fn();
}
},
Expand All @@ -54,7 +54,7 @@ const promClientMock = {
constructor(options) {

this.observe = jest.fn();
this.startTimer = jest.fn();
this.startTimer = jest.fn().mockReturnValue(() => jest.fn());
this.labels = jest.fn().mockReturnValue(this);
this.reset = jest.fn();
this.remove = jest.fn();
Expand Down Expand Up @@ -93,7 +93,6 @@ describe('KafkaKeyValue', function () {
pixyHost: 'http://pixy',
topicName: 'testtopic01',
fetchImpl: fetchMock,
updateDebounceTimeoutMs: 1
});

const offset = await kkv.put('key1', 'value1');
Expand All @@ -118,7 +117,6 @@ describe('KafkaKeyValue', function () {
pixyHost: 'http://pixy',
topicName: 'testtopic01',
fetchImpl: fetchMock,
updateDebounceTimeoutMs: 1
});

try {
Expand Down Expand Up @@ -171,6 +169,42 @@ describe('KafkaKeyValue', function () {
expect(onValue).toBeCalledWith({ foo: 'bar' })
expect(onValue).toBeCalledWith({ foo: 'bar2' })
});

it('updates last seen offset metric based on header value', async function () {
  // Fake fetch Response: a bare EventEmitter stands in for the body stream,
  // and the header carries the offsets payload the implementation parses.
  const response = {
    body: new EventEmitter(),
    headers: new Map([
      ['x-kkv-last-seen-offsets', JSON.stringify([
        { topic: 'testtopic01', partition: 0, offset: 17 }
      ])]
    ])
  };

  const fetchMock = jest.fn().mockReturnValueOnce(response);

  const metrics = KafkaKeyValue.createMetrics(promClientMock.Counter, promClientMock.Gauge, promClientMock.Histogram);
  const kkv = new KafkaKeyValue({
    cacheHost: 'http://cache-kkv',
    metrics,
    pixyHost: 'http://pixy',
    topicName: 'testtopic01',
    fetchImpl: fetchMock
  });

  // Start streaming, yield one microtask so streamValues can attach its
  // body listeners, then end the stream so the promise can settle.
  const streaming = kkv.streamValues(() => {});
  await Promise.resolve();
  response.body.emit('end');

  await streaming;

  // The gauge must be set from the parsed header value, labelled per
  // topic/partition, with the offset passed as the second argument.
  expect(metrics.kafka_key_value_last_seen_offset.set).toHaveBeenCalledWith(
    {
      topic: 'testtopic01',
      partition: 0
    },
    17
  )
});
});

describe('onupdate handlers', function () {
Expand All @@ -183,7 +217,6 @@ describe('KafkaKeyValue', function () {
metrics,
pixyHost: 'http://pixy',
topicName: 'testtopic01',
updateDebounceTimeoutMs: 1
});

const onUpdateSpy = jest.fn();
Expand Down Expand Up @@ -212,6 +245,22 @@ describe('KafkaKeyValue', function () {
expect(metrics.kafka_key_value_last_seen_offset.labels).toHaveBeenCalledTimes(1);
expect(metrics.kafka_key_value_last_seen_offset.labels).toHaveBeenCalledWith('cache-kkv', 'testtopic01', '0');
expect(metrics.kafka_key_value_last_seen_offset.set).toHaveBeenCalledWith(28262);

updateEvents.emit('update', {
v: 1,
topic: 'testtopic01',
offsets: {
'0': 28263
},
updates: {
'bd3f6188-d865-443d-8646-03e8f1c643cb': {}
}
});

// Promises need to resolve before we get the new value
await new Promise(resolve => setTimeout(resolve, 10));

expect(onUpdateSpy).toHaveBeenCalledTimes(2);
});

it('only handles updates for the same key once if called within the debounce timeout period', async function () {
Expand All @@ -221,7 +270,6 @@ describe('KafkaKeyValue', function () {
metrics,
pixyHost: 'http://pixy',
topicName: 'testtopic01',
updateDebounceTimeoutMs: 10
});

const onUpdateSpy = jest.fn();
Expand Down Expand Up @@ -298,6 +346,21 @@ describe('KafkaKeyValue', function () {
expect(onUpdateSpy).toHaveBeenCalledTimes(2);
expect(onUpdateSpy).toHaveBeenCalledWith('bd3f6188-d865-443d-8646-03e8f1c643cb', { foo: 'bar' })
expect(onUpdateSpy).toHaveBeenCalledWith('aaaa6188-d865-443d-8646-03e8f1c643cb', { foo: 'bar' })

updateEvents.emit('update', {
v: 1,
topic: 'testtopic01',
offsets: {
'0': 28265
},
updates: {
'aaaa6188-d865-443d-8646-03e8f1c643cb': {}
}
});

await Promise.resolve();

expect(onUpdateSpy).toHaveBeenCalledTimes(3);
});
});

Expand All @@ -309,7 +372,6 @@ describe('KafkaKeyValue', function () {
metrics,
pixyHost: 'http://pixy',
topicName: 'testtopic01',
updateDebounceTimeoutMs: 1
});

kkv.updatePartitionOffsetMetrics({
Expand Down
40 changes: 30 additions & 10 deletions src/KafkaKeyValue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const pGunzip = promisify<InputType, Buffer>(gunzip);
const pGzip = promisify<InputType, Buffer>(gzip);

const KKV_CACHE_HOST_READINESS_ENDPOINT = process.env.KKV_CACHE_HOST_READINESS_ENDPOINT || '/q/health/ready';
const DEFAULT_UPDATE_DEBOUNCE_TIMEOUT_MS: number = 2000;
const LAST_SEEN_OFFSETS_HEADER_NAME = 'x-kkv-last-seen-offsets';

export interface IKafkaKeyValueImpl { new (options: IKafkaKeyValue): KafkaKeyValue }

Expand All @@ -20,7 +20,6 @@ export interface IKafkaKeyValue {
gzip?: boolean
metrics: IKafkaKeyValueMetrics
fetchImpl?: IFetchImpl
updateDebounceTimeoutMs?: number
}

export interface CounterConstructor {
Expand Down Expand Up @@ -192,7 +191,7 @@ export default class KafkaKeyValue {
private readonly metrics: IKafkaKeyValueMetrics;
private readonly fetchImpl: IFetchImpl;
private readonly logger;
private readonly pendingKeyUpdates: Set<string> = new Set();
private readonly lastKeyUpdate: Map<string, number> = new Map();
private readonly partitionOffsets: Map<string, number> = new Map();

constructor(config: IKafkaKeyValue) {
Expand All @@ -202,7 +201,7 @@ export default class KafkaKeyValue {
this.fetchImpl = getFetchImpl(config);
this.logger = getLogger({ name: `kkv:${this.getCacheName()}` });

updateEvents.on('update', async (requestBody) => {
updateEvents.on('update', async (requestBody: { v: number, offsets: { [partition: string]: number }, topic: string, updates: { [key: string]: {} } }) => {
if (requestBody.v !== 1) throw new Error(`Unknown kkv onupdate protocol ${requestBody.v}!`);

const {
Expand All @@ -216,13 +215,20 @@ export default class KafkaKeyValue {
return;
}

const highestOffset: number = Object.values(offsets).reduce((memo, offset) => {
return Math.max(memo, offset);
}, -1);

if (this.updateHandlers.length > 0) {
Object.keys(updates).forEach(key => this.pendingKeyUpdates.add(key));
await new Promise(resolve => setTimeout(resolve, config.updateDebounceTimeoutMs || DEFAULT_UPDATE_DEBOUNCE_TIMEOUT_MS));

const updatesToPropagate = Array.from(this.pendingKeyUpdates).map(key => {
this.pendingKeyUpdates.delete(key);
return key;
const updatesToPropagate: string[] = [];

Object.keys(updates).forEach(key => {
const pendingOffset = this.lastKeyUpdate.get(key);
if (pendingOffset === undefined || highestOffset > pendingOffset) {
updatesToPropagate.push(key);
this.lastKeyUpdate.set(key, highestOffset);
}
});

const updatesBeingPropagated = updatesToPropagate.map(async key => {
Expand Down Expand Up @@ -251,7 +257,7 @@ export default class KafkaKeyValue {
updatePartitionOffsetMetrics(offsets: { [partition: string]: number }) {
Object.entries(offsets).forEach(([partition, offset]) => {
const existingOffset = this.partitionOffsets.get(partition);
if (!existingOffset || existingOffset < offset) {
if (existingOffset === undefined || existingOffset < offset) {
this.partitionOffsets.set(partition, offset);
this.metrics.kafka_key_value_last_seen_offset
.labels(this.getCacheName(), this.topic, partition)
Expand Down Expand Up @@ -320,6 +326,9 @@ export default class KafkaKeyValue {

parseTiming();
this.logger.debug({ key, value }, 'KafkaCache get value returned')

this.updateLastSeenOffsetsFromHeader(res);

return value;
}

Expand All @@ -336,6 +345,8 @@ export default class KafkaKeyValue {

streamTiming();
this.logger.debug({ cache_name: this.getCacheName() }, 'Streaming values for cache finished');

this.updateLastSeenOffsetsFromHeader(res);
}

async put(key: string, value: any, options: IRetryOptions = PUT_RETRY_DEFAULTS): Promise<number> {
Expand All @@ -353,4 +364,13 @@ export default class KafkaKeyValue {
/**
 * Registers a handler to be invoked for key updates propagated from
 * incoming 'update' events (see the updateEvents listener in the
 * constructor). Handlers are appended; all registered handlers are kept.
 *
 * @param fn callback receiving the updated key and its fetched value
 */
onUpdate(fn: UpdateHandler) {
this.updateHandlers.push(fn);
}

/**
 * Reads the last-seen-offsets header from a cache response and records
 * each entry in the kafka_key_value_last_seen_offset gauge.
 *
 * @param res any fetch-style response exposing `headers.get`
 * @throws Error when the expected header is absent, so a cache that does
 *   not report offsets fails loudly instead of silently skipping metrics
 */
private updateLastSeenOffsetsFromHeader(res: Pick<Response, "headers">) {
  const lastSeenOffsetsHeader = res.headers.get(LAST_SEEN_OFFSETS_HEADER_NAME);
  if (!lastSeenOffsetsHeader) throw new Error(`Missing header "${LAST_SEEN_OFFSETS_HEADER_NAME}"`);
  // Annotate the parsed shape: JSON.parse returns `any`, which would make
  // the destructured fields below unchecked. NOTE(review): the payload is
  // still trusted at runtime — a malformed header would surface as a
  // TypeError on forEach; validate upstream if the cache is untrusted.
  const lastSeenOffsets: Array<{ topic: string, partition: number, offset: number }> = JSON.parse(lastSeenOffsetsHeader);
  lastSeenOffsets.forEach(({ topic, partition, offset }) => {
    this.metrics.kafka_key_value_last_seen_offset.set({ topic, partition }, offset);
  });
}
}

0 comments on commit 3817224

Please sign in to comment.