
Add executor to zimscraperlib #211

Draft · wants to merge 4 commits into main
Conversation

@benoit74 (Collaborator) commented Nov 5, 2024

This PR enriches the scraperlib with a ScraperExecutor, a class capable of processing tasks in parallel with a given number of worker threads.

This executor is mainly inspired by the sotoki executor, even though other executors exist in wikihow and iFixit. The wikihow one seems more primitive / older, and the iFixit one is just a pale copy of it.

For easy review, the first commit is simply a copy/paste of the sotoki code, and the next commits are the adaptations / enhancements for scraperlib.

What has been changed compared to sotoki code:

  • commit ae8edb7:
    • added automated unit tests, obviously
    • moved thread_deadline_sec to the executor, in case we need to customize it per executor (probably the case; invaluable for tests at least)
    • added an `if self.no_more:` check in the submit method: it allows the executor to stop accepting tasks even when it has only been joined, not shut down
    • renamed prefix to executor_name and changed the default from T- to executor (much clearer in the logs, in my experience)
    • removed the release_halt method, which was misleading / not working (I failed to join, then release_halt, then submit again ... it seems mandatory to join, then start again, then submit)
  • commit fd5c04a:
    • changed the join method: in sotoki, the executor waits thread_deadline_sec seconds per thread. This is highly unpredictable when there are many workers (we could wait thread_deadline_sec for the first worker, then thread_deadline_sec again for the second, etc.), and it is a bit weird that the last worker in the list gets far more time to complete than the first one
    • the new method computes one global deadline for all threads to join, and immediately asks all of them to join (should they already be ready to)
  • commit 0ce636c:
    • added a standard log line displaying the thread name; it is useful to use the same notation / format across all scrapers whenever we want to display the thread name (which should be quite common in fact)
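The global-deadline join described in commit fd5c04a can be sketched as follows (a minimal illustration with hypothetical names, not the actual zimscraperlib code):

```python
import datetime
import threading


def join_with_global_deadline(
    workers: set[threading.Thread], deadline_sec: float
) -> bool:
    """Ask all workers to join before one shared deadline.

    Every thread competes for the same time budget instead of each
    getting deadline_sec on its own. Returns True if all terminated.
    """
    deadline = datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(
        seconds=deadline_sec
    )
    alive = [worker for worker in workers if worker.is_alive()]
    while alive and datetime.datetime.now(tz=datetime.timezone.utc) < deadline:
        for worker in alive:
            worker.join(timeout=0.1)  # short slice so the shared deadline is honored
        alive = [worker for worker in alive if worker.is_alive()]
    return not alive
```

With this shape, ten slow workers share one deadline_sec budget instead of potentially waiting ten times that long.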

This executor will be used right away in the mindtouch scraper.

@benoit74 benoit74 self-assigned this Nov 5, 2024

codecov bot commented Nov 5, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 100.00%. Comparing base (ea6505f) to head (0ce636c).

Additional details and impacted files
@@            Coverage Diff             @@
##              main      #211    +/-   ##
==========================================
  Coverage   100.00%   100.00%            
==========================================
  Files           38        39     +1     
  Lines         2221      2327   +106     
  Branches       426       446    +20     
==========================================
+ Hits          2221      2327   +106     


@benoit74 benoit74 marked this pull request as ready for review November 5, 2024 14:31
@benoit74 benoit74 requested a review from rgaudin November 5, 2024 14:31
@rgaudin (Member) left a comment:

LGTM, but it lacks proper documentation. It's important that the expected behavior is clearly documented so the user can make informed choices.


@property
def exception(self):
"""Exception raises in any thread, if any"""

Suggested change
"""Exception raises in any thread, if any"""
"""Exception raised in any thread, if any"""

self._workers: set[threading.Thread] = set()
self.no_more = False
self._shutdown = False
self.exceptions[:] = []

Suggested change
self.exceptions[:] = []
self.exceptions.clear()

self._shutdown = False
self.exceptions[:] = []

for n in range(self.nb_workers):

No single-letter variable

try:
func(**kwargs)
except Exception as exc:
logger.error(f"Error processing {func} with {kwargs=}")

Suggested change
logger.error(f"Error processing {func} with {kwargs=}")
logger.error(f"Error processing function {func.__name__} with {kwargs=}")

# received None from the queue, most likely shutting down
return

raises = kwargs.pop("raises") if "raises" in kwargs.keys() else False

those things are now part of the submit API and should be documented there (in its docstring)
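One way to make the flag both explicit and documentable is the `dict.pop` default idiom (a sketch with hypothetical names, not the PR's actual signature):

```python
import queue


def submit(tasks: queue.Queue, func, **kwargs) -> None:
    """Queue func for execution by a worker thread.

    Executor-level keyword arguments, popped before func is called:
      raises (bool, default False): when True, an exception raised by
        func is recorded on the executor and stops the workers.
    """
    raises = kwargs.pop("raises", False)  # replaces the `in kwargs.keys()` check
    tasks.put((func, kwargs, raises))
```

The default in `pop` removes the conditional entirely, and the docstring gives the flag a natural home.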

logger.error(f"Error processing {func} with {kwargs=}")
logger.exception(exc)
if raises: # to cover when raises = False
self.exceptions.append(exc)

Is this intended behavior? Exceptions are swallowed without a trace (when using raises=False)? If so, this must be clearly documented. Not raising but still storing the exception is another valid alternative.
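The "store without raising" alternative could look like this (a hedged sketch; `exceptions` and `should_stop` are hypothetical stand-ins for executor state):

```python
import logging
import threading

logger = logging.getLogger(__name__)


def run_task(
    func,
    kwargs: dict,
    exceptions: list,
    should_stop: threading.Event,
    raises: bool = False,
) -> None:
    """Run one task, always keeping a trace of failures.

    The exception is appended to `exceptions` regardless of `raises`;
    `raises` only controls whether the workers are asked to stop.
    """
    try:
        func(**kwargs)
    except Exception as exc:
        logger.error(f"Error processing {func} with {kwargs=}")
        logger.exception(exc)
        exceptions.append(exc)  # stored even when raises is False
        if raises:
            should_stop.set()
```

This way a caller can always inspect failures after the fact, even when it chose not to halt on them.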

except queue.Empty:
break

def join(self):

What's the recommended way to await executor completion, then?
With deadline_sec being mandatory, I can only use join when I want to exit. But in a scraper, I imagine the scenario being: I collect all my resources and submit them to the executor, then I join, and once everything has been processed (once join completes), I can continue.

This is the regular meaning of join.

If we want it to act as a proper exit method, then the user has to track progress manually, and this should be clearly specified in the executor's documentation.
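For the "await completion, then continue" scenario, the stdlib building block is `queue.Queue.join()` paired with `task_done()` (a sketch of the pattern, not the executor's current API):

```python
import queue
import threading


def worker(tasks: queue.Queue) -> None:
    while True:
        item = tasks.get()
        try:
            if item is None:  # sentinel: time to exit
                return
            func, kwargs = item
            func(**kwargs)
        finally:
            tasks.task_done()  # lets Queue.join() unblock once all tasks are done


tasks: queue.Queue = queue.Queue()
results: list[int] = []


def record(value: int) -> None:
    results.append(value)


threads = [threading.Thread(target=worker, args=(tasks,)) for _ in range(2)]
for thread in threads:
    thread.start()
for i in range(5):
    tasks.put((record, {"value": i}))

tasks.join()  # blocks until every submitted task has been processed
for _ in threads:
    tasks.put(None)  # only now ask workers to exit
for thread in threads:
    thread.join()
```

Here `tasks.join()` gives the "wait for everything I submitted" semantics without any deadline, while the thread `join` remains the exit path.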

while (
len(alive_threads) > 0 and datetime.datetime.now(tz=datetime.UTC) < deadline
):
e = threading.Event()

single-letter var

@benoit74 benoit74 marked this pull request as draft November 8, 2024 13:06
@benoit74 (Collaborator, Author) commented Nov 8, 2024

Converting to draft; we are experimenting with joblib in the mindtouch scraper for now.
