Readany logs (#143)
* First stab

* Second iteration

* Added another edge-case test

* Added the key test

* Fixed failing pip

* Added missing imports
dalazx authored Dec 20, 2019
1 parent 214420b commit 3ab56dc
Showing 3 changed files with 115 additions and 13 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -15,7 +15,7 @@ include k8s.mk

setup:
echo "Using extra pip index: $PIP_EXTRA_INDEX_URL"
pip install --no-use-pep517 -r requirements/test.txt
pip install -r requirements/test.txt

lint:
black --check platform_monitoring tests setup.py
66 changes: 55 additions & 11 deletions platform_monitoring/logs.py
@@ -1,7 +1,7 @@
import io
import logging
import warnings
from typing import Any, AsyncContextManager, Dict, Optional
from typing import Any, AsyncContextManager, Dict, Optional, Tuple

import aiohttp
from aioelasticsearch import Elasticsearch
@@ -39,6 +39,7 @@ class FilteredStreamWrapper:
def __init__(self, stream: aiohttp.StreamReader) -> None:
self._stream = stream
self._buffer = LogBuffer()
self._is_line_start = True

def close(self) -> None:
self._buffer.close()
@@ -48,30 +49,73 @@ async def read(self, size: int = -1) -> bytes:
if chunk:
return chunk

chunk = await self._readline()
chunk = await self._read()

self._buffer.write(chunk)
return self._buffer.read(size)

async def _readline(self) -> bytes:
line = await self._stream.readline()
async def _read(self) -> bytes:
# https://github.com/neuromation/platform-api/issues/131
# k8s API (and the underlying docker API) sometimes returns an rpc
# error as the last log line. it says that the corresponding container
# does not exist. we should try to not expose such internals, but only
# if it is the last line indeed.
if line.startswith(b"rpc error: code ="):
next_line = await self._stream.readline()
if next_line:
error_prefix = b"rpc error: code ="
chunk, is_line_start = await self._read_chunk(min_line_length=len(error_prefix))
# 1. `chunk` may not be a whole line, ending with "\n";
# 2. `chunk` may be the beginning of a line with the min length of
# `len(error_prefix)`.
if is_line_start and chunk.startswith(error_prefix):
self._unreadline(chunk)
line = await self._readline()
next_chunk, _ = await self._read_chunk(min_line_length=1)
if next_chunk:
logging.warning("An rpc error line was not at the end of the log")
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=DeprecationWarning)
self._stream.unread_data(next_line)
chunk = line
self._unreadline(next_chunk)
else:
logging.info("Skipping an rpc error line at the end of the log")
line = next_line
chunk = next_chunk # b""
return chunk

async def _read_chunk(self, *, min_line_length: int) -> Tuple[bytes, bool]:
chunk = io.BytesIO()
is_line_start = self._is_line_start

while chunk.tell() < min_line_length:
data = await self._stream.readany()
if not data:
break

n_pos = data.find(b"\n") + 1
self._is_line_start = bool(n_pos)
if n_pos:
line, tail = data[:n_pos], data[n_pos:]
if tail:
self._unreadline(tail)
chunk.write(line)
break

chunk.write(data)
if not is_line_start:
# if this chunk is somewhere in the middle of the line, we
# want to return immediately without waiting for the rest of
# `min_line_length`
break

return chunk.getvalue(), is_line_start

async def _readline(self) -> bytes:
line = await self._stream.readline()
self._is_line_start = True
return line

def _unreadline(self, data: bytes) -> None:
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=DeprecationWarning)
self._stream.unread_data(data)
self._is_line_start = True


class PodContainerLogReader(LogReader):
def __init__(
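For context, here is a minimal usage sketch that is not part of this commit: it drains a FilteredStreamWrapper until EOF, assuming the class is importable from platform_monitoring.logs and that `response` is an aiohttp.ClientResponse streaming pod logs (so `response.content` is an aiohttp.StreamReader). The helper name `drain_log_stream` is illustrative.

import aiohttp

from platform_monitoring.logs import FilteredStreamWrapper


async def drain_log_stream(response: aiohttp.ClientResponse) -> bytes:
    # wrap the raw k8s log stream; the wrapper hides a trailing
    # "rpc error: code =" line if it turns out to be the last line
    stream = FilteredStreamWrapper(response.content)
    try:
        chunks = []
        while True:
            chunk = await stream.read()
            if not chunk:  # b"" signals end of stream
                break
            chunks.append(chunk)
        return b"".join(chunks)
    finally:
        stream.close()

An empty bytes object from read() marks the end of the stream, which is why the tests below loop until they receive b"".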
60 changes: 59 additions & 1 deletion tests/unit/test_kube.py
@@ -1,4 +1,5 @@
from typing import Any, Dict
import asyncio
from typing import Any, Dict, List
from unittest import mock

import aiohttp
@@ -221,3 +222,60 @@ async def test_filtered_two_rpc_errors(self) -> None:
assert chunk == b"rpc error: code = whatever\n"
chunk = await stream.read()
assert not chunk

@pytest.mark.asyncio
async def test_not_filtered_single_rpc_not_eof(self) -> None:
reader = aiohttp.StreamReader(mock.Mock(_reading_paused=False))
reader.feed_data(b"line1\n")
reader.feed_data(b"rpc error: code = whatever\n")
reader.feed_data(b"line2\n")
reader.feed_eof()
stream = FilteredStreamWrapper(reader)
chunk = await stream.read()
assert chunk == b"line1\n"
chunk = await stream.read()
assert chunk == b"rpc error: code = whatever\n"
chunk = await stream.read()
assert chunk == b"line2\n"
chunk = await stream.read()
assert not chunk

@pytest.mark.asyncio
async def test_min_line_chunk(self) -> None:
reader = aiohttp.StreamReader(mock.Mock(_reading_paused=False))
stream = FilteredStreamWrapper(reader)

async def _read_all() -> List[bytes]:
chunks: List[bytes] = []
while True:
c = await stream.read()
chunks.append(c)
if not c:
break
return chunks

async def _feed_raw_chunk(data: bytes) -> None:
reader.feed_data(data)
await asyncio.sleep(0.0)

task = asyncio.create_task(_read_all())
await _feed_raw_chunk(b"chunk01\r")
await _feed_raw_chunk(b"chunk02\r")
await _feed_raw_chunk(b"chunk03\r")
await _feed_raw_chunk(b"chunk04\r")
await _feed_raw_chunk(b"chunk05\r\n")
await _feed_raw_chunk(b"chunk06\r\n")
await _feed_raw_chunk(b"chunk07\r")
await _feed_raw_chunk(b"chunk08\r\n")
await _feed_raw_chunk(b"rpc error: ")
await _feed_raw_chunk(b"code =")
reader.feed_eof()
chunks = await task
assert chunks == [
b"chunk01\rchunk02\rchunk03\r",
b"chunk04\r",
b"chunk05\r\n",
b"chunk06\r\n",
b"chunk07\rchunk08\r\n",
b"",
]
