Skip to content

Commit

Permalink
Align pool_maxsize for different connection pool implementations. (#535)
Browse files Browse the repository at this point in the history
* Align pool_maxsize for different connection pool implementations.

Signed-off-by: dblock <[email protected]>

* Document connection classes and settings.

Signed-off-by: dblock <[email protected]>

* Undo change in async for backwards compatibility.

Signed-off-by: dblock <[email protected]>

* Fix: typo.

Signed-off-by: dblock <[email protected]>

---------

Signed-off-by: dblock <[email protected]>
  • Loading branch information
dblock authored Oct 12, 2023
1 parent 62b408b commit 7a638cd
Show file tree
Hide file tree
Showing 9 changed files with 166 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Added generating imports and headers to API generator ([#467](https://github.com/opensearch-project/opensearch-py/pull/467))
- Added point-in-time APIs (create_pit, delete_pit, delete_all_pits, get_all_pits) and Security Client APIs (health and update_audit_configuration) ([#502](https://github.com/opensearch-project/opensearch-py/pull/502))
- Added new guide for using index templates with the client ([#531](https://github.com/opensearch-project/opensearch-py/pull/531))
- Added `pool_maxsize` for `Urllib3HttpConnection` ([#535](https://github.com/opensearch-project/opensearch-py/pull/535))
### Changed
- Generate `tasks` client from API specs ([#508](https://github.com/opensearch-project/opensearch-py/pull/508))
- Generate `ingest` client from API specs ([#513](https://github.com/opensearch-project/opensearch-py/pull/513))
Expand Down
1 change: 1 addition & 0 deletions USER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ print(response)
- [Point in Time](guides/point_in_time.md)
- [Using a Proxy](guides/proxy.md)
- [Index Templates](guides/index_template.md)
- [Connection Classes](guides/connection_classes.md)

## Plugins

Expand Down
1 change: 0 additions & 1 deletion guides/auth.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ client = OpenSearch(
['htps://...'],
use_ssl=True,
verify_certs=True,
connection_class=RequestsHttpConnection,
http_auth=HTTPKerberosAuth(mutual_authentication=OPTIONAL)
)

Expand Down
81 changes: 81 additions & 0 deletions guides/connection_classes.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
- [Connection Classes](#connection-classes)
- [Selecting a Connection Class](#selecting-a-connection-class)
- [Urllib3HttpConnection](#urllib3httpconnection)
- [RequestsHttpConnection](#requestshttpconnection)
- [AsyncHttpConnection](#asynchttpconnection)
- [Connection Pooling](#connection-pooling)

# Connection Classes

The OpenSearch Python synchronous client supports both the `Urllib3HttpConnection` connection class (default) from the [urllib3](https://pypi.org/project/urllib3/) library, and `RequestsHttpConnection` from the [requests](https://pypi.org/project/requests/) library. We recommend you use the default, unless your application is standardized on `requests`.

The faster, asynchronous client, implements a class called `AsyncHttpConnection`, which uses [aiohttp](https://pypi.org/project/aiohttp/).

## Selecting a Connection Class

### Urllib3HttpConnection

```python
from opensearchpy import OpenSearch, Urllib3HttpConnection

client = OpenSearch(
hosts = [{'host': 'localhost', 'port': 9200}],
http_auth = ('admin', 'admin'),
use_ssl = True,
verify_certs = False,
ssl_show_warn = False,
connection_class = Urllib3HttpConnection
)
```

### RequestsHttpConnection

```python
from opensearchpy import OpenSearch, RequestsHttpConnection

client = OpenSearch(
hosts = [{'host': 'localhost', 'port': 9200}],
http_auth = ('admin', 'admin'),
use_ssl = True,
verify_certs = False,
ssl_show_warn = False,
connection_class = RequestsHttpConnection
)
```

### AsyncHttpConnection

```python
from opensearchpy import AsyncOpenSearch, AsyncHttpConnection

async def main():
client = AsyncOpenSearch(
hosts = [{'host': 'localhost', 'port': 9200}],
http_auth = ('admin', 'admin'),
use_ssl = True,
verify_certs = False,
ssl_show_warn = False,
connection_class = AsyncHttpConnection
)
```

## Connection Pooling

The OpenSearch Python client has a connection pool for each `host` value specified during initialization, and a connection pool for HTTP connections to each host implemented in the underlying HTTP libraries. You can adjust the max size of the latter connection pool with `pool_maxsize`.

If you don't set this value, each connection library implementation will provide its default, which is typically `10`. Changing the pool size may improve performance in some multithreaded scenarios.

The following example sets the number of connections in the connection pool to 12.

```python
from opensearchpy import OpenSearch

client = OpenSearch(
hosts = [{'host': 'localhost', 'port': 9200}],
http_auth = ('admin', 'admin'),
use_ssl = True,
verify_certs = False,
ssl_show_warn = False,
pool_maxsize = 12,
)
```
1 change: 0 additions & 1 deletion guides/proxy.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ OpenSearch(
hosts=["htps://..."],
use_ssl=True,
verify_certs=True,
connection_class=RequestsHttpConnection,
trust_env=True,
)
```
Expand Down
9 changes: 6 additions & 3 deletions opensearchpy/connection/http_urllib3.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class Urllib3HttpConnection(Connection):
``ssl`` module for exact options for your environment).
:arg ssl_assert_hostname: use hostname verification if not `False`
:arg ssl_assert_fingerprint: verify the supplied certificate fingerprint if not `None`
:arg maxsize: the number of connections which will be kept open to this
:arg pool_maxsize: the number of connections which will be kept open to this
host. See https://urllib3.readthedocs.io/en/1.4/pools.html#api for more
information.
:arg headers: any custom http headers to be add to requests
Expand All @@ -109,7 +109,7 @@ def __init__(
ssl_version=None,
ssl_assert_hostname=None,
ssl_assert_fingerprint=None,
maxsize=10,
pool_maxsize=None,
headers=None,
ssl_context=None,
http_compress=None,
Expand Down Expand Up @@ -203,8 +203,11 @@ def __init__(
if not ssl_show_warn:
urllib3.disable_warnings()

if pool_maxsize and isinstance(pool_maxsize, int):
kw["maxsize"] = pool_maxsize

self.pool = pool_class(
self.hostname, port=self.port, timeout=self.timeout, maxsize=maxsize, **kw
self.hostname, port=self.port, timeout=self.timeout, **kw
)

def perform_request(
Expand Down
6 changes: 6 additions & 0 deletions opensearchpy/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ def __init__(
serializers=None,
default_mimetype="application/json",
max_retries=3,
pool_maxsize=None,
retry_on_status=(502, 503, 504),
retry_on_timeout=False,
send_get_body_as="GET",
Expand Down Expand Up @@ -120,6 +121,8 @@ def __init__(
don't support passing bodies with GET requests. If you set this to
'POST' a POST method will be used instead, if to 'source' then the body
will be serialized and passed as a query parameter `source`.
:arg pool_maxsize: Maximum connection pool size used by pool-manager
For custom connection-pooling on current session
Any extra keyword arguments will be passed to the `connection_class`
when creating and instance unless overridden by that connection's
Expand All @@ -139,6 +142,7 @@ def __init__(
self.deserializer = Deserializer(_serializers, default_mimetype)

self.max_retries = max_retries
self.pool_maxsize = pool_maxsize
self.retry_on_timeout = retry_on_timeout
self.retry_on_status = retry_on_status
self.send_get_body_as = send_get_body_as
Expand Down Expand Up @@ -211,6 +215,8 @@ def _create_connection(host):
# previously unseen params, create new connection
kwargs = self.kwargs.copy()
kwargs.update(host)
if self.pool_maxsize and isinstance(self.pool_maxsize, int):
kwargs["pool_maxsize"] = self.pool_maxsize
return self.connection_class(**kwargs)

connections = map(_create_connection, hosts)
Expand Down
32 changes: 32 additions & 0 deletions test_opensearchpy/test_client/test_requests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.

from unittest import TestCase

from opensearchpy import OpenSearch, RequestsHttpConnection


class TestRequests(TestCase):
def test_connection_class(self):
client = OpenSearch(connection_class=RequestsHttpConnection)
self.assertEqual(client.transport.pool_maxsize, None)
self.assertEqual(client.transport.connection_class, RequestsHttpConnection)
self.assertIsInstance(
client.transport.connection_pool.connections[0], RequestsHttpConnection
)

def test_pool_maxsize(self):
client = OpenSearch(connection_class=RequestsHttpConnection, pool_maxsize=42)
self.assertEqual(client.transport.pool_maxsize, 42)
self.assertEqual(
client.transport.connection_pool.connections[0]
.session.adapters["https://"]
._pool_maxsize,
42,
)
39 changes: 39 additions & 0 deletions test_opensearchpy/test_client/test_urllib3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.

from unittest import TestCase

from urllib3.connectionpool import HTTPConnectionPool

from opensearchpy import OpenSearch, Urllib3HttpConnection


class TestUrlLib3(TestCase):
def test_default(self):
client = OpenSearch()
self.assertEqual(client.transport.connection_class, Urllib3HttpConnection)
self.assertEqual(client.transport.pool_maxsize, None)

def test_connection_class(self):
client = OpenSearch(connection_class=Urllib3HttpConnection)
self.assertEqual(client.transport.connection_class, Urllib3HttpConnection)
self.assertIsInstance(
client.transport.connection_pool.connections[0], Urllib3HttpConnection
)
self.assertIsInstance(
client.transport.connection_pool.connections[0].pool, HTTPConnectionPool
)

def test_pool_maxsize(self):
client = OpenSearch(connection_class=Urllib3HttpConnection, pool_maxsize=42)
self.assertEqual(client.transport.pool_maxsize, 42)
# https://github.com/python/cpython/blob/3.12/Lib/queue.py#L35
self.assertEqual(
client.transport.connection_pool.connections[0].pool.pool.maxsize, 42
)

0 comments on commit 7a638cd

Please sign in to comment.