#151: Add option resolve_hostnames #152

Merged
merged 20 commits into from
Sep 9, 2024
Changes from 13 commits
2 changes: 1 addition & 1 deletion README.md
@@ -41,6 +41,7 @@ PyEXASOL provides API to read & write multiple data streams in parallel using se
- [DB-API 2.0 compatibility](/docs/DBAPI_COMPAT.md)
- [Optional dependencies](/docs/DEPENDENCIES.md)
- [Changelog](/CHANGELOG.md)
- [Developer Guide](/docs/DEVELOPER_GUIDE.md)


## PyEXASOL main concepts
@@ -116,4 +117,3 @@ Enjoy!

## Maintained by
[Exasol](https://www.exasol.com) 2023 — Today

27 changes: 27 additions & 0 deletions docs/DEVELOPER_GUIDE.md
@@ -0,0 +1,27 @@
# Developer Guide

This guide explains how to develop PyEXASOL and run tests.

## Initial Setup

Create a virtual environment and install dependencies:

```sh
poetry install --all-extras
```

Run the following to enter the virtual environment:

```sh
poetry shell
```

## Running Integration Tests

To run integration tests, first start a local database:

```sh
nox -s db-start
```

Then you can run tests as usual with `pytest`.
11 changes: 11 additions & 0 deletions docs/REFERENCE.md
@@ -114,6 +114,7 @@ Open new connection and return `ExaConnection` object.
| `udf_output_connect_address` | `('udf_host', 8580)` | Specific SCRIPT_OUTPUT_ADDRESS value to connect from Exasol to UDF script output server (Default: inherited from TCP server) |
| `udf_output_dir` | `/tmp` | Path or path-like object pointing to directory for script output log files (Default: `tempfile.gettempdir()`) |
| `http_proxy` | `http://myproxy.com:3128` | HTTP proxy string in Linux [`http_proxy`](https://www.shellhacks.com/linux-proxy-server-settings-set-proxy-command-line/) format (Default: `None`) |
| `resolve_hostnames` | `False` | Explicitly resolve host names to IP addresses before connecting. Deactivating this will let the operating system resolve the host name (Default: `True`) |
| `client_name` | `MyClient` | Custom name of client application displayed in Exasol sessions tables (Default: `PyEXASOL`) |
| `client_version` | `1.0.0` | Custom version of client application (Default: `pyexasol.__version__`) |
| `client_os_username` | `john` | Custom OS username displayed in Exasol sessions table (Default: `getpass.getuser()`) |
@@ -122,6 +123,16 @@ Open new connection and return `ExaConnection` object.
| `access_token` | `...` | OpenID access token to use for the login process |
| `refresh_token` | `...` | OpenID refresh token to use for the login process |

### Host Name Resolution

By default, pyexasol resolves host names to IP addresses, randomly shuffles the IP addresses, and tries each one in turn until a connection succeeds. There are three reasons for this:

1. If at least one host name cannot be resolved, you always get an exception. Without upfront resolution, you would get an exception only when a broken host name happens to be chosen at random, which leads to intermittent errors in production.
2. For a large cluster with an ever-growing number of nodes, it makes sense to put all nodes behind one host name, like `myexasol.mlan`, instead of having separate host names `myexasol1..64.mlan`. In this case redundancy will not work properly unless the host name is resolved beforehand, since we do not know whether it points to one address or to multiple addresses.
3. For redundancy, we do not want to try the same IP address twice. This can only be guaranteed if we connect by IP address.

If host name resolution causes problems, you can deactivate it by specifying the argument `resolve_hostnames=False`. This may be required when connecting through a proxy that only allows connections to defined host names. In all other cases, we recommend omitting the argument.
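
The default strategy described above can be sketched in a few lines. This is only an illustration, not pyexasol's actual implementation: the function name `resolve_and_shuffle` and the injectable `resolver` parameter are assumptions made for this example, and de-duplicating IP addresses across host names is a choice of the sketch. Only `socket.gethostbyname_ex` is the real standard-library call.

```python
import random
import socket
from typing import Callable, List, Tuple


def resolve_and_shuffle(
    hostnames: List[str],
    port: int,
    resolver: Callable = socket.gethostbyname_ex,  # hypothetical hook, handy for tests
) -> List[Tuple[str, str, int]]:
    """Resolve every host name up front and return unique candidates in random order."""
    candidates = []
    seen_ips = set()
    for hostname in hostnames:
        # An OSError is raised here if any single host name cannot be resolved,
        # so a broken name is reported on every attempt, not just some of them.
        _, _, ip_addresses = resolver(hostname)
        for ip in ip_addresses:
            # Assumption of this sketch: never offer the same IP address twice.
            if ip not in seen_ips:
                seen_ips.add(ip)
                candidates.append((hostname, ip, port))
    # Random order spreads connection attempts across all nodes.
    random.shuffle(candidates)
    return candidates
```

With `resolve_hostnames=False`, none of this happens on the client side: the host name is handed to the operating system as-is, for example `pyexasol.connect(dsn=dsn, user=user, password=password, resolve_hostnames=False)`.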

## connect_local_config()
Open new connection and return `ExaConnection` object using local .ini file (usually `~/.pyexasol.ini`) to read credentials and connection parameters. Please read [local config](/docs/LOCAL_CONFIG.md) page for more details.

61 changes: 42 additions & 19 deletions pyexasol/connection.py
@@ -16,6 +16,10 @@

from . import callback as cb

from typing import (
    NamedTuple,
    Optional
)
from .exceptions import *
from .statement import ExaStatement
from .logger import ExaLogger
@@ -27,6 +31,13 @@
from .version import __version__


class Host(NamedTuple):
    """This represents a resolved host name with its IP address and port number."""
    hostname: str
    ip_address: str
    port: int
    fingerprint: Optional[str]
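
Because `Host` is a `NamedTuple`, instances still unpack and compare like plain tuples, which is why the loop `for hostname, ipaddr, port, fingerprint in dsn_items` keeps working unchanged. A small standalone sketch (the class is re-declared here so the snippet runs without pyexasol, and the sample values are made up):

```python
from typing import NamedTuple, Optional


class Host(NamedTuple):
    hostname: str
    ip_address: str
    port: int
    fingerprint: Optional[str]


host = Host("myexasol.mlan", "10.0.0.1", 8563, None)

# Named access and positional unpacking both work.
hostname, ip_address, port, fingerprint = host
assert host.ip_address == ip_address

# A NamedTuple compares equal to a plain tuple holding the same values.
assert host == ("myexasol.mlan", "10.0.0.1", 8563, None)
```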

class ExaConnection(object):
cls_statement = ExaStatement
cls_formatter = ExaFormatter
@@ -69,6 +80,7 @@ def __init__(self
, udf_output_connect_address=None
, udf_output_dir=None
, http_proxy=None
, resolve_hostnames=True
, client_name=None
, client_version=None
, client_os_username=None
@@ -104,6 +116,7 @@ def __init__(self
:param udf_output_connect_address: Specific SCRIPT_OUTPUT_ADDRESS value to connect from Exasol to UDF script output server (default: inherited from TCP server)
:param udf_output_dir: Directory to store captured UDF script output logs, split by <session_id>_<statement_id>/<vm_num>
:param http_proxy: HTTP proxy string in Linux http_proxy format (default: None)
:param resolve_hostnames: Explicitly resolve host names to IP addresses before connecting. Deactivating this will let the operating system resolve the host name (default: True)
:param client_name: Custom name of client application displayed in Exasol sessions tables (Default: PyEXASOL)
:param client_version: Custom version of client application (Default: pyexasol.__version__)
:param client_os_username: Custom OS username displayed in Exasol sessions table (Default: getpass.getuser())
@@ -144,6 +157,7 @@ def __init__(self
'udf_output_dir': udf_output_dir,

'http_proxy': http_proxy,
'resolve_hostnames': resolve_hostnames,

'client_name': client_name,
'client_version': client_version,
@@ -652,26 +666,16 @@ def _init_ws(self):
"""
dsn_items = self._process_dsn(self.options['dsn'])
failed_attempts = 0

ws_prefix = 'wss://' if self.options['encryption'] else 'ws://'
ws_options = self._get_ws_options()

for hostname, ipaddr, port, fingerprint in dsn_items:
self.logger.debug(f"Connection attempt [{ipaddr}:{port}]")

# Use correct hostname matching IP address for each connection attempt
if self.options['encryption']:
ws_options['sslopt']['server_hostname'] = hostname

try:
self._ws = websocket.create_connection(f'{ws_prefix}{ipaddr}:{port}', **ws_options)
self._ws = self._create_websocket_connection(hostname, ipaddr, port)
except Exception as e:
self.logger.debug(f'Failed to connect [{ipaddr}:{port}]: {e}')

failed_attempts += 1

if failed_attempts == len(dsn_items):
raise ExaConnectionFailedError(self, 'Could not connect to Exasol: ' + str(e))
raise ExaConnectionFailedError(self, 'Could not connect to Exasol: ' + str(e)) from e
else:
self._ws.settimeout(self.options['socket_timeout'])

@@ -686,6 +690,25 @@ def _init_ws(self):

return

    def _create_websocket_connection(self, hostname:str, ipaddr:str, port:int) -> websocket.WebSocket:
        ws_options = self._get_ws_options()
        # Use correct hostname matching IP address for each connection attempt
        if self.options['encryption'] and self.options["resolve_hostnames"]:
            ws_options['sslopt']['server_hostname'] = hostname

        connection_string = self._get_websocket_connection_string(hostname, ipaddr, port)
        self.logger.debug(f"Connection attempt {connection_string}")
        return websocket.create_connection(connection_string, **ws_options)


    def _get_websocket_connection_string(self, hostname:str, ipaddr:str, port:int) -> str:
        host = ipaddr if self.options["resolve_hostnames"] else hostname
        if self.options["encryption"]:
            return f"wss://{host}:{port}"
        else:
            return f"ws://{host}:{port}"
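
The effect of the two options on the connection target can be summarised with a standalone sketch. The function names `websocket_url` and `tls_server_hostname` are hypothetical and exist only for this illustration; they mirror the branching of the two methods above without touching `self.options`.

```python
from typing import Optional


def websocket_url(hostname: str, ipaddr: str, port: int, *,
                  encryption: bool, resolve_hostnames: bool) -> str:
    # Connect by IP only when the client resolved the name itself.
    host = ipaddr if resolve_hostnames else hostname
    scheme = "wss" if encryption else "ws"
    return f"{scheme}://{host}:{port}"


def tls_server_hostname(hostname: str, *, encryption: bool,
                        resolve_hostnames: bool) -> Optional[str]:
    # The explicit server_hostname override is only set when the URL carries
    # an IP address; otherwise the URL already contains the host name.
    return hostname if encryption and resolve_hostnames else None


for enc in (True, False):
    for resolve in (True, False):
        url = websocket_url("localhost", "ip1", 8563,
                            encryption=enc, resolve_hostnames=resolve)
        sni = tls_server_hostname("localhost", encryption=enc,
                                  resolve_hostnames=resolve)
        print(enc, resolve, url, sni)
```

The four combinations correspond to the four `test_init_ws_connects_*` cases in `test/integration/connection_test.py`.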


def _get_ws_options(self):
options = {
'timeout': self.options['connection_timeout'],
@@ -729,13 +752,13 @@ def _get_login_attributes(self):

return attributes

def _process_dsn(self, dsn):
def _process_dsn(self, dsn: str) -> list[Host]:
"""
Parse DSN, expand ranges and resolve IP addresses for all hostnames
Return list of Host(hostname, ip_address, port, fingerprint) tuples in random order
Randomness is required to guarantee proper distribution of workload across all nodes
"""
if len(dsn.strip()) == 0:
if dsn is None or len(dsn.strip()) == 0:
raise ExaConnectionDsnError(self, 'Connection string is empty')

current_port = constant.DEFAULT_PORT
@@ -793,18 +816,18 @@ def _process_dsn(self, dsn):

return result

def _resolve_hostname(self, hostname, port, fingerprint):
def _resolve_hostname(self, hostname: str, port: int, fingerprint: Optional[str]) -> list[Host]:
"""
Resolve all IP addresses for hostname and add port
It also implicitly checks that all hostnames mentioned in DSN can be resolved
"""
try:
hostname, alias_list, ipaddr_list = socket.gethostbyname_ex(hostname)
except OSError:
hostname, _, ipaddr_list = socket.gethostbyname_ex(hostname)
except OSError as e:
raise ExaConnectionDsnError(self, f'Could not resolve IP address of hostname [{hostname}] '
f'derived from connection string')
f'derived from connection string') from e

return [(hostname, ipaddr, port, fingerprint) for ipaddr in ipaddr_list]
return [Host(hostname, ipaddr, port, fingerprint) for ipaddr in ipaddr_list]
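
The `from e` additions in this hunk chain the new exception to the original `OSError`, so the underlying resolver error stays visible in the traceback and in `__cause__`. A minimal sketch of the pattern, assuming a stand-in exception class `DsnError` (hypothetical, in place of `ExaConnectionDsnError`) and an injectable `resolver` argument added purely so the example can run without DNS:

```python
import socket


class DsnError(Exception):
    """Hypothetical stand-in for pyexasol's ExaConnectionDsnError."""


def resolve(hostname: str, resolver=socket.gethostbyname_ex):
    try:
        # gethostbyname_ex returns (primary_hostname, alias_list, ip_address_list).
        _, _, ip_addresses = resolver(hostname)
    except OSError as e:
        # "from e" stores the original error in __cause__ instead of discarding it.
        raise DsnError(f"Could not resolve IP address of hostname [{hostname}]") from e
    return ip_addresses


def failing_resolver(hostname):
    # socket.gaierror is a subclass of OSError, as raised by real lookups.
    raise socket.gaierror("Name or service not known")


try:
    resolve("no-such-host.invalid", resolver=failing_resolver)
except DsnError as err:
    print(type(err.__cause__).__name__)  # gaierror
```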

def _validate_fingerprint(self, provided_fingerprint):
server_fingerprint = hashlib.sha256(self._ws.sock.getpeercert(True)).hexdigest().upper()
112 changes: 112 additions & 0 deletions test/integration/connection_test.py
@@ -0,0 +1,112 @@
import pytest
import websocket
import ssl
from unittest import mock
from dataclasses import dataclass
from typing import Optional

from pyexasol.exceptions import ExaConnectionDsnError
from pyexasol.connection import (Host, ExaConnection)

# pylint: disable=protected-access

@dataclass(frozen=True)
class ConnectionMockFixture:
    connection: ExaConnection
    get_hostname_mock: mock.Mock
    create_websocket_connection_mock: mock.Mock

    def simulate_resolve_hostname(self, host: str, ips: list[str]):
        self.get_hostname_mock.return_value = (host, [], ips)

    def simulate_resolve_hostnames(self, hosts: list[tuple[str, list[str], list[str]]]):
        self.get_hostname_mock.side_effect = hosts

    def assert_websocket_created(self, url: str, **args: dict):
        self.create_websocket_connection_mock.assert_called_once_with(url, **args)

    def resolve_hostname(self, hostname: str, port: int, fingerprint: Optional[str]):
        return self.connection._resolve_hostname(hostname, port, fingerprint)

    def process_dsn(self, dsn: str):
        return self.connection._process_dsn(dsn)

    def init_ws(self):
        self.connection._init_ws()


@pytest.fixture
def connection_mock(connection):
    org_ws = connection._ws
    org_ws_send = connection._ws_send
    org_ws_recv = connection._ws_recv
    try:
        with mock.patch("socket.gethostbyname_ex") as get_hostname_mock:
            with mock.patch("websocket.create_connection") as create_websocket_connection_mock:
                create_websocket_connection_mock.return_value = mock.Mock(websocket.WebSocket)
                yield ConnectionMockFixture(connection, get_hostname_mock, create_websocket_connection_mock)
    finally:
        connection._ws = org_ws
        connection._ws_send = org_ws_send
        connection._ws_recv = org_ws_recv
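
The fixture relies on two features of `unittest.mock`: `return_value` for a single canned answer and `side_effect` with a list for a sequence of answers. A standalone sketch of both, using a hypothetical helper `lookup` in place of the pyexasol connection:

```python
import socket
from unittest import mock


def lookup(hostname: str):
    # The attribute is looked up on the socket module at call time,
    # so the patched function is the one that gets called.
    return socket.gethostbyname_ex(hostname)[2]


# return_value: every call yields the same canned result.
with mock.patch("socket.gethostbyname_ex") as resolver_mock:
    resolver_mock.return_value = ("host", [], ["ip1", "ip2"])
    ips = lookup("host")
    resolver_mock.assert_called_once_with("host")

# side_effect with a list: each call consumes the next item.
with mock.patch("socket.gethostbyname_ex") as resolver_mock:
    resolver_mock.side_effect = [("h1", [], ["ip11"]), ("h2", [], ["ip21"])]
    first, second = lookup("h1"), lookup("h2")

print(ips, first, second)  # ['ip1', 'ip2'] ['ip11'] ['ip21']
```

Because a list passed as `side_effect` is consumed call by call, a test that resolves the same DSN repeatedly has to re-arm the mock before each run, which is what `test_process_dsn_shuffles_hosts` does inside its helper.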

def test_resolve_hostname(connection_mock):
    connection_mock.simulate_resolve_hostname("host", ["ip1", "ip2"])
    actual = connection_mock.resolve_hostname("host", 1234, "fingerprint")
    expected = [("host","ip1", 1234, "fingerprint"),("host","ip2", 1234, "fingerprint")]
    assert len(actual) == len(expected)
    for i in range(0, len(expected)):
        assert expected[i] in actual


@pytest.mark.parametrize("empty_dsn", [None, "", " ", "\t"])
def test_process_empty_dsn_fails(connection_mock, empty_dsn):
    with pytest.raises(ExaConnectionDsnError, match="Connection string is empty"):
        connection_mock.process_dsn(empty_dsn)

def test_process_dsn_shuffles_hosts(connection_mock):
    dsn = "host1:1234,host2:4321"
    def resolve_hostname(con):
        connection_mock.simulate_resolve_hostnames([("host1", [], ["ip11", "ip12"]), ("host2", [], ["ip21", "ip22"])])
        return tuple(con.process_dsn(dsn))
    count = 100
    results = {resolve_hostname(connection_mock) for _ in range(0, count)}
    assert len(results) > 1

def test_process_dsn_with_fallback_to_default_port(connection_mock):
    connection_mock.simulate_resolve_hostname("host1", ["ip1"])
    actual = connection_mock.process_dsn("host1")
    expected = [Host("host1", "ip1", 8563, None)]
    assert actual == expected

def test_process_dsn_with_fingerprint(connection_mock):
    connection_mock.simulate_resolve_hostname("host1", ["ip1"])
    actual = connection_mock.process_dsn("host1/135a1d2dce102de866f58267521f4232153545a075dc85f8f7596f57e588a181:1234")
    expected = [Host("host1", "ip1", 1234, "135A1D2DCE102DE866F58267521F4232153545A075DC85F8F7596F57E588A181")]
    assert actual == expected
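
The two tests above pin down the DSN shape `host[/fingerprint][:port]`, the default port 8563, and the upper-casing of the fingerprint. A simplified parser that satisfies just these cases might look as follows. It is a sketch and not pyexasol's `_process_dsn`: the regular expression and the name `parse_dsn_part` are assumptions, and ranges, multiple hosts, and name resolution are deliberately left out.

```python
import re
from typing import Optional, Tuple

DEFAULT_PORT = 8563  # matches the fallback asserted in the test above

# Assumed shape for one DSN element: host, optional /fingerprint, optional :port
_DSN_PART = re.compile(
    r"^(?P<host>[^/:]+)(?:/(?P<fingerprint>[0-9A-Fa-f]+))?(?::(?P<port>\d+))?$"
)


def parse_dsn_part(part: str) -> Tuple[str, int, Optional[str]]:
    match = _DSN_PART.match(part.strip())
    if match is None:
        raise ValueError(f"Could not parse connection string part [{part}]")
    port = int(match.group("port")) if match.group("port") else DEFAULT_PORT
    fingerprint = match.group("fingerprint")
    # Fingerprints are normalised to upper case before comparison.
    return match.group("host"), port, fingerprint.upper() if fingerprint else None


print(parse_dsn_part("host1"))            # ('host1', 8563, None)
print(parse_dsn_part("host1/ab12:1234"))  # ('host1', 1234, 'AB12')
```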

def test_init_ws_connects_via_ipaddress(connection_mock):
    connection_mock.simulate_resolve_hostname("localhost", ["ip1"])
    connection_mock.init_ws()
    ssl_options = {'cert_reqs': ssl.CERT_NONE, 'server_hostname': 'localhost'}
    connection_mock.assert_websocket_created("wss://ip1:8563", timeout=10, skip_utf8_validation=True, enable_multithread=True, sslopt=ssl_options)

def test_init_ws_connects_without_encryption(connection_mock):
    connection_mock.connection.options["encryption"] = False
    connection_mock.simulate_resolve_hostname("localhost", ["ip1"])
    connection_mock.init_ws()
    connection_mock.assert_websocket_created("ws://ip1:8563", timeout=10, skip_utf8_validation=True, enable_multithread=True)

def test_init_ws_connects_without_encryption_via_hostname(connection_mock):
    connection_mock.connection.options["encryption"] = False
    connection_mock.connection.options["resolve_hostnames"] = False
    connection_mock.simulate_resolve_hostname("localhost", ["ip1"])
    connection_mock.init_ws()
    connection_mock.assert_websocket_created("ws://localhost:8563", timeout=10, skip_utf8_validation=True, enable_multithread=True)

def test_init_ws_connects_via_hostname(connection_mock):
    connection_mock.connection.options["resolve_hostnames"] = False
    connection_mock.simulate_resolve_hostname("localhost", ["ip1"])
    connection_mock.init_ws()
    ssl_options = {'cert_reqs': ssl.CERT_NONE}
    connection_mock.assert_websocket_created("wss://localhost:8563", timeout=10, skip_utf8_validation=True, enable_multithread=True, sslopt=ssl_options)
19 changes: 19 additions & 0 deletions test/integration/export_test.py
@@ -13,6 +13,14 @@ def connection(dsn, user, password, schema):
yield con


@pytest.fixture
def connection_without_resolving_hostnames(dsn, user, password, schema):
    with pyexasol.connect(
        dsn=dsn, user=user, password=password, schema=schema, compression=True, resolve_hostnames=False
    ) as con:
        yield con


@pytest.fixture
def table_name():
yield "CLIENT_NAMES"
@@ -92,6 +100,17 @@ def test_export_with_column_names(connection, table, data, export_file, expected
assert actual == expected


@pytest.mark.etl
def test_export_without_resolving_hostname(connection_without_resolving_hostnames, table, data, export_file, expected_csv):
    params = {"with_column_names": True}
    connection_without_resolving_hostnames.export_to_file(export_file, table, export_params=params)

    expected = expected_csv(table, data, **params)
    actual = export_file.read_text()

    assert actual == expected


@pytest.mark.etl
def test_custom_export_callback(connection, table, data, export_file, expected_csv):
def export_cb(pipe, dst):
19 changes: 19 additions & 0 deletions test/integration/import_test.py
@@ -12,6 +12,14 @@ def connection(dsn, user, password, schema):
yield con


@pytest.fixture
def connection_without_resolving_hostnames(dsn, user, password, schema):
    with pyexasol.connect(
        dsn=dsn, user=user, password=password, schema=schema, compression=True, resolve_hostnames=False
    ) as con:
        yield con


@pytest.fixture
def table_name():
yield "CLIENT_NAMES"
@@ -67,6 +75,17 @@ def test_import_csv(connection, empty_table, csv_file, data):
assert actual == expected


@pytest.mark.etl
def test_import_without_resolving_hostname(connection_without_resolving_hostnames, empty_table, csv_file, data):
    connection_without_resolving_hostnames.import_from_file(csv_file, empty_table)
    result = connection_without_resolving_hostnames.execute(f"SELECT * FROM {empty_table};")

    expected = set(data)
    actual = set(result.fetchall())

    assert actual == expected


@pytest.mark.etl
def test_import_with_reordered_columns(connection, empty_table, csv_file, swaped_data):
params = {"columns": ["LASTNAME", "FIRSTNAME"]}