Skip to content
This repository has been archived by the owner on Aug 13, 2024. It is now read-only.

Commit

Permalink
fix: InfluxDB client read/write timeout settings (#163)
Browse files Browse the repository at this point in the history
  • Loading branch information
alespour authored Apr 22, 2024
2 parents eb50e63 + f1a8dd0 commit 7a74bc9
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 4 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## v1.29.0 [unreleased]

### Bug Fixes
1. [#163](https://github.com/influxdata/nifi-influxdb-bundle/pull/163): Max connection timeout also used as read/write timeout.

## v1.28.0 [2024-03-01]

### Others
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ Allows sharing connection configuration to InfluxDB 1.x among more NiFi process
| SSL Context Service | The SSL Context Service used to provide client certificate information for TLS/SSL connections |
| Client Auth | The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided. |
| **InfluxDB connection URL** | InfluxDB URL to connect to. Eg: http://influxdb:8086 |
| **InfluxDB Max Connection Time Out** | The maximum time for establishing connection to the InfluxDB |
| **InfluxDB Max Connection Time Out** | The maximum time for establishing connection and reading/writing to the InfluxDB |
| Username | Username which is used to authorize against the InfluxDB |
| Password | Password for the username which is used to authorize against the InfluxDB. If the authorization fail the FlowFile will be penalized and routed to 'retry' relationship. |

Expand All @@ -284,7 +284,7 @@ Allows sharing connection configuration to InfluxDB 2.0 among more NiFi processo
| SSL Context Service | The SSL Context Service used to provide client certificate information for TLS/SSL connections |
| Client Auth | The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided. |
| **InfluxDB connection URL** | InfluxDB URL to connect to. Eg: http://influxdb:8086 |
| **InfluxDB Max Connection Time Out** | The maximum time for establishing connection to the InfluxDB |
| **InfluxDB Max Connection Time Out** | The maximum time for establishing connection and reading/writing to the InfluxDB |
| **InfluxDB Access Token** | Access Token used for authenticating/authorizing the InfluxDB request sent by NiFi. |

### PutInfluxDatabase
Expand All @@ -297,7 +297,7 @@ Processor to write the content of a FlowFile in 'line protocol'. Please check de
| --- | --- |
| **Database Name** | InfluxDB database to connect to |
| **InfluxDB connection URL** | InfluxDB URL to connect to. Eg: http://influxdb:8086 |
| **InfluxDB Max Connection Time Out** | The maximum time for establishing connection to the InfluxDB |
| **InfluxDB Max Connection Time Out** | The maximum time for establishing connection and reading/writing to the InfluxDB |
| Username | Username for accessing InfluxDB |
| Password | Password for user |
| **Character Set** | Specifies the character set of the document data |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,8 @@ public static InfluxDB makeConnectionV1(String influxDbUrl,
OkHttpClient.Builder builder = new OkHttpClient
.Builder()
.connectTimeout(connectionTimeout, TimeUnit.SECONDS)
.readTimeout(connectionTimeout, TimeUnit.SECONDS)
.writeTimeout(connectionTimeout, TimeUnit.SECONDS)
// add interceptor with "User-Agent" header
.addInterceptor(chain -> {
Request request = chain
Expand Down Expand Up @@ -323,7 +325,11 @@ public static InfluxDBClient makeConnectionV2(String influxDbUrl,
long connectionTimeout,
Consumer<OkHttpClient.Builder> configurer,
final String clientType) {
OkHttpClient.Builder builder = new OkHttpClient.Builder().connectTimeout(connectionTimeout, TimeUnit.SECONDS);
OkHttpClient.Builder builder = new OkHttpClient
.Builder()
.connectTimeout(connectionTimeout, TimeUnit.SECONDS)
.readTimeout(connectionTimeout, TimeUnit.SECONDS)
.writeTimeout(connectionTimeout, TimeUnit.SECONDS);
if (configurer != null) {
configurer.accept(builder);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.influxdata.nifi.util;

import com.influxdb.client.InfluxDBClient;
import com.influxdb.query.FluxTable;
import org.junit.jupiter.api.Test;

import java.util.List;

public class ITTestTimeout {
static final String INFLUX_DB_2_URL = "http://localhost:9999";
static final String INFLUX_DB_2_ORG = "my-org";
static final String INFLUX_DB_2_TOKEN = "my-token";

@Test
public void testReadTimeout() {
InfluxDBClient influxDBClient = InfluxDBUtils.makeConnectionV2(INFLUX_DB_2_URL, INFLUX_DB_2_TOKEN, 60, null, null);
List<FluxTable> unused = influxDBClient.getQueryApi().query("import \"array\"\n" +
"import \"experimental/json\"\n" +
"import \"http/requests\"\n" +
"response = requests.get(url: \"http://httpbin:8080/delay/20\")\n" +
"data = json.parse(data: response.body)\n" +
"array.from(rows: [{_field: \"origin\", _value: data.origin, _time: now()}])", INFLUX_DB_2_ORG);
}
}
21 changes: 21 additions & 0 deletions scripts/influxdb-restart.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ DEFAULT_INFLUXDB_V2_VERSION="latest"
INFLUXDB_V2_VERSION="${INFLUXDB_V2_VERSION:-$DEFAULT_INFLUXDB_V2_VERSION}"
INFLUXDB_V2_IMAGE=influxdb:${INFLUXDB_V2_VERSION}

HTTPBIN_IMAGE=mccutchen/go-httpbin

SCRIPT_PATH="$( cd "$(dirname "$0")" ; pwd -P )"

docker kill influxdb || true
Expand All @@ -42,8 +44,26 @@ docker rm influxdb-secured || true
docker kill influxdb_v2 || true
docker rm influxdb_v2 || true

docker kill httpbin || true
docker rm httpbin || true

docker pull ${INFLUXDB_IMAGE} || true
docker pull ${INFLUXDB_V2_IMAGE} || true
docker pull ${HTTPBIN_IMAGE} || true

#
# Testing helper service
#
echo
echo "Starting httpbin service ..."
echo

docker run \
--detach \
--name httpbin \
--publish 8080:8080 \
--env=MAX_DURATION=60s \
${HTTPBIN_IMAGE}

echo
echo "Starting unsecured InfluxDB..."
Expand Down Expand Up @@ -87,6 +107,7 @@ docker run \
--env INFLUXD_HTTP_BIND_ADDRESS=:9999 \
--name influxdb_v2 \
--link=influxdb \
--link httpbin \
--publish 9999:9999 \
${INFLUXDB_V2_IMAGE}

Expand Down

0 comments on commit 7a74bc9

Please sign in to comment.