Skip to content

Commit

Permalink
[Logs UI] Prevent summary request from piling up (#148670)
Browse files Browse the repository at this point in the history
Co-authored-by: kibanamachine <[email protected]>
  • Loading branch information
weltenwort and kibanamachine authored Jan 12, 2023
1 parent ae55948 commit f01a61b
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ import { datemathToEpochMillis } from '../../../utils/datemath';
jest.mock('./api/fetch_log_summary', () => ({ fetchLogSummary: jest.fn() }));
const fetchLogSummaryMock = fetchLogSummary as jest.MockedFunction<typeof fetchLogSummary>;

jest.mock('../../../hooks/use_kibana', () => ({
useKibanaContextForPlugin: () => ({ services: mockCoreMock.createStart() }),
}));
jest.mock('../../../hooks/use_kibana', () => {
const services = mockCoreMock.createStart();
return {
useKibanaContextForPlugin: () => ({ services }),
};
});

describe('useLogSummary hook', () => {
beforeEach(() => {
Expand Down Expand Up @@ -161,6 +164,34 @@ describe('useLogSummary hook', () => {
expect.anything()
);
});

it("doesn't query for new summary buckets when the previous request is still in flight", async () => {
fetchLogSummaryMock.mockResolvedValueOnce(createMockResponse([]));

const firstRange = createMockDateRange();
const { waitForNextUpdate, rerender } = renderHook(
({ startTimestamp, endTimestamp }) =>
useLogSummary('SOURCE_ID', startTimestamp, endTimestamp, null),
{
initialProps: firstRange,
}
);

const secondRange = createMockDateRange('now-20s', 'now');

// intentionally don't wait for an update to test the throttling
rerender(secondRange);
await waitForNextUpdate();

expect(fetchLogSummaryMock).toHaveBeenCalledTimes(1);
expect(fetchLogSummaryMock).toHaveBeenLastCalledWith(
expect.objectContaining({
startTimestamp: firstRange.startTimestamp,
endTimestamp: firstRange.endTimestamp,
}),
expect.anything()
);
});
});

const createMockResponse = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
* 2.0.
*/

import { useState } from 'react';

import { useCancellableEffect } from '../../../utils/cancellable_effect';
import { useEffect } from 'react';
import { exhaustMap, map, Observable } from 'rxjs';
import { HttpHandler } from '@kbn/core-http-browser';
import { useObservableState, useReplaySubject } from '../../../utils/use_observable';
import { fetchLogSummary } from './api/fetch_log_summary';
import { LogEntriesSummaryResponse } from '../../../../common/http_api';
import { LogEntriesSummaryRequest, LogEntriesSummaryResponse } from '../../../../common/http_api';
import { useBucketSize } from './bucket_size';
import { useKibanaContextForPlugin } from '../../../hooks/use_kibana';

Expand All @@ -22,36 +23,51 @@ export const useLogSummary = (
filterQuery: string | null
) => {
const { services } = useKibanaContextForPlugin();
const [logSummaryBuckets, setLogSummaryBuckets] = useState<LogSummaryBuckets>([]);
const bucketSize = useBucketSize(startTimestamp, endTimestamp);

useCancellableEffect(
(getIsCancelled) => {
if (startTimestamp === null || endTimestamp === null || bucketSize === null) {
return;
}

fetchLogSummary(
{
sourceId,
startTimestamp,
endTimestamp,
bucketSize,
query: filterQuery,
},
services.http.fetch
).then((response) => {
if (!getIsCancelled()) {
setLogSummaryBuckets(response.data.buckets);
}
});
},
[sourceId, filterQuery, startTimestamp, endTimestamp, bucketSize]
);
const [logSummaryBuckets$, pushLogSummaryBucketsArgs] = useReplaySubject(fetchLogSummary$);
const { latestValue: logSummaryBuckets } = useObservableState(logSummaryBuckets$, NO_BUCKETS);

useEffect(() => {
if (startTimestamp === null || endTimestamp === null || bucketSize === null) {
return;
}

pushLogSummaryBucketsArgs([
{
sourceId,
startTimestamp,
endTimestamp,
bucketSize,
query: filterQuery,
},
services.http.fetch,
]);
}, [
bucketSize,
endTimestamp,
filterQuery,
pushLogSummaryBucketsArgs,
services.http.fetch,
sourceId,
startTimestamp,
]);

return {
buckets: logSummaryBuckets,
start: startTimestamp,
end: endTimestamp,
};
};

const NO_BUCKETS: LogSummaryBuckets = [];

type FetchLogSummaryArgs = [args: LogEntriesSummaryRequest, fetch: HttpHandler];

const fetchLogSummary$ = (
fetchArguments$: Observable<FetchLogSummaryArgs>
): Observable<LogSummaryBuckets> =>
fetchArguments$.pipe(
exhaustMap(([args, fetch]) => fetchLogSummary(args, fetch)),
map(({ data: { buckets } }) => buckets)
);
33 changes: 30 additions & 3 deletions x-pack/plugins/infra/public/utils/use_observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,14 @@
*/

import { useEffect, useRef, useState } from 'react';
import { BehaviorSubject, Observable, OperatorFunction, PartialObserver, Subscription } from 'rxjs';
import {
BehaviorSubject,
Observable,
OperatorFunction,
PartialObserver,
ReplaySubject,
Subscription,
} from 'rxjs';
import { switchMap } from 'rxjs/operators';

export const useLatest = <Value>(value: Value) => {
Expand Down Expand Up @@ -42,9 +49,29 @@ export const useBehaviorSubject = <
deriveObservableOnce: (input$: Observable<InputValue>) => OutputObservable,
createInitialValue: () => InputValue
) => {
const [subject$] = useState(() => new BehaviorSubject<InputValue>(createInitialValue()));
const [[subject$, next], _] = useState(() => {
const newSubject$ = new BehaviorSubject<InputValue>(createInitialValue());
const newNext = newSubject$.next.bind(newSubject$);
return [newSubject$, newNext] as const;
});
const [output$] = useState(() => deriveObservableOnce(subject$));
return [output$, next] as const;
};

export const useReplaySubject = <
InputValue,
OutputValue,
OutputObservable extends Observable<OutputValue>
>(
deriveObservableOnce: (input$: Observable<InputValue>) => OutputObservable
) => {
const [[subject$, next], _] = useState(() => {
const newSubject$ = new ReplaySubject<InputValue>();
const newNext = newSubject$.next.bind(newSubject$);
return [newSubject$, newNext] as const;
});
const [output$] = useState(() => deriveObservableOnce(subject$));
return [output$, subject$.next.bind(subject$)] as const;
return [output$, next] as const;
};

export const useObservableState = <State, InitialState>(
Expand Down

0 comments on commit f01a61b

Please sign in to comment.