[ML] AIOps: Improve flushFix for Log Rate Analysis (elastic#165069)
Improves the `flushFix` behaviour for Log Rate Analysis. Previously the
setting would add an additional 4KB dummy payload to each object
returned as ndjson. For the dataset used for testing, this resulted in
an overall response payload of ~900KB. For comparison, without
`flushFix` the response size would be ~40KB in this case.
This PR changes the behaviour to only send a dummy payload every 500ms
if the real data sent in the last 500ms was smaller than 4KB. Depending
on the speed of the response, this brings the overall response payload
down to ~300KB (Cloud, uncached), ~150KB (Cloud, cached), or even ~70KB
(local cluster) for the same dataset.
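As a rough sketch of the new approach in TypeScript (simplified; the `push` callback and `isDone` flag are placeholders for the stream plumbing shown in the diffs below, while the constants match the PR):

import crypto from 'crypto';

const FLUSH_KEEP_ALIVE_INTERVAL_MS = 500;
const FLUSH_PAYLOAD_SIZE = 4 * 1024;

let responseSizeSinceLastKeepAlive = 0;

// Re-schedules itself every 500ms and only pushes a dummy payload if
// less than 4KB of real data went out since the previous tick.
function keepAlive(push: (d: unknown) => void, isDone: () => boolean) {
  if (isDone()) return;
  if (responseSizeSinceLastKeepAlive < FLUSH_PAYLOAD_SIZE) {
    push({ flushPayload: crypto.randomBytes(FLUSH_PAYLOAD_SIZE).toString('hex') });
  }
  responseSizeSinceLastKeepAlive = 0;
  setTimeout(() => keepAlive(push, isDone), FLUSH_KEEP_ALIVE_INTERVAL_MS);
}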
walterra authored Oct 25, 2023
1 parent 8938a57 commit 5c578a0
Showing 8 changed files with 62 additions and 18 deletions.
@@ -9,11 +9,13 @@
import { schema, TypeOf } from '@kbn/config-schema';

export const reducerStreamRequestBodySchema = schema.object({
-/** Boolean flag to enable/disabling simulation of response errors. */
+/** Boolean flag to enable/disable simulation of response errors. */
simulateErrors: schema.maybe(schema.boolean()),
/** Maximum timeout between streaming messages. */
timeout: schema.maybe(schema.number()),
/** Setting to override headers derived compression */
compressResponse: schema.maybe(schema.boolean()),
+/** Boolean flag to enable/disable 4KB payload flush fix. */
+flushFix: schema.maybe(schema.boolean()),
});
export type ReducerStreamRequestBodySchema = TypeOf<typeof reducerStreamRequestBodySchema>;
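For illustration, a request body that validates against this schema might look like this (every field is optional; the values here are made up):

const body: ReducerStreamRequestBodySchema = {
  simulateErrors: false,
  timeout: 250,
  compressResponse: true,
  flushFix: true,
};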
@@ -43,12 +43,13 @@ export const PageReducerStream: FC = () => {

const [simulateErrors, setSimulateErrors] = useState(false);
const [compressResponse, setCompressResponse] = useState(true);
+const [flushFix, setFlushFix] = useState(false);

const { dispatch, start, cancel, data, errors, isCancelled, isRunning } = useFetchStream(
http,
RESPONSE_STREAM_API_ENDPOINT.REDUCER_STREAM,
'1',
-{ compressResponse, simulateErrors },
+{ compressResponse, flushFix, simulateErrors },
{ reducer: reducerStreamReducer, initialState }
);

@@ -149,6 +150,13 @@ export const PageReducerStream: FC = () => {
onChange={(e) => setCompressResponse(!compressResponse)}
compressed
/>
+<EuiCheckbox
+  id="responseStreamFlushFixCheckbox"
+  label="Toggle flushFix setting for response stream."
+  checked={flushFix}
+  onChange={(e) => setFlushFix(!flushFix)}
+  compressed
+/>
</EuiText>
</Page>
);
examples/response_stream/server/routes/reducer_stream.ts (2 additions & 1 deletion)
@@ -60,7 +60,8 @@ export const defineReducerStreamRoute = (router: IRouter, logger: Logger) => {
const { end, push, responseWithHeaders } = streamFactory<ReducerStreamApiAction>(
request.headers,
logger,
-  request.body.compressResponse
+  request.body.compressResponse,
+  request.body.flushFix
);

const entities = [
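Judging from this call site, `streamFactory` now accepts the flush-fix flag as a fourth optional argument. A sketch of the assumed shape (parameter names and types are inferred from the diffs on this page, not copied from the package):

function streamFactory<T = unknown>(
  headers: Record<string, string | string[] | undefined>, // request headers
  logger: Logger,
  compressResponse?: boolean, // overrides header-derived compression
  flushFix?: boolean // enables the 4KB keep-alive payload
): { end: () => void; push: (d: T) => void; responseWithHeaders: unknown };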
@@ -49,6 +49,7 @@ describe('streamFactory', () => {
Connection: 'keep-alive',
'Transfer-Encoding': 'chunked',
'X-Accel-Buffering': 'no',
+'X-Content-Type-Options': 'nosniff',
});
expect(streamResult).toBe('push1push2');
});
@@ -75,6 +76,7 @@
Connection: 'keep-alive',
'Transfer-Encoding': 'chunked',
'X-Accel-Buffering': 'no',
+'X-Content-Type-Options': 'nosniff',
});
expect(parsedItems).toHaveLength(2);
expect(parsedItems[0]).toStrictEqual(mockItem1);
@@ -121,6 +123,7 @@
'content-encoding': 'gzip',
'Transfer-Encoding': 'chunked',
'X-Accel-Buffering': 'no',
+'X-Content-Type-Options': 'nosniff',
});
expect(streamResult).toBe('push1push2');

@@ -165,6 +168,7 @@
'content-encoding': 'gzip',
'Transfer-Encoding': 'chunked',
'X-Accel-Buffering': 'no',
+'X-Content-Type-Options': 'nosniff',
});
expect(parsedItems).toHaveLength(2);
expect(parsedItems[0]).toStrictEqual(mockItem1);
x-pack/packages/ml/response_stream/server/stream_factory.ts (27 additions & 7 deletions)
@@ -19,6 +19,7 @@ function isCompressedSream(arg: unknown): arg is zlib.Gzip {
return typeof arg === 'object' && arg !== null && typeof (arg as zlib.Gzip).flush === 'function';
}

+const FLUSH_KEEP_ALIVE_INTERVAL_MS = 500;
const FLUSH_PAYLOAD_SIZE = 4 * 1024;

class UncompressedResponseStream extends Stream.PassThrough {}
@@ -76,6 +77,7 @@ export function streamFactory<T = unknown>(
const flushPayload = flushFix
? crypto.randomBytes(FLUSH_PAYLOAD_SIZE).toString('hex')
: undefined;
+let responseSizeSinceLastKeepAlive = 0;

const stream = isCompressed ? zlib.createGzip() : new UncompressedResponseStream();

@@ -132,6 +134,25 @@
// otherwise check the integrity of the data to be pushed.
if (streamType === undefined) {
streamType = typeof d === 'string' ? 'string' : 'ndjson';

+// This is a fix for ndjson streaming with proxy configurations
+// that buffer responses up to 4KB in size. We keep track of the
+// size of the response sent so far and if it's still smaller than
+// FLUSH_PAYLOAD_SIZE then we'll push an additional keep-alive object
+// that contains the flush fix payload.
+if (flushFix && streamType === 'ndjson') {
+  function repeat() {
+    if (!tryToEnd) {
+      if (responseSizeSinceLastKeepAlive < FLUSH_PAYLOAD_SIZE) {
+        push({ flushPayload } as unknown as T);
+      }
+      responseSizeSinceLastKeepAlive = 0;
+      setTimeout(repeat, FLUSH_KEEP_ALIVE_INTERVAL_MS);
+    }
+  }
+
+  repeat();
+}
} else if (streamType === 'string' && typeof d !== 'string') {
logger.error('Must not push non-string chunks to a string based stream.');
return;
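A note on the pattern above: `repeat()` re-schedules itself via `setTimeout` rather than using `setInterval`, so the next keep-alive tick is only queued once the previous one has run, and the chain stops on its own as soon as `tryToEnd` is set.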
@@ -148,13 +169,11 @@

try {
const line =
-  streamType === 'ndjson'
-    ? `${JSON.stringify({
-        ...d,
-        // This is a temporary fix for response streaming with proxy configurations that buffer responses up to 4KB in size.
-        ...(flushFix ? { flushPayload } : {}),
-      })}${DELIMITER}`
-    : d;
+  streamType === 'ndjson' ? `${JSON.stringify(d)}${DELIMITER}` : (d as unknown as string);

+if (streamType === 'ndjson') {
+  responseSizeSinceLastKeepAlive += new Blob([line]).size;
+}

waitForCallbacks.push(1);
const writeOk = stream.write(line, () => {
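`new Blob([line]).size` yields the UTF-8 byte length of the serialized line (unlike `line.length`, which counts characters), so the keep-alive check compares actual bytes sent against the 4KB threshold.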
@@ -211,6 +230,7 @@
// This disables response buffering on proxy servers (Nginx, uwsgi, fastcgi, etc.)
// Otherwise, those proxies buffer responses up to 4/8 KiB.
'X-Accel-Buffering': 'no',
+'X-Content-Type-Options': 'nosniff',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
'Transfer-Encoding': 'chunked',
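As a side note, the `X-Content-Type-Options: nosniff` header added here (and asserted in the tests above) disables MIME sniffing; without it, browsers may buffer the start of a streamed response while trying to guess its content type.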
@@ -176,7 +176,7 @@ export const LogRateAnalysisResults: FC<LogRateAnalysisResultsProps> = ({
data,
isRunning,
errors: streamErrors,
-} = useFetchStream(
+} = useFetchStream<AiopsApiLogRateAnalysis['body'], typeof streamReducer>(
http,
'/internal/aiops/log_rate_analysis',
'1',
@@ -191,8 +191,15 @@ export default ({ getService }: FtrProviderContext) => {
data.push(action);
}

-// If streaming works correctly we should receive more than one chunk.
-expect(chunkCounter).to.be.greaterThan(1);
+// Originally we assumed that we can assert streaming in contrast
+// to non-streaming if there is more than one chunk. However,
+// this turned out to be flaky since a stream could finish fast
+// enough to contain only one chunk. So now we just check that
+// there's at least one chunk.
+expect(chunkCounter).to.be.greaterThan(
+  0,
+  `Expected 'chunkCounter' to be greater than 0, got ${chunkCounter}.`
+);

await assertAnalysisResult(data);
}
@@ -194,12 +194,14 @@ export default ({ getService }: FtrProviderContext) => {
data.push(action);
}

-// If streaming works correctly we should receive more than one chunk.
+// Originally we assumed that we can assert streaming in contrast
+// to non-streaming if there is more than one chunk. However,
+// this turned out to be flaky since a stream could finish fast
+// enough to contain only one chunk. So now we just check that
+// there's at least one chunk.
expect(chunkCounter).to.be.greaterThan(
-  1,
-  `Expected 'chunkCounter' to be greater than 1, got ${chunkCounter} with the following data: ${JSON.stringify(
-    data
-  )}.`
+  0,
+  `Expected 'chunkCounter' to be greater than 0, got ${chunkCounter}.`
);

await assertAnalysisResult(data);
