diff --git a/Makefile b/Makefile index e22ba989..199804ac 100644 --- a/Makefile +++ b/Makefile @@ -15,7 +15,7 @@ include k8s.mk setup: echo "Using extra pip index: $PIP_EXTRA_INDEX_URL" - pip install --no-use-pep517 -r requirements/test.txt + pip install -r requirements/test.txt lint: black --check platform_monitoring tests setup.py diff --git a/platform_monitoring/logs.py b/platform_monitoring/logs.py index 5a53e492..384f1e06 100644 --- a/platform_monitoring/logs.py +++ b/platform_monitoring/logs.py @@ -1,7 +1,7 @@ import io import logging import warnings -from typing import Any, AsyncContextManager, Dict, Optional +from typing import Any, AsyncContextManager, Dict, Optional, Tuple import aiohttp from aioelasticsearch import Elasticsearch @@ -39,6 +39,7 @@ class FilteredStreamWrapper: def __init__(self, stream: aiohttp.StreamReader) -> None: self._stream = stream self._buffer = LogBuffer() + self._is_line_start = True def close(self) -> None: self._buffer.close() @@ -48,30 +49,73 @@ async def read(self, size: int = -1) -> bytes: if chunk: return chunk - chunk = await self._readline() + chunk = await self._read() self._buffer.write(chunk) return self._buffer.read(size) - async def _readline(self) -> bytes: - line = await self._stream.readline() + async def _read(self) -> bytes: # https://github.com/neuromation/platform-api/issues/131 # k8s API (and the underlying docker API) sometimes returns an rpc # error as the last log line. it says that the corresponding container # does not exist. we should try to not expose such internals, but only # if it is the last line indeed. - if line.startswith(b"rpc error: code ="): - next_line = await self._stream.readline() - if next_line: + error_prefix = b"rpc error: code =" + chunk, is_line_start = await self._read_chunk(min_line_length=len(error_prefix)) + # 1. `chunk` may not be a whole line, ending with "\n"; + # 2. `chunk` may be the beginning of a line with the min length of + # `len(error_prefix)`. 
+ if is_line_start and chunk.startswith(error_prefix): + self._unreadline(chunk) + line = await self._readline() + next_chunk, _ = await self._read_chunk(min_line_length=1) + if next_chunk: logging.warning("An rpc error line was not at the end of the log") - with warnings.catch_warnings(): - warnings.filterwarnings("ignore", category=DeprecationWarning) - self._stream.unread_data(next_line) + chunk = line + self._unreadline(next_chunk) else: logging.info("Skipping an rpc error line at the end of the log") - line = next_line + chunk = next_chunk # b"" + return chunk + + async def _read_chunk(self, *, min_line_length: int) -> Tuple[bytes, bool]: + chunk = io.BytesIO() + is_line_start = self._is_line_start + + while chunk.tell() < min_line_length: + data = await self._stream.readany() + if not data: + break + + n_pos = data.find(b"\n") + 1 + self._is_line_start = bool(n_pos) + if n_pos: + line, tail = data[:n_pos], data[n_pos:] + if tail: + self._unreadline(tail) + chunk.write(line) + break + + chunk.write(data) + if not is_line_start: + # if this chunk is somewhere in the middle of the line, we + # want to return immediately without waiting for the rest of + # `min_line_length` + break + + return chunk.getvalue(), is_line_start + + async def _readline(self) -> bytes: + line = await self._stream.readline() + self._is_line_start = True return line + def _unreadline(self, data: bytes) -> None: + with warnings.catch_warnings(): + warnings.filterwarnings("ignore", category=DeprecationWarning) + self._stream.unread_data(data) + self._is_line_start = True + class PodContainerLogReader(LogReader): def __init__( diff --git a/tests/unit/test_kube.py b/tests/unit/test_kube.py index cc4e1641..7ad9db8d 100644 --- a/tests/unit/test_kube.py +++ b/tests/unit/test_kube.py @@ -1,4 +1,5 @@ -from typing import Any, Dict +import asyncio +from typing import Any, Dict, List from unittest import mock import aiohttp @@ -221,3 +222,60 @@ async def test_filtered_two_rpc_errors(self) -> None: 
assert chunk == b"rpc error: code = whatever\n" chunk = await stream.read() assert not chunk + + @pytest.mark.asyncio + async def test_not_filtered_single_rpc_not_eof(self) -> None: + reader = aiohttp.StreamReader(mock.Mock(_reading_paused=False)) + reader.feed_data(b"line1\n") + reader.feed_data(b"rpc error: code = whatever\n") + reader.feed_data(b"line2\n") + reader.feed_eof() + stream = FilteredStreamWrapper(reader) + chunk = await stream.read() + assert chunk == b"line1\n" + chunk = await stream.read() + assert chunk == b"rpc error: code = whatever\n" + chunk = await stream.read() + assert chunk == b"line2\n" + chunk = await stream.read() + assert not chunk + + @pytest.mark.asyncio + async def test_min_line_chunk(self) -> None: + reader = aiohttp.StreamReader(mock.Mock(_reading_paused=False)) + stream = FilteredStreamWrapper(reader) + + async def _read_all() -> List[bytes]: + chunks: List[bytes] = [] + while True: + c = await stream.read() + chunks.append(c) + if not c: + break + return chunks + + async def _feed_raw_chunk(data: bytes) -> None: + reader.feed_data(data) + await asyncio.sleep(0.0) + + task = asyncio.create_task(_read_all()) + await _feed_raw_chunk(b"chunk01\r") + await _feed_raw_chunk(b"chunk02\r") + await _feed_raw_chunk(b"chunk03\r") + await _feed_raw_chunk(b"chunk04\r") + await _feed_raw_chunk(b"chunk05\r\n") + await _feed_raw_chunk(b"chunk06\r\n") + await _feed_raw_chunk(b"chunk07\r") + await _feed_raw_chunk(b"chunk08\r\n") + await _feed_raw_chunk(b"rpc error: ") + await _feed_raw_chunk(b"code =") + reader.feed_eof() + chunks = await task + assert chunks == [ + b"chunk01\rchunk02\rchunk03\r", + b"chunk04\r", + b"chunk05\r\n", + b"chunk06\r\n", + b"chunk07\rchunk08\r\n", + b"", + ]