From 31721632d09e6c2ee5e2d2f00e8d8560c0bee9dc Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Sat, 21 Dec 2024 15:06:46 +0100 Subject: [PATCH] Table Loader: Add capability to load InfluxDB Line Protocol (ILP) files Synopsis: ctk load table "file://influxdb-export.lp" --- CHANGES.md | 1 + cratedb_toolkit/api/main.py | 6 ++++- cratedb_toolkit/io/influxdb.py | 10 +++++++++ doc/io/influxdb/loader.md | 30 ++++++++++++++++++++----- pyproject.toml | 2 +- tests/io/influxdb/test_cli.py | 40 ++++++++++++++++++++++++++++++++++ 6 files changed, 82 insertions(+), 7 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index eadd0cfb..3c1f07f5 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -7,6 +7,7 @@ - Improved `ctk shell` to also talk to CrateDB standalone databases - Added basic utility command `ctk tail`, for tailing a database table, and optionally following the tail +- Table Loader: Added capability to load InfluxDB Line Protocol (ILP) files ## 2024/10/13 v0.0.29 - MongoDB: Added Zyp transformations to the CDC subsystem, diff --git a/cratedb_toolkit/api/main.py b/cratedb_toolkit/api/main.py index e51f2e36..952ffaa4 100644 --- a/cratedb_toolkit/api/main.py +++ b/cratedb_toolkit/api/main.py @@ -129,7 +129,11 @@ def load_table( logger.error("Data loading failed or incomplete") return False - elif source_url_obj.scheme.startswith("influxdb"): + elif ( + source_url_obj.scheme.startswith("influxdb") + or resource.url.endswith(".lp") + or resource.url.endswith(".lp.gz") + ): from cratedb_toolkit.io.influxdb import influxdb_copy http_scheme = "http" diff --git a/cratedb_toolkit/io/influxdb.py b/cratedb_toolkit/io/influxdb.py index 057a24b4..8eda165a 100644 --- a/cratedb_toolkit/io/influxdb.py +++ b/cratedb_toolkit/io/influxdb.py @@ -2,6 +2,8 @@ from influxio.core import copy +from cratedb_toolkit.model import DatabaseAddress + logger = logging.getLogger(__name__) @@ -12,6 +14,14 @@ def influxdb_copy(source_url, target_url, progress: bool = False): export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo ctk load table influxdb2://example:token@localhost:8086/testdrive/demo """ + + # Sanity checks. + target_address = DatabaseAddress.from_string(target_url) + url, table_address = target_address.decode() + if table_address.table is None: + raise ValueError("Table name is missing. Please adjust CrateDB database URL.") + + # Invoke copy operation. logger.info("Running InfluxDB copy") copy(source_url, target_url, progress=progress) return True diff --git a/doc/io/influxdb/loader.md b/doc/io/influxdb/loader.md index bcb48502..2f17411b 100644 --- a/doc/io/influxdb/loader.md +++ b/doc/io/influxdb/loader.md @@ -2,10 +2,14 @@ # InfluxDB Table Loader ## About -Load data from InfluxDB into CrateDB using a one-stop command -`ctk load table influxdb2://...`, in order to facilitate convenient +Load data from InfluxDB into CrateDB using one-stop commands +`ctk load table ...`, in order to facilitate convenient data transfers to be used within data pipelines or ad hoc operations. +## Synopsis +- Load from InfluxDB server: `ctk load table influxdb2://...` +- Load from InfluxDB line protocol: `ctk load table file://observations.lp` + ## Details The InfluxDB table loader is based on the [influxio] package. Please also check its documentation to learn about more of its capabilities, supporting you when @@ -18,7 +22,13 @@ pip install --upgrade 'cratedb-toolkit[influxdb]' ## Usage -### Workstation +Prepare subsequent commands by defining the database address of your +CrateDB database cluster. +```shell +export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo +``` + +### InfluxDB 2 API An exemplary walkthrough, copying data from InfluxDB to CrateDB, both services expected to be listening on `localhost`. @@ -37,13 +47,11 @@ influx query "from(bucket:\"${INFLUX_BUCKET_NAME}\") |> range(start:-100y)" Transfer data from InfluxDB bucket/measurement into CrateDB schema/table. ```shell -export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo ctk load table influxdb2://example:token@localhost:8086/testdrive/demo ``` Query data in CrateDB. ```shell -export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo ctk shell --command "SELECT * FROM testdrive.demo;" ctk show table "testdrive.demo" ``` @@ -52,6 +60,18 @@ ctk show table "testdrive.demo" - More convenient table querying. ::: +### InfluxDB Line protocol file (ILP) + +Import ILP file from local filesystem. +```shell +ctk load table "file://influxdb-export.lp" +``` + +Import ILP file from a remote resource. +```shell +ctk load table \ + "https://github.com/influxdata/influxdb2-sample-data/raw/master/air-sensor-data/air-sensor-data.lp" +``` ### Cloud diff --git a/pyproject.toml b/pyproject.toml index 129198af..42b8b766 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -150,7 +150,7 @@ full = [ ] influxdb = [ "cratedb-toolkit[io]", - "influxio>=0.4,<1", + "influxio>=0.5,<1", ] io = [ "cr8", diff --git a/tests/io/influxdb/test_cli.py b/tests/io/influxdb/test_cli.py index 1fa346c4..01736e7c 100644 --- a/tests/io/influxdb/test_cli.py +++ b/tests/io/influxdb/test_cli.py @@ -15,6 +15,46 @@ from influxio.adapter import InfluxDbApiAdapter +def test_line_protocol_load_table_success(caplog, cratedb, needs_sqlalchemy2): + """ + CLI test: Invoke `ctk load table` for InfluxDB line protocol (ILP) file. + """ + ilp_url = "https://github.com/influxdata/influxdb2-sample-data/raw/master/noaa-ndbc-data/latest-observations.lp" + cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" + + # Run transfer command. + runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": cratedb_url}) + result = runner.invoke( + cli, + args=f"load table {ilp_url}", + catch_exceptions=False, + ) + assert result.exit_code == 0 + + # Verify data in target database. + assert cratedb.database.table_exists("testdrive.demo") is True + assert cratedb.database.refresh_table("testdrive.demo") is True + assert cratedb.database.count_records("testdrive.demo") >= 500 + + +def test_line_protocol_load_table_wrong_cratedb_url_failure(caplog, cratedb, needs_sqlalchemy2): + """ + CLI test: Invoke `ctk load table` for InfluxDB line protocol (ILP) file. + """ + ilp_url = "https://github.com/influxdata/influxdb2-sample-data/raw/master/noaa-ndbc-data/latest-observations.lp" + cratedb_url = f"{cratedb.get_connection_url()}/testdrive" + + # Run transfer command. + runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": cratedb_url}) + with pytest.raises(ValueError) as ex: + runner.invoke( + cli, + args=f"load table {ilp_url}", + catch_exceptions=False, + ) + assert ex.match("Table name is missing. Please adjust CrateDB database URL.") + + def test_influxdb2_load_table(caplog, cratedb, influxdb, needs_sqlalchemy2): """ CLI test: Invoke `ctk load table` for InfluxDB.