Skip to content
This repository has been archived by the owner on Aug 13, 2024. It is now read-only.

fix: InfluxDB client read/write timeout settings #163

Merged
merged 3 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## v1.29.0 [unreleased]

### Bug Fixes
1. [#163](https://github.com/influxdata/nifi-influxdb-bundle/pull/163): Max connection timeout also used as read/write timeout.

## v1.28.0 [2024-03-01]

### Others
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ Allows sharing connection configuration to InfluxDB 1.x among more NiFi process
| SSL Context Service | The SSL Context Service used to provide client certificate information for TLS/SSL connections |
| Client Auth | The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided. |
| **InfluxDB connection URL** | InfluxDB URL to connect to. Eg: http://influxdb:8086 |
| **InfluxDB Max Connection Time Out** | The maximum time for establishing connection to the InfluxDB |
| **InfluxDB Max Connection Time Out** | The maximum time for establishing connection and reading/writing to the InfluxDB |
| Username | Username which is used to authorize against the InfluxDB |
| Password | Password for the username which is used to authorize against the InfluxDB. If the authorization fail the FlowFile will be penalized and routed to 'retry' relationship. |

Expand All @@ -284,7 +284,7 @@ Allows sharing connection configuration to InfluxDB 2.0 among more NiFi processo
| SSL Context Service | The SSL Context Service used to provide client certificate information for TLS/SSL connections |
| Client Auth | The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided. |
| **InfluxDB connection URL** | InfluxDB URL to connect to. Eg: http://influxdb:8086 |
| **InfluxDB Max Connection Time Out** | The maximum time for establishing connection to the InfluxDB |
| **InfluxDB Max Connection Time Out** | The maximum time for establishing connection and reading/writing to the InfluxDB |
| **InfluxDB Access Token** | Access Token used for authenticating/authorizing the InfluxDB request sent by NiFi. |

### PutInfluxDatabase
Expand All @@ -297,7 +297,7 @@ Processor to write the content of a FlowFile in 'line protocol'. Please check de
| --- | --- |
| **Database Name** | InfluxDB database to connect to |
| **InfluxDB connection URL** | InfluxDB URL to connect to. Eg: http://influxdb:8086 |
| **InfluxDB Max Connection Time Out** | The maximum time for establishing connection to the InfluxDB |
| **InfluxDB Max Connection Time Out** | The maximum time for establishing connection and reading/writing to the InfluxDB |
| Username | Username for accessing InfluxDB |
| Password | Password for user |
| **Character Set** | Specifies the character set of the document data |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,8 @@ public static InfluxDB makeConnectionV1(String influxDbUrl,
OkHttpClient.Builder builder = new OkHttpClient
.Builder()
.connectTimeout(connectionTimeout, TimeUnit.SECONDS)
.readTimeout(connectionTimeout, TimeUnit.SECONDS)
.writeTimeout(connectionTimeout, TimeUnit.SECONDS)
// add interceptor with "User-Agent" header
.addInterceptor(chain -> {
Request request = chain
Expand Down Expand Up @@ -323,7 +325,11 @@ public static InfluxDBClient makeConnectionV2(String influxDbUrl,
long connectionTimeout,
Consumer<OkHttpClient.Builder> configurer,
final String clientType) {
OkHttpClient.Builder builder = new OkHttpClient.Builder().connectTimeout(connectionTimeout, TimeUnit.SECONDS);
OkHttpClient.Builder builder = new OkHttpClient
.Builder()
.connectTimeout(connectionTimeout, TimeUnit.SECONDS)
.readTimeout(connectionTimeout, TimeUnit.SECONDS)
.writeTimeout(connectionTimeout, TimeUnit.SECONDS);
if (configurer != null) {
configurer.accept(builder);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.influxdata.nifi.util;

import com.influxdb.client.InfluxDBClient;
import com.influxdb.query.FluxTable;
import org.junit.jupiter.api.Test;

import java.util.List;

public class ITTestTimeout {
static final String INFLUX_DB_2_URL = "http://localhost:9999";
static final String INFLUX_DB_2_ORG = "my-org";
static final String INFLUX_DB_2_TOKEN = "my-token";

@Test
public void testReadTimeout() {
InfluxDBClient influxDBClient = InfluxDBUtils.makeConnectionV2(INFLUX_DB_2_URL, INFLUX_DB_2_TOKEN, 60, null, null);
List<FluxTable> unused = influxDBClient.getQueryApi().query("import \"array\"\n" +
"import \"experimental/json\"\n" +
"import \"http/requests\"\n" +
"response = requests.get(url: \"http://httpbin:8080/delay/20\")\n" +
"data = json.parse(data: response.body)\n" +
"array.from(rows: [{_field: \"origin\", _value: data.origin, _time: now()}])", INFLUX_DB_2_ORG);
}
}
21 changes: 21 additions & 0 deletions scripts/influxdb-restart.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ DEFAULT_INFLUXDB_V2_VERSION="latest"
INFLUXDB_V2_VERSION="${INFLUXDB_V2_VERSION:-$DEFAULT_INFLUXDB_V2_VERSION}"
INFLUXDB_V2_IMAGE=influxdb:${INFLUXDB_V2_VERSION}

HTTPBIN_IMAGE=mccutchen/go-httpbin

SCRIPT_PATH="$( cd "$(dirname "$0")" ; pwd -P )"

docker kill influxdb || true
Expand All @@ -42,8 +44,26 @@ docker rm influxdb-secured || true
docker kill influxdb_v2 || true
docker rm influxdb_v2 || true

docker kill httpbin || true
docker rm httpbin || true

docker pull ${INFLUXDB_IMAGE} || true
docker pull ${INFLUXDB_V2_IMAGE} || true
docker pull ${HTTPBIN_IMAGE} || true

#
# Testing helper service
#
echo
echo "Starting httpbin service ..."
echo

docker run \
--detach \
--name httpbin \
--publish 8080:8080 \
--env=MAX_DURATION=60s \
${HTTPBIN_IMAGE}

echo
echo "Starting unsecured InfluxDB..."
Expand Down Expand Up @@ -87,6 +107,7 @@ docker run \
--env INFLUXD_HTTP_BIND_ADDRESS=:9999 \
--name influxdb_v2 \
--link=influxdb \
--link httpbin \
--publish 9999:9999 \
${INFLUXDB_V2_IMAGE}

Expand Down