ML-1325: Commit stream offsets using new nuclio API (#428)
* WIP ML-1325: Commit stream offsets using new nuclio API

* WIP explicit ack

* lint

* Fix `AsyncEmitSource` implementation of offset committer

* Fix `AsyncEmitSource` tests and reduce sleep in implementation

* Keep original events in `StreamTarget` and `Table`

* Support flush and "explicit ack" in `KafkaTarget`

* Preserve original events in `Batch`

* Fixes

* Fix for KafkaTarget
* Fix for `if last_handled_offset:`
* Fix for committer call (need to add nuclio-sdk as storey dependency)
* No need for two `del`s in `StreamTarget`

* Fixes

* collect gc

* Add explicit_ack flag to allow turning the feature off even if running in nuclio

* Fix tests

---------

Co-authored-by: Gal Topper <[email protected]>
gtopper and Gal Topper authored Jul 18, 2023
1 parent 5f4b4ad commit 69276d9
Showing 7 changed files with 388 additions and 20 deletions.
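
This commit lets storey report stream offsets back to nuclio only after an event has been fully processed by the flow ("explicit ack"), rather than as soon as it is read. Below is a minimal sketch of how the feature would be switched on from user code; the step chain and values are illustrative assumptions rather than part of this diff, and the flag only takes effect when running inside nuclio, where context.platform.explicit_ack is available (outside nuclio it is a no-op).

from storey import Map, Reduce, SyncEmitSource, build_flow

controller = build_flow(
    [
        SyncEmitSource(
            explicit_ack=True,              # opt in to offset commits via the new nuclio API
            max_events_before_commit=500,   # force a commit attempt after at most 500 uncommitted events
        ),
        Map(lambda x: x * 2),
        Reduce(0, lambda acc, x: acc + x),
    ]
).run()

# Events arriving from a nuclio stream trigger carry path/shard_id/offset;
# their offsets are committed only once they have been handled downstream.
for i in range(10):
    controller.emit(i)

controller.terminate()
print(controller.await_termination())  # -> 90
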
3 changes: 2 additions & 1 deletion requirements.txt
@@ -7,4 +7,5 @@ numpy>=1.16.5,<1.23
pyarrow>=1,<12
v3io-frames~=0.10.3
v3iofs~=0.1.9
xxhash>=1
xxhash>=1
nuclio-sdk>=0.5.3
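
The new nuclio-sdk dependency provides QualifiedOffset, the object the source hands back to nuclio to say "everything on this shard up to this offset has been handled". A rough sketch of such a commit (hypothetical path, shard and offset; the actual call happens inside the source, not in user code):

from nuclio_sdk import QualifiedOffset

# Positional arguments match the source's usage: (path, shard_id, last_handled_offset).
qualified_offset = QualifiedOffset("/example-stream", 3, 1507)

# Inside a nuclio worker, committing would look roughly like:
#     await context.platform.explicit_ack(qualified_offset)
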
2 changes: 1 addition & 1 deletion storey/aggregations.py
@@ -219,7 +219,7 @@ async def _do(self, event):

safe_key = stringify_key(key)
await self._table._lazy_load_key_with_aggregates(safe_key, event_timestamp)
await self._table._aggregate(safe_key, element, event_timestamp)
await self._table._aggregate(safe_key, event, element, event_timestamp)

if isinstance(self._emit_policy, EmitEveryEvent):
await self._emit_event(key, event)
17 changes: 14 additions & 3 deletions storey/flow.py
@@ -269,9 +269,12 @@ async def _do_downstream(self, event):
if len(self._outlets) > 1:
awaitable_result = event._awaitable_result
event._awaitable_result = None
original_events = event._original_events
event._original_events = None
for i in range(1, len(self._outlets)):
event_copy = copy.deepcopy(event)
event_copy._awaitable_result = awaitable_result
event._original_events = original_events
tasks.append(asyncio.get_running_loop().create_task(self._outlets[i]._do_and_recover(event_copy)))
event._awaitable_result = awaitable_result
if self.verbose and self.logger:
@@ -550,6 +553,7 @@ async def _call(self, event):
res, new_state = self._fn(element, key_data)
async with self._state._get_lock(safe_key):
self._state._update_static_attrs(safe_key, new_state)
self._state._pending_events.append(event)
self._state._init_flush_task()
else:
res, self._state = self._fn(element, self._state)
@@ -976,6 +980,8 @@ def __init__(
def _init(self):
super()._init()
self._batch: Dict[Optional[str], List[Any]] = defaultdict(list)
# Keep the original events that make up each batch
self._batch_events: Dict[Optional[str], List[Any]] = defaultdict(list)
self._batch_first_event_time: Dict[Optional[str], datetime.datetime] = {}
self._batch_last_event_time: Dict[Optional[str], datetime.datetime] = {}
self._batch_start_time: Dict[Optional[str], float] = {}
@@ -995,7 +1001,7 @@ def _create_key_extractor(key_field) -> Callable:
else:
raise ValueError(f"Unsupported key_field type {type(key_field)}")

async def _emit(self, batch, batch_key, batch_time, last_event_time=None):
async def _emit(self, batch, batch_key, batch_time, batch_events, last_event_time=None):
raise NotImplementedError

async def _terminate(self):
@@ -1025,6 +1031,7 @@ async def _do(self, event):
self._timeout_task = asyncio.get_running_loop().create_task(self._sleep_and_emit())

self._batch[key].append(self._event_to_batch_entry(event))
self._batch_events[key].append(event)

if len(self._batch[key]) == self._max_events:
await self._emit_batch(key)
@@ -1057,7 +1064,8 @@ async def _emit_batch(self, batch_key: Optional[str] = None):
batch_time = self._batch_first_event_time.pop(batch_key)
last_event_time = self._batch_last_event_time.pop(batch_key)
del self._batch_start_time[batch_key]
await self._emit(batch_to_emit, batch_key, batch_time, last_event_time)
await self._emit(batch_to_emit, batch_key, batch_time, self._batch_events[batch_key], last_event_time)
del self._batch_events[batch_key]

async def _emit_all(self):
for key in list(self._batch.keys()):
@@ -1081,8 +1089,11 @@ class Batch(_Batching):

_do_downstream_per_event = False

async def _emit(self, batch, batch_key, batch_time, last_event_time=None):
async def _emit(self, batch, batch_key, batch_time, batch_events, last_event_time=None):
event = Event(batch)
if not self._full_event:
# Preserve reference to the original events to avoid early commit of offsets
event._original_events = batch_events
return await self._do_downstream(event)


125 changes: 121 additions & 4 deletions storey/sources.py
@@ -14,17 +14,21 @@
#
import asyncio
import copy
import gc
import queue
import threading
import uuid
import warnings
import weakref
from collections import defaultdict
from datetime import datetime, timezone
from typing import Callable, Coroutine, Iterable, List, Optional, Union

import pandas
import pandas as pd
import pyarrow
import pytz
from nuclio_sdk import QualifiedOffset

from .dtypes import Event, _termination_obj
from .flow import Complete, Flow
@@ -199,6 +203,7 @@ def emit(
if self._return_awaitable_result:
awaitable_result = AwaitableResult(expected_number_of_results=expected_number_of_results or 1)
event._awaitable_result = awaitable_result
event._original_events = [event]
self._emit_fn(event)
return awaitable_result

@@ -225,12 +230,26 @@ def await_termination(self):
return self._await_termination_fn()


class _EventOffset:
def __init__(self, event):
self.event_weakref = weakref.ref(event)
self.offset = event.offset

def is_ready_to_commit(self):
return self.event_weakref() is None

def __repr__(self):
return f"_EventOffset({self.offset})"


class SyncEmitSource(Flow):
"""Synchronous entry point into a flow. Produces a FlowController when run, for use from inside a synchronous
context. See AsyncEmitSource for use from inside an async context.
:param buffer_size: size of the incoming event buffer. Defaults to 8.
:param key_field: Field to extract and use as the key. Optional.
:param max_events_before_commit: Maximum number of events to be processed before committing offsets.
:param explicit_ack: Whether to explicitly commit offsets. Defaults to False.
:param name: Name of this step, as it should appear in logs. Defaults to class name (SyncEmitSource).
:type name: string
@@ -243,6 +262,8 @@ def __init__(
self,
buffer_size: Optional[int] = None,
key_field: Union[list, str, int, None] = None,
max_events_before_commit=None,
explicit_ack=False,
**kwargs,
):
if buffer_size is None:
@@ -256,23 +277,54 @@ def __init__(
raise ValueError("Buffer size must be positive")
self._q = queue.Queue(buffer_size)
self._key_field = key_field
self._max_events_before_commit = max_events_before_commit or 1000
self._explicit_ack = explicit_ack
self._termination_q = queue.Queue(1)
self._ex = None
self._closeables = []

def _init(self):
super()._init()
self._is_terminated = False
self._outstanding_offsets = defaultdict(list)

async def _run_loop(self):
loop = asyncio.get_running_loop()
self._termination_future = asyncio.get_running_loop().create_future()

self._termination_future = loop.create_future()
committer = None
num_events_handled_without_commit = 0
if self._explicit_ack and hasattr(self.context, "platform") and hasattr(self.context.platform, "explicit_ack"):
committer = self.context.platform.explicit_ack
while True:
event = await loop.run_in_executor(None, self._q.get)
event = None
if (
num_events_handled_without_commit > 0
and self._q.empty()
or num_events_handled_without_commit >= self._max_events_before_commit
):
num_events_handled_without_commit = 0
can_block = await _commit_handled_events(self._outstanding_offsets, committer)
# In case we can't block because there are outstanding events
while not can_block:
if self._q.qsize() > 0:
event = self._q.get_nowait()
if event:
break
await asyncio.sleep(1)
can_block = await _commit_handled_events(self._outstanding_offsets, committer)
if not event:
event = await loop.run_in_executor(None, self._q.get)
if committer and hasattr(event, "path") and hasattr(event, "shard_id") and hasattr(event, "offset"):
qualified_shard = (event.path, event.shard_id)
offsets = self._outstanding_offsets[qualified_shard]
offsets.append(_EventOffset(event))
num_events_handled_without_commit += 1
try:
termination_result = await self._do_downstream(event)
if event is _termination_obj:
# We can commit all at this point because termination of
# all downstream steps completed successfully.
await _commit_handled_events(self._outstanding_offsets, committer, commit_all=True)
self._termination_future.set_result(termination_result)
except BaseException as ex:
if event is not _termination_obj and event._awaitable_result:
@@ -430,6 +482,7 @@ async def emit(
if self._await_result:
awaitable = AsyncAwaitableResult()
event._awaitable_result = awaitable
event._original_events = [event]
await self._emit_fn(event)
if self._await_result:
result = await awaitable.await_result()
@@ -449,12 +502,39 @@ async def await_termination(self):
return await self._loop_task


async def _commit_handled_events(outstanding_offsets_by_qualified_shard, committer, commit_all=False):
all_offsets_handled = True
for qualified_shard, offsets in outstanding_offsets_by_qualified_shard.items():
if commit_all and offsets:
last_handled_offset = offsets[-1].offset
num_to_clear = len(offsets)
else:
num_to_clear = 0
last_handled_offset = None
gc.collect()
# go over offsets in the qualified shard by arrival order until we reach an unhandled offset
for offset in offsets:
if not offset.is_ready_to_commit():
all_offsets_handled = False
break
last_handled_offset = offset.offset
num_to_clear += 1
if last_handled_offset is not None:
path, shard_id = qualified_shard
await committer(QualifiedOffset(path, shard_id, last_handled_offset))
outstanding_offsets_by_qualified_shard[qualified_shard] = offsets[num_to_clear:]
return all_offsets_handled
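
_commit_handled_events commits, per shard, only up to the last consecutively handled offset: it walks the outstanding offsets in arrival order and stops at the first one whose event is still alive, so a single slow event blocks commits of everything after it on that shard. A simplified, standalone rendition of that ordering rule (not the real helper, which additionally runs gc.collect() and calls the nuclio committer):

def last_committable(offsets_handled):
    """offsets_handled: (offset, handled) pairs in arrival order.
    Returns the highest offset that may be committed, or None."""
    last = None
    for offset, handled in offsets_handled:
        if not handled:
            break      # later handled offsets must not be committed yet
        last = offset
    return last

# Offsets 10 and 11 are done, 12 is still in flight, 13 is done:
assert last_committable([(10, True), (11, True), (12, False), (13, True)]) == 11
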


class AsyncEmitSource(Flow):
"""
Asynchronous entry point into a flow. Produces an AsyncFlowController when run, for use from inside an async def.
See SyncEmitSource for use from inside a synchronous context.
:param buffer_size: size of the incoming event buffer. Defaults to 8.
:param key_field: Field to extract and use as the key. Optional.
:param max_events_before_commit: Maximum number of events to be processed before committing offsets.
:param explicit_ack: Whether to explicitly commit offsets. Defaults to False.
:param name: Name of this step, as it should appear in logs. Defaults to class name (AsyncEmitSource).
:type name: string
@@ -467,6 +547,8 @@ def __init__(
self,
buffer_size: int = None,
key_field: Union[list, str, None] = None,
max_events_before_commit=None,
explicit_ack=False,
**kwargs,
):
super().__init__(**kwargs)
@@ -478,19 +560,54 @@ def __init__(
kwargs["buffer_size"] = buffer_size
self._q = asyncio.Queue(buffer_size)
self._key_field = key_field
self._max_events_before_commit = max_events_before_commit or 1000
self._explicit_ack = explicit_ack
self._ex = None
self._closeables = []

def _init(self):
super()._init()
self._is_terminated = False
self._outstanding_offsets = defaultdict(list)

async def _run_loop(self):
committer = None
num_events_handled_without_commit = 0
if self._explicit_ack and hasattr(self.context, "platform") and hasattr(self.context.platform, "explicit_ack"):
committer = self.context.platform.explicit_ack
while True:
event = await self._q.get()
event = None
if (
num_events_handled_without_commit > 0
and self._q.empty()
or num_events_handled_without_commit >= self._max_events_before_commit
):
num_events_handled_without_commit = 0
can_block = await _commit_handled_events(self._outstanding_offsets, committer)
# In case we can't block because there are outstanding events
while not can_block:
# Yield to allow the queue to fill if possible
await asyncio.sleep(0)
if not self._q.empty():
event = self._q.get_nowait()
if event:
break
# Wait to avoid busy-wait
await asyncio.sleep(0.2)
can_block = await _commit_handled_events(self._outstanding_offsets, committer)
if not event:
event = await self._q.get()
if committer and hasattr(event, "path") and hasattr(event, "shard_id") and hasattr(event, "offset"):
qualified_shard = (event.path, event.shard_id)
offsets = self._outstanding_offsets[qualified_shard]
offsets.append(_EventOffset(event))
num_events_handled_without_commit += 1
try:
termination_result = await self._do_downstream(event)
if event is _termination_obj:
# We can commit all at this point because termination of
# all downstream steps completed successfully.
await _commit_handled_events(self._outstanding_offsets, committer, commit_all=True)
return termination_result
except BaseException as ex:
self._ex = ex
6 changes: 5 additions & 1 deletion storey/table.py
@@ -75,6 +75,7 @@ def __init__(
self._terminated = False
self._flush_exception = None
self._changed_keys = set()
self._pending_events = []
self.fixed_window_type = None

def __str__(self):
@@ -180,7 +181,7 @@ def _init_flush_task(self):
async def close(self):
await self._storage.close()

async def _aggregate(self, key, data, timestamp):
async def _aggregate(self, key, event, data, timestamp):
if self._flush_exception is not None:
raise self._flush_exception
if not self._schema:
@@ -189,6 +190,7 @@ async def _aggregate(self, key, data, timestamp):
cache_item = self._get_aggregations_attrs(key)
await cache_item.aggregate(data, timestamp)
self._changed_keys.add(key)
self._pending_events.append(event)

async def _get_features(self, key, timestamp):
if self._flush_exception is not None:
@@ -390,6 +392,8 @@ async def _flush_worker(self):
if not isinstance(ex, asyncio.CancelledError):
self._flush_exception = ex

self._pending_events = []

async def _persist_worker(self):
task = None
received_job_count = 0
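
The Table changes follow the same pattern: events whose aggregations have not yet been persisted are held in _pending_events, and only when _flush_worker completes does the table drop those references, letting the source's weakrefs expire and the corresponding offsets be committed. In other words, "handled" here means "flushed to storage", not merely "aggregated in memory". A compact sketch of that hold-until-flush pattern (illustrative names, not the Table API):

import gc
import weakref

class PendingBuffer:               # stand-in for Table's _pending_events handling
    def __init__(self):
        self._pending_events = []

    def aggregate(self, event):
        self._pending_events.append(event)   # hold a strong ref until flush

    def flush(self):
        # ... persist aggregation state to storage ...
        self._pending_events = []            # drop refs; offsets may now be committed

class Item:                        # stand-in for a storey Event
    pass

item = Item()
ref = weakref.ref(item)
table = PendingBuffer()
table.aggregate(item)
del item
gc.collect()
assert ref() is not None   # aggregated but not yet flushed -> still pending
table.flush()
gc.collect()
assert ref() is None       # flushed -> the source may commit its offset
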