diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeConnectorCustomPortIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeConnectorCustomPortIT.java new file mode 100644 index 000000000000..9e5f4a15cf22 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeConnectorCustomPortIT.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.pipe.it.autocreate; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; +import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; +import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.db.it.utils.TestUtils; +import org.apache.iotdb.it.env.MultiEnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +@RunWith(IoTDBTestRunner.class) +@Category({MultiClusterIT2AutoCreateSchema.class}) +public class IoTDBPipeConnectorCustomPortIT extends AbstractPipeDualAutoIT { + + @Override + @Before + public void setUp() { + // Override to enable air-gap + MultiEnvFactory.createEnv(2); + senderEnv = MultiEnvFactory.getEnv(0); + receiverEnv = MultiEnvFactory.getEnv(1); + + senderEnv + .getConfig() + .getCommonConfig() + .setAutoCreateSchemaEnabled(true) + .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS); + receiverEnv + .getConfig() + .getCommonConfig() + .setAutoCreateSchemaEnabled(true) + .setPipeAirGapReceiverEnabled(true) + .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS); + + // 10 min, assert that the operations will not time out + senderEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000); + receiverEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000); + + senderEnv.initClusterEnvironment(); + receiverEnv.initClusterEnvironment(); + } + + @Test + public void testAsyncPortRange() { + doTest( + "iotdb-thrift-async-connector", receiverEnv.getIP() + ":" + receiverEnv.getPort(), "range"); + } + + @Test + public void testAsyncPortCandidate() { + doTest( + "iotdb-thrift-async-connector", + receiverEnv.getIP() + ":" + receiverEnv.getPort(), + "candidate"); + } + + @Test + public void testSyncThriftPortRange() { + doTest( + "iotdb-thrift-sync-connector", receiverEnv.getIP() + ":" + receiverEnv.getPort(), "range"); + } + + @Test + public void testSyncThriftPortCandidate() { + doTest( + "iotdb-thrift-sync-connector", + receiverEnv.getIP() + ":" + receiverEnv.getPort(), + "candidate"); + } + + @Test + public void testAirGapPortRange() { + doTest( + "iotdb-air-gap-connector", + receiverEnv.getIP() + ":" + receiverEnv.getDataNodeWrapper(0).getPipeAirGapReceiverPort(), + "range"); + } + + @Test + public void testAirGapPortCandidate() { + doTest( + "iotdb-air-gap-connector", + receiverEnv.getIP() + ":" + receiverEnv.getDataNodeWrapper(0).getPipeAirGapReceiverPort(), + "candidate"); + } + + private void doTest(final String connector, final String urls, final String strategy) { + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + final Map extractorAttributes = new HashMap<>(); + final Map processorAttributes = new HashMap<>(); + final Map connectorAttributes = new HashMap<>(); + extractorAttributes.put("realtime.mode", "forced-log"); + connectorAttributes.put("connector", connector); + connectorAttributes.put("batch.enable", "false"); + connectorAttributes.put("node-urls", urls); + connectorAttributes.put("send-port.restriction-strategy", strategy); + if (strategy.equals("range")) { + connectorAttributes.put("send-port.range.min", "1024"); + connectorAttributes.put("send-port.range.max", "1524"); + } else { + StringBuilder candidateBuilder = new StringBuilder(); + for (int i = 0; i < 30; i++) { + candidateBuilder.append(1024 + i).append(","); + } + candidateBuilder.append(1024 + 30); + connectorAttributes.put("send-port.candidate", candidateBuilder.toString()); + } + + final TSStatus status = + client.createPipe( + new TCreatePipeReq("p1", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + Thread.sleep(2000); + TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, + Arrays.asList( + "insert into root.db.d1(time, s1) values (2010-01-01T10:00:00+08:00, 1)", + "insert into root.db.d1(time, s1) values (2010-01-02T10:00:00+08:00, 2)", + "flush")); + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "select count(*) from root.**", + "count(root.db.d1.s1),", + Collections.singleton("2,")); + + } catch (Exception e) { + Assert.fail(); + } + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java index b2967e3bdf86..ba5c0c911b0b 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java @@ -40,7 +40,11 @@ public IoTDBConfigNodeSyncClientManager( String trustStorePwd, String loadBalanceStrategy, boolean shouldReceiverConvertOnTypeMismatch, - String loadTsFileStrategy) { + String loadTsFileStrategy, + String customSendPortStrategy, + int minSendPortRange, + int maxSendPortRange, + List candidatePorts) { super( endPoints, useSSL, @@ -49,7 +53,11 @@ public IoTDBConfigNodeSyncClientManager( false, loadBalanceStrategy, shouldReceiverConvertOnTypeMismatch, - loadTsFileStrategy); + loadTsFileStrategy, + customSendPortStrategy, + minSendPortRange, + maxSendPortRange, + candidatePorts); } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java index 307151df706b..2e4c3dd3c737 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java @@ -65,7 +65,11 @@ protected IoTDBSyncClientManager constructClient( final boolean useLeaderCache, final String loadBalanceStrategy, final boolean shouldReceiverConvertOnTypeMismatch, - final String loadTsFileStrategy) { + final String loadTsFileStrategy, + String customSendPortStrategy, + int minSendPortRange, + int maxSendPortRange, + List candidatePorts) { return new IoTDBConfigNodeSyncClientManager( nodeUrls, useSSL, @@ -73,7 +77,11 @@ protected IoTDBSyncClientManager constructClient( trustStorePwd, loadBalanceStrategy, shouldReceiverConvertOnTypeMismatch, - loadTsFileStrategy); + loadTsFileStrategy, + customSendPortStrategy, + minSendPortRange, + maxSendPortRange, + candidatePorts); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java index 549923882d5a..99ba8658ee7b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.client.ClientPoolFactory; import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient; +import org.apache.iotdb.commons.client.async.AsyncTEndPoint; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.connector.client.IoTDBClientManager; @@ -59,16 +60,18 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataNodeAsyncClientManager.class); - private final Set endPointSet; + private final Set endPointSet; + private final List tEndPointList; private static final Map RECEIVER_ATTRIBUTES_REF_COUNT = new ConcurrentHashMap<>(); private final String receiverAttributes; - // receiverAttributes -> IClientManager - private static final Map> + // receiverAttributes -> IClientManager + private static final Map< + String, IClientManager> ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER = new ConcurrentHashMap<>(); - private final IClientManager endPoint2Client; + private final IClientManager endPoint2Client; private final LoadBalancer loadBalancer; @@ -77,13 +80,13 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager private final String loadTsFileStrategy; public IoTDBDataNodeAsyncClientManager( - List endPoints, + List endPoints, boolean useLeaderCache, String loadBalanceStrategy, boolean shouldReceiverConvertOnTypeMismatch, String loadTsFileStrategy) { - super(endPoints, useLeaderCache); - + super(null, useLeaderCache); + this.tEndPointList = endPoints; endPointSet = new HashSet<>(endPoints); receiverAttributes = @@ -92,7 +95,7 @@ public IoTDBDataNodeAsyncClientManager( if (!ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.containsKey(receiverAttributes)) { ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.putIfAbsent( receiverAttributes, - new IClientManager.Factory() + new IClientManager.Factory() .createClientManager( new ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory())); } @@ -132,10 +135,10 @@ public AsyncPipeDataTransferServiceClient borrowClient(String deviceId) throws E return borrowClient(); } - return borrowClient(LEADER_CACHE_MANAGER.getLeaderEndPoint(deviceId)); + return borrowClient((AsyncTEndPoint) LEADER_CACHE_MANAGER.getLeaderEndPoint(deviceId)); } - public AsyncPipeDataTransferServiceClient borrowClient(TEndPoint endPoint) throws Exception { + public AsyncPipeDataTransferServiceClient borrowClient(AsyncTEndPoint endPoint) throws Exception { if (!useLeaderCache || Objects.isNull(endPoint)) { return borrowClient(); } @@ -282,13 +285,13 @@ private void waitHandshakeFinished(AtomicBoolean isHandshakeFinished) { } } - public void updateLeaderCache(String deviceId, TEndPoint endPoint) { + public void updateLeaderCache(String deviceId, AsyncTEndPoint endPoint) { if (!useLeaderCache || deviceId == null || endPoint == null) { return; } if (!endPointSet.contains(endPoint)) { - endPointList.add(endPoint); + tEndPointList.add(endPoint); endPointSet.add(endPoint); } @@ -301,8 +304,9 @@ public void close() { receiverAttributes, (attributes, refCount) -> { if (refCount <= 1) { - final IClientManager clientManager = - ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.remove(receiverAttributes); + final IClientManager + clientManager = + ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.remove(receiverAttributes); if (clientManager != null) { try { clientManager.close(); @@ -326,9 +330,10 @@ private interface LoadBalancer { private class RoundRobinLoadBalancer implements LoadBalancer { @Override public AsyncPipeDataTransferServiceClient borrowClient() throws Exception { - final int clientSize = endPointList.size(); + final int clientSize = tEndPointList.size(); while (true) { - final TEndPoint targetNodeUrl = endPointList.get((int) (currentClientIndex++ % clientSize)); + final AsyncTEndPoint targetNodeUrl = + tEndPointList.get((int) (currentClientIndex++ % clientSize)); final AsyncPipeDataTransferServiceClient client = endPoint2Client.borrowClient(targetNodeUrl); if (handshakeIfNecessary(targetNodeUrl, client)) { @@ -341,9 +346,9 @@ public AsyncPipeDataTransferServiceClient borrowClient() throws Exception { private class RandomLoadBalancer implements LoadBalancer { @Override public AsyncPipeDataTransferServiceClient borrowClient() throws Exception { - final int clientSize = endPointList.size(); + final int clientSize = tEndPointList.size(); while (true) { - final TEndPoint targetNodeUrl = endPointList.get((int) (Math.random() * clientSize)); + final AsyncTEndPoint targetNodeUrl = tEndPointList.get((int) (Math.random() * clientSize)); final AsyncPipeDataTransferServiceClient client = endPoint2Client.borrowClient(targetNodeUrl); if (handshakeIfNecessary(targetNodeUrl, client)) { @@ -357,7 +362,7 @@ private class PriorityLoadBalancer implements LoadBalancer { @Override public AsyncPipeDataTransferServiceClient borrowClient() throws Exception { while (true) { - for (final TEndPoint targetNodeUrl : endPointList) { + for (final AsyncTEndPoint targetNodeUrl : tEndPointList) { final AsyncPipeDataTransferServiceClient client = endPoint2Client.borrowClient(targetNodeUrl); if (handshakeIfNecessary(targetNodeUrl, client)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java index 5e4e0fbfcb8e..e46ae18715ce 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java @@ -50,7 +50,12 @@ public IoTDBDataNodeSyncClientManager( final boolean useLeaderCache, final String loadBalanceStrategy, final boolean shouldReceiverConvertOnTypeMismatch, - final String loadTsFileStrategy) { + final String loadTsFileStrategy, + final String customSendPortStrategy, + final int minSendPortRange, + final int maxSendPortRange, + final List candidatePorts) { + super( endPoints, useSSL, @@ -59,7 +64,11 @@ public IoTDBDataNodeSyncClientManager( useLeaderCache, loadBalanceStrategy, shouldReceiverConvertOnTypeMismatch, - loadTsFileStrategy); + loadTsFileStrategy, + customSendPortStrategy, + minSendPortRange, + maxSendPortRange, + candidatePorts); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java index 5aa4324a2f39..cea1daf1f647 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java @@ -21,6 +21,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient; +import org.apache.iotdb.commons.client.async.AsyncTEndPoint; import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.pipe.agent.task.subtask.connector.PipeConnectorSubtask; @@ -70,6 +71,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_KEY; @@ -120,7 +122,17 @@ public void customize( clientManager = new IoTDBDataNodeAsyncClientManager( - nodeUrls, + nodeUrls.stream() + .map( + tEndPoint -> + new AsyncTEndPoint( + tEndPoint.getIp(), + tEndPoint.getPort(), + minSendPortRange, + maxSendPortRange, + candidatePorts, + customSendPortStrategy)) + .collect(Collectors.toList()), parameters.getBooleanOrDefault( Arrays.asList(SINK_LEADER_CACHE_ENABLE_KEY, CONNECTOR_LEADER_CACHE_ENABLE_KEY), CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE), @@ -259,7 +271,7 @@ private void transfer( final PipeTransferTabletBatchEventHandler pipeTransferTabletBatchEventHandler) { AsyncPipeDataTransferServiceClient client = null; try { - client = clientManager.borrowClient(endPoint); + client = clientManager.borrowClient((AsyncTEndPoint) endPoint); pipeTransferTabletBatchEventHandler.transfer(client); } catch (final Exception ex) { logOnClientException(client, ex); @@ -379,7 +391,15 @@ public void transfer(final Event event) throws Exception { //////////////////////////// Leader cache update //////////////////////////// public void updateLeaderCache(final String deviceId, final TEndPoint endPoint) { - clientManager.updateLeaderCache(deviceId, endPoint); + clientManager.updateLeaderCache( + deviceId, + new AsyncTEndPoint( + endPoint.getIp(), + endPoint.getPort(), + minSendPortRange, + maxSendPortRange, + candidatePorts, + customSendPortStrategy)); } //////////////////////////// Exception handlers //////////////////////////// diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java index e6f1ac979577..5434a2100c19 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java @@ -89,7 +89,11 @@ protected IoTDBSyncClientManager constructClient( final boolean useLeaderCache, final String loadBalanceStrategy, final boolean shouldReceiverConvertOnTypeMismatch, - final String loadTsFileStrategy) { + final String loadTsFileStrategy, + final String customSendPortStrategy, + final int minSendPortRange, + final int maxSendPortRange, + List candidatePorts) { clientManager = new IoTDBDataNodeSyncClientManager( nodeUrls, @@ -99,7 +103,11 @@ protected IoTDBSyncClientManager constructClient( useLeaderCache, loadBalanceStrategy, shouldReceiverConvertOnTypeMismatch, - loadTsFileStrategy); + loadTsFileStrategy, + customSendPortStrategy, + minSendPortRange, + maxSendPortRange, + candidatePorts); return clientManager; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java index c59416d1eaa0..968413d54abf 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java @@ -28,6 +28,7 @@ import org.apache.iotdb.commons.client.async.AsyncDataNodeMPPDataExchangeServiceClient; import org.apache.iotdb.commons.client.async.AsyncPipeConsensusServiceClient; import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient; +import org.apache.iotdb.commons.client.async.AsyncTEndPoint; import org.apache.iotdb.commons.client.property.ClientPoolProperty; import org.apache.iotdb.commons.client.property.PipeConsensusClientProperty; import org.apache.iotdb.commons.client.property.ThriftClientProperty; @@ -269,12 +270,14 @@ public KeyedObjectPool cre } public static class AsyncPipeDataTransferServiceClientPoolFactory - implements IClientPoolFactory { + implements IClientPoolFactory { + + public AsyncPipeDataTransferServiceClientPoolFactory() {} @Override - public KeyedObjectPool createClientPool( - ClientManager manager) { - final GenericKeyedObjectPool clientPool = + public KeyedObjectPool createClientPool( + ClientManager manager) { + final GenericKeyedObjectPool clientPool = new GenericKeyedObjectPool<>( new AsyncPipeDataTransferServiceClient.Factory( manager, diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java index 91fa97fc82c7..28e91efe0f85 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory; import org.apache.iotdb.commons.client.property.ThriftClientProperty; +import org.apache.iotdb.commons.client.util.IoTDBConnectorPortBinder; import org.apache.iotdb.rpc.TNonblockingSocketWrapper; import org.apache.iotdb.service.rpc.thrift.IClientRPCService; @@ -36,6 +37,9 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.SocketChannel; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -50,8 +54,8 @@ public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncC private final boolean printLogWhenEncounterException; - private final TEndPoint endpoint; - private final ClientManager clientManager; + private final AsyncTEndPoint endpoint; + private final ClientManager clientManager; private final AtomicBoolean shouldReturnSelf = new AtomicBoolean(true); @@ -59,15 +63,28 @@ public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncC public AsyncPipeDataTransferServiceClient( ThriftClientProperty property, - TEndPoint endpoint, + AsyncTEndPoint endpoint, TAsyncClientManager tClientManager, - ClientManager clientManager) + ClientManager clientManager, + String customSendPortStrategy, + int minSendPortRange, + int maxSendPortRange, + List candidatePorts) throws IOException { super( property.getProtocolFactory(), tClientManager, TNonblockingSocketWrapper.wrap( endpoint.getIp(), endpoint.getPort(), property.getConnectionTimeoutMs())); + SocketChannel socketChannel = ((TNonblockingSocket) ___transport).getSocketChannel(); + IoTDBConnectorPortBinder.bindPort( + customSendPortStrategy, + minSendPortRange, + maxSendPortRange, + candidatePorts, + (sendPort) -> { + socketChannel.bind(new InetSocketAddress(sendPort)); + }); setTimeout(property.getConnectionTimeoutMs()); this.printLogWhenEncounterException = property.isPrintLogWhenEncounterException(); this.endpoint = endpoint; @@ -175,10 +192,10 @@ public String toString() { } public static class Factory - extends AsyncThriftClientFactory { + extends AsyncThriftClientFactory { public Factory( - ClientManager clientManager, + ClientManager clientManager, ThriftClientProperty thriftClientProperty, String threadName) { super(clientManager, thriftClientProperty, threadName); @@ -186,24 +203,28 @@ public Factory( @Override public void destroyObject( - TEndPoint endPoint, PooledObject pooledObject) { + AsyncTEndPoint endPoint, PooledObject pooledObject) { pooledObject.getObject().close(); } @Override - public PooledObject makeObject(TEndPoint endPoint) + public PooledObject makeObject(AsyncTEndPoint endPoint) throws Exception { return new DefaultPooledObject<>( new AsyncPipeDataTransferServiceClient( thriftClientProperty, endPoint, tManagers[clientCnt.incrementAndGet() % tManagers.length], - clientManager)); + clientManager, + endPoint.getCustomSendPortStrategy(), + endPoint.getMaxSendPortRange(), + endPoint.getMaxSendPortRange(), + endPoint.getCandidatePorts())); } @Override public boolean validateObject( - TEndPoint endPoint, PooledObject pooledObject) { + AsyncTEndPoint endPoint, PooledObject pooledObject) { return pooledObject.getObject().isReady(); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncTEndPoint.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncTEndPoint.java new file mode 100644 index 000000000000..8981c6f38fcd --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncTEndPoint.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.client.async; + +import org.apache.iotdb.common.rpc.thrift.TEndPoint; + +import java.util.List; +import java.util.Objects; + +public class AsyncTEndPoint extends TEndPoint { + + private int minSendPortRange; + private int maxSendPortRange; + private List candidatePorts; + private String customSendPortStrategy; + + public AsyncTEndPoint( + String ip, + int port, + int minSendPortRange, + int maxSendPortRange, + List candidatePorts, + String customSendPortStrategy) { + super(ip, port); + this.minSendPortRange = minSendPortRange; + this.maxSendPortRange = maxSendPortRange; + this.candidatePorts = candidatePorts; + this.customSendPortStrategy = customSendPortStrategy; + } + + public int getMinSendPortRange() { + return minSendPortRange; + } + + public void setMinSendPortRange(int minSendPortRange) { + this.minSendPortRange = minSendPortRange; + } + + public int getMaxSendPortRange() { + return maxSendPortRange; + } + + public void setMaxSendPortRange(int maxSendPortRange) { + this.maxSendPortRange = maxSendPortRange; + } + + public List getCandidatePorts() { + return candidatePorts; + } + + public void setCandidatePorts(List candidatePorts) { + this.candidatePorts = candidatePorts; + } + + public String getCustomSendPortStrategy() { + return customSendPortStrategy; + } + + public void setCustomSendPortStrategy(String customSendPortStrategy) { + this.customSendPortStrategy = customSendPortStrategy; + } + + @Override + public int hashCode() { + return Objects.hash( + super.hashCode(), + minSendPortRange, + maxSendPortRange, + candidatePorts, + customSendPortStrategy); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (!(obj instanceof AsyncTEndPoint)) return false; + AsyncTEndPoint that = (AsyncTEndPoint) obj; + return minSendPortRange == that.minSendPortRange + && maxSendPortRange == that.maxSendPortRange + && Objects.equals(candidatePorts, that.candidatePorts) + && Objects.equals(customSendPortStrategy, that.customSendPortStrategy) + && super.equals(obj); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/util/IoTDBConnectorPortBinder.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/util/IoTDBConnectorPortBinder.java new file mode 100644 index 000000000000..2761800a0afd --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/util/IoTDBConnectorPortBinder.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.client.util; + +import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant; +import org.apache.iotdb.commons.utils.function.Consumer; +import org.apache.iotdb.pipe.api.exception.PipeConnectionException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class IoTDBConnectorPortBinder { + + private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBConnectorPortBinder.class); + + // ===========================bind================================ + + public static void bindPort( + final String customSendPortStrategy, + final int minSendPortRange, + final int maxSendPortRange, + final List candidatePorts, + final Consumer consumer) { + final boolean isRange = + PipeConnectorConstant.CONNECTOR_IOTDB_SEND_PORT_RESTRICTION_RANGE_STRATEGY.equals( + customSendPortStrategy); + boolean portFound = false; + int index = 0; + boolean searching = isRange || !candidatePorts.isEmpty(); + while (searching) { + int port = isRange ? minSendPortRange + index : candidatePorts.get(index); + try { + consumer.accept(port); + portFound = true; + break; + } catch (Exception ignored) { + } + index++; + searching = isRange ? port <= maxSendPortRange : candidatePorts.size() > index; + } + if (!portFound) { + String exceptionMessage = + isRange + ? String.format( + "Failed to find an available send port within the range %d to %d.", + minSendPortRange, maxSendPortRange) + : String.format( + "Failed to find an available send port in the candidate list [%s].", + candidatePorts); + LOGGER.warn(exceptionMessage); + throw new PipeConnectionException(exceptionMessage); + } + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java index 77eccfb4069d..6966027185ac 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java @@ -46,6 +46,29 @@ public class PipeConnectorConstant { public static final String CONNECTOR_IOTDB_NODE_URLS_KEY = "connector.node-urls"; public static final String SINK_IOTDB_NODE_URLS_KEY = "sink.node-urls"; + public static final String CONNECTOR_IOTDB_SEND_PORT_MIN_KEY = "connector.send-port.range.min"; + public static final String SINK_IOTDB_SEND_PORT_MIN_KEY = "sink.send-port.range.min"; + public static final String CONNECTOR_IOTDB_SEND_PORT_MAX_KEY = "connector.send-port.range.max"; + public static final String SINK_IOTDB_SEND_PORT_MAX_KEY = "sink.send-port.range.max"; + public static final String CONNECTOR_IOTDB_SEND_PORT_CANDIDATE_KEY = + "connector.send-port.candidate"; + public static final String SINK_IOTDB_SEND_PORT_CANDIDATE_KEY = "sink.send-port.candidate"; + public static final String CONNECTOR_IOTDB_SEND_PORT_RESTRICTION_STRATEGY_KEY = + "connector.send-port.restriction-strategy"; + public static final String SINK_IOTDB_SEND_PORT_RESTRICTION_STRATEGY_KEY = + "sink.send-port.restriction-strategy"; + public static final int CONNECTOR_IOTDB_SEND_PORT_MIN_VALUE = 0; + public static final int CONNECTOR_IOTDB_SEND_PORT_MAX_VALUE = 65535; + public static final String CONNECTOR_IOTDB_SEND_PORT_CANDIDATE_VALUE = ""; + public static final String CONNECTOR_IOTDB_SEND_PORT_RESTRICTION_RANGE_STRATEGY = "range"; + public static final String CONNECTOR_IOTDB_SEND_PORT_RESTRICTION_CANDIDATE_STRATEGY = "candidate"; + public static final Set CONNECTOR_IOTDB_SEND_PORT_RESTRICTION_STRATEGY_SET = + Collections.unmodifiableSet( + new HashSet<>( + Arrays.asList( + CONNECTOR_IOTDB_SEND_PORT_RESTRICTION_CANDIDATE_STRATEGY, + CONNECTOR_IOTDB_SEND_PORT_RESTRICTION_RANGE_STRATEGY))); + public static final String SINK_IOTDB_SSL_ENABLE_KEY = "sink.ssl.enable"; public static final String SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY = "sink.ssl.trust-store-path"; public static final String SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY = "sink.ssl.trust-store-pwd"; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java index 73e0543fe675..5cb338eb7656 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java @@ -43,11 +43,34 @@ public abstract class IoTDBClientManager { // it is a DataNode receiver. The flag is useless for configNode receiver. protected boolean supportModsIfIsDataNodeReceiver = true; + protected int minSendPortRange; + + protected int maxSendPortRange; + + protected List candidatePorts; + + protected String customSendPortStrategy; + private static final int MAX_CONNECTION_TIMEOUT_MS = 24 * 60 * 60 * 1000; // 1 day private static final int FIRST_ADJUSTMENT_TIMEOUT_MS = 6 * 60 * 60 * 1000; // 6 hours protected static final AtomicInteger CONNECTION_TIMEOUT_MS = new AtomicInteger(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs()); + protected IoTDBClientManager( + List endPointList, + boolean useLeaderCache, + String customSendPortStrategy, + int minSendPortRange, + int maxSendPortRange, + List candidatePorts) { + this.endPointList = endPointList; + this.useLeaderCache = useLeaderCache; + this.customSendPortStrategy = customSendPortStrategy; + this.minSendPortRange = minSendPortRange; + this.maxSendPortRange = maxSendPortRange; + this.candidatePorts = candidatePorts; + } + protected IoTDBClientManager(List endPointList, boolean useLeaderCache) { this.endPointList = endPointList; this.useLeaderCache = useLeaderCache; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java index d79c430474c3..c35fef804743 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java @@ -22,23 +22,28 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.client.property.ThriftClientProperty; +import org.apache.iotdb.commons.client.util.IoTDBConnectorPortBinder; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion; import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferSliceReq; import org.apache.iotdb.pipe.api.exception.PipeConnectionException; import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.rpc.TimeoutChangeableTFastFramedTransport; import org.apache.iotdb.rpc.TimeoutChangeableTransport; import org.apache.iotdb.service.rpc.thrift.IClientRPCService; import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; import org.apache.thrift.TException; +import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.InetSocketAddress; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; public class IoTDBSyncClient extends IClientRPCService.Client @@ -52,6 +57,51 @@ public class IoTDBSyncClient extends IClientRPCService.Client private final int port; private final TEndPoint endPoint; + public IoTDBSyncClient( + ThriftClientProperty property, + String ipAddress, + int port, + boolean useSSL, + String trustStore, + String trustStorePwd, + String customSendPortStrategy, + int minSendPortRange, + int maxSendPortRange, + List candidatePorts) + throws TTransportException { + super( + property + .getProtocolFactory() + .getProtocol( + useSSL + ? DeepCopyRpcTransportFactory.INSTANCE.getTransport( + ipAddress, + port, + property.getConnectionTimeoutMs(), + trustStore, + trustStorePwd) + : DeepCopyRpcTransportFactory.INSTANCE.getTransport( + ipAddress, port, property.getConnectionTimeoutMs()))); + this.ipAddress = ipAddress; + this.port = port; + this.endPoint = new TEndPoint(ipAddress, port); + final TTransport transport = getInputProtocol().getTransport(); + if (!transport.isOpen()) { + IoTDBConnectorPortBinder.bindPort( + customSendPortStrategy, + minSendPortRange, + maxSendPortRange, + candidatePorts, + (sendPort) -> { + final InetSocketAddress isa = new InetSocketAddress(sendPort); + ((TSocket) ((TimeoutChangeableTFastFramedTransport) transport).getSocket()) + .getSocket() + .bind(isa); + }); + transport.open(); + } + } + public IoTDBSyncClient( ThriftClientProperty property, String ipAddress, diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java index 81b5194621ce..9022e20a1951 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java @@ -72,8 +72,18 @@ protected IoTDBSyncClientManager( boolean useLeaderCache, String loadBalanceStrategy, boolean shouldReceiverConvertOnTypeMismatch, - String loadTsFileStrategy) { - super(endPoints, useLeaderCache); + String loadTsFileStrategy, + String customSendPortStrategy, + int minSendPortRange, + int maxSendPortRange, + List candidatePorts) { + super( + endPoints, + useLeaderCache, + customSendPortStrategy, + minSendPortRange, + maxSendPortRange, + candidatePorts); this.useSSL = useSSL; this.trustStorePath = trustStorePath; @@ -159,7 +169,11 @@ private void initClientAndStatus( endPoint.getPort(), useSSL, trustStorePath, - trustStorePwd)); + trustStorePwd, + customSendPortStrategy, + minSendPortRange, + maxSendPortRange, + candidatePorts)); } catch (Exception e) { throw new PipeConnectionException( String.format( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java index 8972fb424b3a..4c5776030701 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java @@ -21,6 +21,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.client.util.IoTDBConnectorPortBinder; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapELanguageConstant; import org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapOneByteResponse; @@ -169,6 +170,12 @@ public void handshake() throws Exception { } final AirGapSocket socket = new AirGapSocket(ip, port); + IoTDBConnectorPortBinder.bindPort( + customSendPortStrategy, + minSendPortRange, + maxSendPortRange, + candidatePorts, + (sendPort) -> socket.bind(new InetSocketAddress(sendPort))); try { socket.connect(new InetSocketAddress(ip, port), handshakeTimeoutMs); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java index db58cec22ca9..19b2b6078b16 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java @@ -42,12 +42,14 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_KEY; @@ -80,6 +82,15 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_SEND_PORT_CANDIDATE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_SEND_PORT_CANDIDATE_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_SEND_PORT_MAX_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_SEND_PORT_MAX_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_SEND_PORT_MIN_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_SEND_PORT_MIN_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_SEND_PORT_RESTRICTION_RANGE_STRATEGY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_SEND_PORT_RESTRICTION_STRATEGY_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_SEND_PORT_RESTRICTION_STRATEGY_SET; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_SET; @@ -103,6 +114,10 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_IP_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_NODE_URLS_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PORT_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SEND_PORT_CANDIDATE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SEND_PORT_MAX_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SEND_PORT_MIN_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SEND_PORT_RESTRICTION_STRATEGY_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_BALANCE_STRATEGY_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_TSFILE_STRATEGY_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_RATE_LIMIT_KEY; @@ -113,11 +128,22 @@ public abstract class IoTDBConnector implements PipeConnector { "Exception occurred while parsing node urls from target servers: {}"; private static final String PARSE_URL_ERROR_MESSAGE = "Error occurred while parsing node urls from target servers, please check the specified 'host':'port' or 'node-urls'"; + public static final int MIN_PORT = + 0; // The minimum port number allocated to user processes by the operating system + public static final int MAX_PORT = 65535; // The maximum value for port numbers private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBConnector.class); protected final List nodeUrls = new ArrayList<>(); + protected int minSendPortRange; + + protected int maxSendPortRange; + + protected List candidatePorts; + + protected String customSendPortStrategy; + protected String loadBalanceStrategy; protected String loadTsFileStrategy; @@ -175,6 +201,76 @@ public void validate(final PipeParameterValidator validator) throws Exception { Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY, SINK_IOTDB_BATCH_SIZE_KEY), CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE)); + this.customSendPortStrategy = + parameters + .getStringOrDefault( + Arrays.asList( + CONNECTOR_IOTDB_SEND_PORT_RESTRICTION_STRATEGY_KEY, + SINK_IOTDB_SEND_PORT_RESTRICTION_STRATEGY_KEY), + CONNECTOR_IOTDB_SEND_PORT_RESTRICTION_RANGE_STRATEGY) + .trim() + .toLowerCase(); + + validator.validate( + arg -> CONNECTOR_IOTDB_SEND_PORT_RESTRICTION_STRATEGY_SET.contains(customSendPortStrategy), + String.format( + "send port restriction strategy should be one of %s, but got %s.", + CONNECTOR_IOTDB_SEND_PORT_RESTRICTION_STRATEGY_SET, customSendPortStrategy), + customSendPortStrategy); + + if (CONNECTOR_IOTDB_SEND_PORT_RESTRICTION_RANGE_STRATEGY.equals(customSendPortStrategy)) { + minSendPortRange = + parameters.getIntOrDefault( + Arrays.asList(CONNECTOR_IOTDB_SEND_PORT_MIN_KEY, SINK_IOTDB_SEND_PORT_MIN_KEY), + CONNECTOR_IOTDB_SEND_PORT_MIN_VALUE); + maxSendPortRange = + parameters.getIntOrDefault( + Arrays.asList(CONNECTOR_IOTDB_SEND_PORT_MAX_KEY, SINK_IOTDB_SEND_PORT_MAX_KEY), + CONNECTOR_IOTDB_SEND_PORT_MAX_VALUE); + validator.validate( + args -> (int) args[0] <= (int) args[1], + String.format( + "%s must be less than or equal to %s, but got %d > %d.", + SINK_IOTDB_SEND_PORT_MIN_KEY, + SINK_IOTDB_SEND_PORT_MAX_KEY, + minSendPortRange, + maxSendPortRange), + minSendPortRange, + maxSendPortRange); + validator.validate( + args -> (int) args[0] <= (int) args[1] && (int) args[2] >= (int) args[3], + String.format( + "Port range is invalid: %s must be >= %d and %s must be <= %d. Current values are %d and %d respectively.", + SINK_IOTDB_SEND_PORT_MIN_KEY, + MIN_PORT, + SINK_IOTDB_SEND_PORT_MAX_KEY, + MAX_PORT, + minSendPortRange, + maxSendPortRange), + MIN_PORT, + minSendPortRange, + MAX_PORT, + maxSendPortRange); + } else { + this.candidatePorts = + parseCandidatePorts( + parameters.getStringOrDefault( + Arrays.asList( + CONNECTOR_IOTDB_SEND_PORT_CANDIDATE_KEY, SINK_IOTDB_SEND_PORT_CANDIDATE_KEY), + CONNECTOR_IOTDB_SEND_PORT_CANDIDATE_VALUE)); + validator.validate( + arg -> (int) arg > 0, + "The number of candidate ports must be greater than 0.", + candidatePorts.size()); + validator.validate( + arg -> (int) arg[0] >= MIN_PORT && (int) arg[1] <= MAX_PORT, + String.format( + "Candidate port range is invalid: Ports must be between 0 and 65535, but got minimum port: %d and maximum port: %d", + candidatePorts.get(0), candidatePorts.get(candidatePorts.size() - 1)), + candidatePorts.get(0), + candidatePorts.get(candidatePorts.size() - 1)); + } + loadBalanceStrategy = parameters .getStringOrDefault( @@ -428,6 +524,17 @@ private void checkNodeUrls(final Set nodeUrls) throws PipeParameterNo } } + private static List parseCandidatePorts(String candidate) { + if (candidate == null || candidate.isEmpty()) { + return Collections.emptyList(); + } + return Arrays.stream(candidate.split(",")) + .map(String::trim) + .map(Integer::parseInt) + .sorted() + .collect(Collectors.toList()); + } + @Override public void close() { // TODO: Not all the limiters should be closed here, but it's fine for now. diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java index 6a4dd9e90679..f41cccf2c217 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java @@ -124,7 +124,11 @@ public void customize( useLeaderCache, loadBalanceStrategy, shouldReceiverConvertOnTypeMismatch, - loadTsFileStrategy); + loadTsFileStrategy, + customSendPortStrategy, + minSendPortRange, + maxSendPortRange, + candidatePorts); } protected abstract IoTDBSyncClientManager constructClient( @@ -135,7 +139,11 @@ protected abstract IoTDBSyncClientManager constructClient( final boolean useLeaderCache, final String loadBalanceStrategy, final boolean shouldReceiverConvertOnTypeMismatch, - final String loadTsFileStrategy); + final String loadTsFileStrategy, + final String customSendPortStrategy, + final int minSendPortRange, + final int maxSendPortRange, + final List candidatePorts); @Override public void handshake() throws Exception { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/function/Consumer.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/function/Consumer.java new file mode 100644 index 000000000000..e6c84451d790 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/function/Consumer.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.utils.function; + +public interface Consumer { + void accept(INPUT1 var1) throws THROWABLE; +}