Skip to content

Commit

Permalink
Retry partitions.
Browse files Browse the repository at this point in the history
  • Loading branch information
iurisilvio committed Jul 11, 2021
1 parent 2c9ddb8 commit 2cccdae
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 7 deletions.
1 change: 1 addition & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include README.md LICENSE
24 changes: 17 additions & 7 deletions bulk_task/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,28 @@ def bulk_task(self, func):
return _FuncWrapper(func, model, self)

def bulk_call(self, func, jobs):
    """Run ``func`` over a batch of jobs, bisecting the batch on failure.

    ``func`` is called once with the list of ``job.args.as_model()`` values.
    If that call raises, the batch is split in half and each half is retried
    recursively, which narrows the failure down to batches of one job.
    A batch that cannot be split any further has its exception reported via
    ``self.capture_exception()`` and its jobs put back with ``self.enqueue``.

    NOTE(review): jobs that share a failing batch with a broken job are
    passed to ``func`` again in a smaller batch, so ``func`` should
    presumably tolerate seeing the same job more than once -- confirm.

    Args:
        func: Callable that receives a list of models.
        jobs: Sliceable sequence of jobs to hand to ``func`` together.
    """
    try:
        func([job.args.as_model() for job in jobs])
    except Exception:
        size = len(jobs)

        # A batch of one is the broken job itself. An empty batch cannot be
        # bisected either, so report the error instead of dropping it.
        if size <= 1:
            self.capture_exception()
            for job in jobs:
                self.enqueue(job)
            return

        # size >= 2 here, so both halves are guaranteed to be non-empty.
        mid = size // 2
        for part in (jobs[:mid], jobs[mid:]):
            self.bulk_call(func, part)

def consume(self, quantity=500):
    """Dequeue a batch of jobs and run them grouped by their function.

    Jobs returned by ``self.dequeue(quantity)`` are grouped on their
    ``func`` attribute and each group is passed to ``self.bulk_call`` in a
    single call.

    Args:
        quantity: Value forwarded to ``self.dequeue``.
    """
    jobs = self.dequeue(quantity)
    grouped = group_by(jobs, key=operator.attrgetter('func'))
    for func, grouped_jobs in grouped.items():
        # Failure handling lives in bulk_call, which reports the error and
        # re-enqueues only the jobs that are actually broken. Wrapping this
        # call in a handler that re-enqueues every dequeued job would
        # duplicate jobs from groups that succeeded.
        self.bulk_call(func, grouped_jobs)

def __call__(self, func):
    """Make the instance callable by forwarding ``func`` to ``bulk_task``.

    Returns whatever ``self.bulk_task(func)`` returns.
    """
    return self.bulk_task(func)
Expand Down
14 changes: 14 additions & 0 deletions bulk_task/tests/test_bulk_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ def echo2(args):
pass


def failing(args):
    """Raise ``Exception`` for the first argument whose ``name`` is ``'error'``.

    Returns ``None`` when no argument is named ``'error'``. The exception
    message is the offending argument's name.
    """
    for arg in args:
        if arg.name == 'error':
            raise Exception(arg.name)


@pytest.fixture
def job(args):
    """Provide a ``Job`` built from ``echo`` and the ``args`` fixture value."""
    return Job(echo, args)
Expand Down Expand Up @@ -162,3 +168,11 @@ def func(args: List[PydanticModel]):
func.push(a='example')

assert len(bulk_task.queue) == 1


def test_lazy_batch_bisect_errors(bulk_task):
    """One failing job among three leaves exactly one job in the queue.

    Three ``failing`` jobs are enqueued, of which only the ``'error'`` one
    raises. After ``consume()`` the queue is expected to hold a single job.
    """
    bulk_task.enqueue(Job(failing, Args(DataclassModel, ('walison',))))
    bulk_task.enqueue(Job(failing, Args(DataclassModel, ('error',))))
    bulk_task.enqueue(Job(failing, Args(DataclassModel, ('filipe',))))
    bulk_task.consume()
    assert len(bulk_task.queue) == 1

0 comments on commit 2cccdae

Please sign in to comment.