diff --git a/examples/response_stream/common/api/reducer_stream/request_body_schema.ts b/examples/response_stream/common/api/reducer_stream/request_body_schema.ts index 8318a411ab86a..ebd08e55cb863 100644 --- a/examples/response_stream/common/api/reducer_stream/request_body_schema.ts +++ b/examples/response_stream/common/api/reducer_stream/request_body_schema.ts @@ -9,11 +9,13 @@ import { schema, TypeOf } from '@kbn/config-schema'; export const reducerStreamRequestBodySchema = schema.object({ - /** Boolean flag to enable/disabling simulation of response errors. */ + /** Boolean flag to enable/disable simulation of response errors. */ simulateErrors: schema.maybe(schema.boolean()), /** Maximum timeout between streaming messages. */ timeout: schema.maybe(schema.number()), /** Setting to override headers derived compression */ compressResponse: schema.maybe(schema.boolean()), + /** Boolean flag to enable/disable 4KB payload flush fix. */ + flushFix: schema.maybe(schema.boolean()), }); export type ReducerStreamRequestBodySchema = TypeOf; diff --git a/examples/response_stream/public/containers/app/pages/page_reducer_stream/index.tsx b/examples/response_stream/public/containers/app/pages/page_reducer_stream/index.tsx index 466b6ddec75a0..a55f25292cf5d 100644 --- a/examples/response_stream/public/containers/app/pages/page_reducer_stream/index.tsx +++ b/examples/response_stream/public/containers/app/pages/page_reducer_stream/index.tsx @@ -43,12 +43,13 @@ export const PageReducerStream: FC = () => { const [simulateErrors, setSimulateErrors] = useState(false); const [compressResponse, setCompressResponse] = useState(true); + const [flushFix, setFlushFix] = useState(false); const { dispatch, start, cancel, data, errors, isCancelled, isRunning } = useFetchStream( http, RESPONSE_STREAM_API_ENDPOINT.REDUCER_STREAM, '1', - { compressResponse, simulateErrors }, + { compressResponse, flushFix, simulateErrors }, { reducer: reducerStreamReducer, initialState } ); @@ -149,6 +150,13 @@ 
export const PageReducerStream: FC = () => { onChange={(e) => setCompressResponse(!compressResponse)} compressed /> + setFlushFix(!flushFix)} + compressed + /> ); diff --git a/examples/response_stream/server/routes/reducer_stream.ts b/examples/response_stream/server/routes/reducer_stream.ts index 81ba44205d31b..5e03cd0732e74 100644 --- a/examples/response_stream/server/routes/reducer_stream.ts +++ b/examples/response_stream/server/routes/reducer_stream.ts @@ -60,7 +60,8 @@ export const defineReducerStreamRoute = (router: IRouter, logger: Logger) => { const { end, push, responseWithHeaders } = streamFactory( request.headers, logger, - request.body.compressResponse + request.body.compressResponse, + request.body.flushFix ); const entities = [ diff --git a/x-pack/packages/ml/response_stream/server/stream_factory.test.ts b/x-pack/packages/ml/response_stream/server/stream_factory.test.ts index 27751b7dc3fd1..4b75cf4e0826a 100644 --- a/x-pack/packages/ml/response_stream/server/stream_factory.test.ts +++ b/x-pack/packages/ml/response_stream/server/stream_factory.test.ts @@ -49,6 +49,7 @@ describe('streamFactory', () => { Connection: 'keep-alive', 'Transfer-Encoding': 'chunked', 'X-Accel-Buffering': 'no', + 'X-Content-Type-Options': 'nosniff', }); expect(streamResult).toBe('push1push2'); }); @@ -75,6 +76,7 @@ describe('streamFactory', () => { Connection: 'keep-alive', 'Transfer-Encoding': 'chunked', 'X-Accel-Buffering': 'no', + 'X-Content-Type-Options': 'nosniff', }); expect(parsedItems).toHaveLength(2); expect(parsedItems[0]).toStrictEqual(mockItem1); @@ -121,6 +123,7 @@ describe('streamFactory', () => { 'content-encoding': 'gzip', 'Transfer-Encoding': 'chunked', 'X-Accel-Buffering': 'no', + 'X-Content-Type-Options': 'nosniff', }); expect(streamResult).toBe('push1push2'); @@ -165,6 +168,7 @@ describe('streamFactory', () => { 'content-encoding': 'gzip', 'Transfer-Encoding': 'chunked', 'X-Accel-Buffering': 'no', + 'X-Content-Type-Options': 'nosniff', }); 
expect(parsedItems).toHaveLength(2); expect(parsedItems[0]).toStrictEqual(mockItem1); diff --git a/x-pack/packages/ml/response_stream/server/stream_factory.ts b/x-pack/packages/ml/response_stream/server/stream_factory.ts index ab676e0104b78..8836c241e55d8 100644 --- a/x-pack/packages/ml/response_stream/server/stream_factory.ts +++ b/x-pack/packages/ml/response_stream/server/stream_factory.ts @@ -19,6 +19,7 @@ function isCompressedSream(arg: unknown): arg is zlib.Gzip { return typeof arg === 'object' && arg !== null && typeof (arg as zlib.Gzip).flush === 'function'; } +const FLUSH_KEEP_ALIVE_INTERVAL_MS = 500; const FLUSH_PAYLOAD_SIZE = 4 * 1024; class UncompressedResponseStream extends Stream.PassThrough {} @@ -76,6 +77,7 @@ export function streamFactory( const flushPayload = flushFix ? crypto.randomBytes(FLUSH_PAYLOAD_SIZE).toString('hex') : undefined; + let responseSizeSinceLastKeepAlive = 0; const stream = isCompressed ? zlib.createGzip() : new UncompressedResponseStream(); @@ -132,6 +134,25 @@ export function streamFactory( // otherwise check the integrity of the data to be pushed. if (streamType === undefined) { streamType = typeof d === 'string' ? 'string' : 'ndjson'; + + // This is a fix for ndjson streaming with proxy configurations + // that buffer responses up to 4KB in size. We keep track of the + // size of the response sent so far and if it's still smaller than + // FLUSH_PAYLOAD_SIZE then we'll push an additional keep-alive object + // that contains the flush fix payload. 
+ if (flushFix && streamType === 'ndjson') { + function repeat() { + if (!tryToEnd) { + if (responseSizeSinceLastKeepAlive < FLUSH_PAYLOAD_SIZE) { + push({ flushPayload } as unknown as T); + } + responseSizeSinceLastKeepAlive = 0; + setTimeout(repeat, FLUSH_KEEP_ALIVE_INTERVAL_MS); + } + } + + repeat(); + } } else if (streamType === 'string' && typeof d !== 'string') { logger.error('Must not push non-string chunks to a string based stream.'); return; @@ -148,13 +169,11 @@ export function streamFactory( try { const line = - streamType === 'ndjson' - ? `${JSON.stringify({ - ...d, - // This is a temporary fix for response streaming with proxy configurations that buffer responses up to 4KB in size. - ...(flushFix ? { flushPayload } : {}), - })}${DELIMITER}` - : d; + streamType === 'ndjson' ? `${JSON.stringify(d)}${DELIMITER}` : (d as unknown as string); + + if (streamType === 'ndjson') { + responseSizeSinceLastKeepAlive += new Blob([line]).size; + } waitForCallbacks.push(1); const writeOk = stream.write(line, () => { @@ -211,6 +230,7 @@ export function streamFactory( // This disables response buffering on proxy servers (Nginx, uwsgi, fastcgi, etc.) // Otherwise, those proxies buffer responses up to 4/8 KiB. 
'X-Accel-Buffering': 'no', + 'X-Content-Type-Options': 'nosniff', 'Cache-Control': 'no-cache', Connection: 'keep-alive', 'Transfer-Encoding': 'chunked', diff --git a/x-pack/plugins/aiops/public/components/log_rate_analysis/log_rate_analysis_results.tsx b/x-pack/plugins/aiops/public/components/log_rate_analysis/log_rate_analysis_results.tsx index 40ee98f3234dc..97d7201f0140d 100644 --- a/x-pack/plugins/aiops/public/components/log_rate_analysis/log_rate_analysis_results.tsx +++ b/x-pack/plugins/aiops/public/components/log_rate_analysis/log_rate_analysis_results.tsx @@ -176,7 +176,7 @@ export const LogRateAnalysisResults: FC = ({ data, isRunning, errors: streamErrors, - } = useFetchStream( + } = useFetchStream( http, '/internal/aiops/log_rate_analysis', '1', diff --git a/x-pack/test/api_integration/apis/aiops/log_rate_analysis_full_analysis.ts b/x-pack/test/api_integration/apis/aiops/log_rate_analysis_full_analysis.ts index 5ac7474324c50..c9fe22a472f4f 100644 --- a/x-pack/test/api_integration/apis/aiops/log_rate_analysis_full_analysis.ts +++ b/x-pack/test/api_integration/apis/aiops/log_rate_analysis_full_analysis.ts @@ -191,8 +191,15 @@ export default ({ getService }: FtrProviderContext) => { data.push(action); } - // If streaming works correctly we should receive more than one chunk. - expect(chunkCounter).to.be.greaterThan(1); + // Originally we assumed that we can assert streaming in contrast + // to non-streaming if there is more than one chunk. However, + // this turned out to be flaky since a stream could finish fast + // enough to contain only one chunk. So now we are checking if + // there's just one chunk or more. 
+ expect(chunkCounter).to.be.greaterThan( + 0, + `Expected 'chunkCounter' to be greater than 0, got ${chunkCounter}.` + ); await assertAnalysisResult(data); } diff --git a/x-pack/test/api_integration/apis/aiops/log_rate_analysis_groups_only.ts b/x-pack/test/api_integration/apis/aiops/log_rate_analysis_groups_only.ts index cfd812e4f435c..8aeccc6af9a97 100644 --- a/x-pack/test/api_integration/apis/aiops/log_rate_analysis_groups_only.ts +++ b/x-pack/test/api_integration/apis/aiops/log_rate_analysis_groups_only.ts @@ -194,12 +194,14 @@ export default ({ getService }: FtrProviderContext) => { data.push(action); } - // If streaming works correctly we should receive more than one chunk. + // Originally we assumed that we can assert streaming in contrast + // to non-streaming if there is more than one chunk. However, + // this turned out to be flaky since a stream could finish fast + // enough to contain only one chunk. So now we only assert that + // there's at least one chunk. expect(chunkCounter).to.be.greaterThan( - 1, - `Expected 'chunkCounter' to be greater than 1, got ${chunkCounter} with the following data: ${JSON.stringify( - data - )}.` + 0, + `Expected 'chunkCounter' to be greater than 0, got ${chunkCounter}.` ); await assertAnalysisResult(data);