Skip to content

Commit

Permalink
fix: dispose window scheduler (#641)
Browse files Browse the repository at this point in the history
  • Loading branch information
bednar authored Feb 29, 2024
1 parent ed89701 commit 2661e4d
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 2 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

### Bug Fixes
1. [#636](https://github.com/influxdata/influxdb-client-python/pull/636): Handle missing data in data frames
2. [#638](https://github.com/influxdata/influxdb-client-python/pull/638), [#642](https://github.com/influxdata/influxdb-client-python/pull/642): Refactor DataFrame operations to avoid chained assignment and resolve FutureWarning in pandas, ensuring compatibility with pandas 3.0.
1. [#638](https://github.com/influxdata/influxdb-client-python/pull/638), [#642](https://github.com/influxdata/influxdb-client-python/pull/642): Refactor DataFrame operations to avoid chained assignment and resolve FutureWarning in pandas, ensuring compatibility with pandas 3.0.
1. [#641](https://github.com/influxdata/influxdb-client-python/pull/641): Correctly dispose ThreadPoolScheduler in WriteApi

### Documentation
1. [#639](https://github.com/influxdata/influxdb-client-python/pull/639): Use Markdown for `README`
Expand Down
9 changes: 8 additions & 1 deletion influxdb_client/client/write_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,16 +250,18 @@ def __init__(self,
self._success_callback = kwargs.get('success_callback', None)
self._error_callback = kwargs.get('error_callback', None)
self._retry_callback = kwargs.get('retry_callback', None)
self._window_scheduler = None

if self._write_options.write_type is WriteType.batching:
# Define Subject that listen incoming data and produces writes into InfluxDB
self._subject = Subject()

self._window_scheduler = ThreadPoolScheduler(1)
self._disposable = self._subject.pipe(
# Split incoming data to windows by batch_size or flush_interval
ops.window_with_time_or_count(count=write_options.batch_size,
timespan=timedelta(milliseconds=write_options.flush_interval),
scheduler=ThreadPoolScheduler(1)),
scheduler=self._window_scheduler),
# Map window into groups defined by 'organization', 'bucket' and 'precision'
ops.flat_map(lambda window: window.pipe(
# Group window by 'organization', 'bucket' and 'precision'
Expand Down Expand Up @@ -440,6 +442,10 @@ def __del__(self):
)
break

if self._window_scheduler:
self._window_scheduler.executor.shutdown(wait=False)
self._window_scheduler = None

if self._disposable:
self._disposable = None
pass
Expand Down Expand Up @@ -565,6 +571,7 @@ def __getstate__(self):
# Remove rx
del state['_subject']
del state['_disposable']
del state['_window_scheduler']
del state['_write_service']
return state

Expand Down

0 comments on commit 2661e4d

Please sign in to comment.