From 434558b34e0e6b62786a0e3c46ce71c11f6456a3 Mon Sep 17 00:00:00 2001 From: Ahmed Hamdy Date: Thu, 2 May 2024 15:19:03 +0100 Subject: [PATCH] [FLINK-21373] Add RabbitMQ SinkV2 Implementation, Port Flink version to 1.19 --- .github/workflows/push_pr.yml | 7 +- .github/workflows/weekly.yml | 4 - .../a6cee285-bdbf-4479-a652-8143c2bc1a69 | 9 + .../archunit-violations/stored.rules | 3 +- flink-connector-rabbitmq/pom.xml | 15 + .../connector/rabbitmq/common/Constants.java | 17 + .../DefaultRabbitMQMessageConverter.java | 23 + .../common/RabbitMQConnectionConfig.java | 465 ++++++++++ .../RabbitMQConnectionSetupException.java | 13 + .../rabbitmq/common/RabbitMQMessage.java | 144 +++ .../common/RabbitMQMessageConverter.java | 18 + .../common/SerializableReturnListener.java | 29 + .../common/util/RabbitMQConnectionUtil.java | 65 ++ .../connector/rabbitmq/sink/RabbitMQSink.java | 87 ++ .../rabbitmq/sink/RabbitMQSinkBuilder.java | 113 +++ .../rabbitmq/sink/RabbitMQSinkWriter.java | 258 ++++++ .../connectors/rabbitmq/RMQSink.java | 1 + .../rabbitmq/RMQSinkPublishOptions.java | 5 +- .../rabbitmq/SerializableReturnListener.java | 8 +- .../rabbitmq/common/RMQConnectionConfig.java | 590 +------------ .../DefaultRabbitMQMessageConverterTest.java | 27 + .../common/RabbitMQConnectionConfigTest.java | 147 ++++ .../sink/RabbitMQSinkBuilderTest.java | 92 ++ .../rabbitmq/sink/RabbitMQSinkWriterTest.java | 467 ++++++++++ .../rabbitmq/sink/util/TestChannel.java | 832 ++++++++++++++++++ .../rabbitmq/sink/util/TestConnection.java | 222 +++++ .../sink/util/TestMessageConverter.java | 56 ++ .../common/RMQConnectionConfigTest.java | 36 +- pom.xml | 2 +- 29 files changed, 3156 insertions(+), 599 deletions(-) create mode 100644 flink-connector-rabbitmq/archunit-violations/a6cee285-bdbf-4479-a652-8143c2bc1a69 create mode 100644 flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/Constants.java create mode 100644 flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/DefaultRabbitMQMessageConverter.java create mode 100644 flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/RabbitMQConnectionConfig.java create mode 100644 flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/RabbitMQConnectionSetupException.java create mode 100644 flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/RabbitMQMessage.java create mode 100644 flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/RabbitMQMessageConverter.java create mode 100644 flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/SerializableReturnListener.java create mode 100644 flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/util/RabbitMQConnectionUtil.java create mode 100644 flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSink.java create mode 100644 flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSinkBuilder.java create mode 100644 flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSinkWriter.java create mode 100644 flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/common/DefaultRabbitMQMessageConverterTest.java create mode 100644 flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/common/RabbitMQConnectionConfigTest.java create mode 100644 flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSinkBuilderTest.java create mode 100644 flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSinkWriterTest.java create mode 100644 flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/sink/util/TestChannel.java create mode 100644 flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/sink/util/TestConnection.java create mode 100644 flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/sink/util/TestMessageConverter.java diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml index da8a5776289..c8419358294 100644 --- a/.github/workflows/push_pr.yml +++ b/.github/workflows/push_pr.yml @@ -28,11 +28,8 @@ jobs: compile_and_test: strategy: matrix: - flink: [ 1.18-SNAPSHOT ] - jdk: [ '8, 11, 17' ] - include: - - flink: 1.19-SNAPSHOT - jdk: '8, 11, 17, 21' + flink: [ 1.19-SNAPSHOT ] + jdk: [ '8, 11, 17, 21' ] uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils with: flink_version: ${{ matrix.flink }} diff --git a/.github/workflows/weekly.yml b/.github/workflows/weekly.yml index 8385763e67a..49db4a14292 100644 --- a/.github/workflows/weekly.yml +++ b/.github/workflows/weekly.yml @@ -30,10 +30,6 @@ jobs: strategy: matrix: flink_branches: [{ - flink: 1.18-SNAPSHOT, - jdk: '8, 11, 17', - branch: main - }, { flink: 1.19-SNAPSHOT, jdk: '8, 11, 17, 21', branch: main diff --git a/flink-connector-rabbitmq/archunit-violations/a6cee285-bdbf-4479-a652-8143c2bc1a69 b/flink-connector-rabbitmq/archunit-violations/a6cee285-bdbf-4479-a652-8143c2bc1a69 new file mode 100644 index 00000000000..bd828109de7 --- /dev/null +++ b/flink-connector-rabbitmq/archunit-violations/a6cee285-bdbf-4479-a652-8143c2bc1a69 @@ -0,0 +1,9 @@ +Method calls method in (RMQSource.java:415) +Method calls method in (RMQSource.java:303) +Method calls method in (RMQSource.java:300) +Method calls method in (RMQSource.java:275) +Method calls method in (RMQSource.java:254) +Method calls method in (RMQSource.java:266) +Method checks instanceof in (RMQSource.java:253) +Method is annotated with in (RMQSource.java:0) +Method is annotated with in (RMQSource.java:0) diff --git a/flink-connector-rabbitmq/archunit-violations/stored.rules b/flink-connector-rabbitmq/archunit-violations/stored.rules index 14d465e7980..3896c539bd3 100644 --- a/flink-connector-rabbitmq/archunit-violations/stored.rules +++ b/flink-connector-rabbitmq/archunit-violations/stored.rules @@ -1,5 +1,5 @@ # -#Mon Apr 03 14:18:47 CEST 2023 +#Thu May 02 14:30:30 BST 2024 Return\ and\ argument\ types\ of\ methods\ annotated\ with\ @Public\ must\ be\ annotated\ with\ @Public.=f67f70fc-4a24-448c-a247-354e7ce69167 Connector\ production\ code\ must\ not\ depend\ on\ non-public\ API\ outside\ of\ connector\ packages=deb59a69-6a64-49f2-8aa3-84985ee63d70 ITCASE\ tests\ should\ use\ a\ MiniCluster\ resource\ or\ extension=6fdbfe74-a937-4a8a-8e1b-9f0a3391f3fe @@ -8,3 +8,4 @@ Options\ for\ connectors\ and\ formats\ should\ reside\ in\ a\ consistent\ packa Tests\ inheriting\ from\ AbstractTestBase\ should\ have\ name\ ending\ with\ ITCase=675cade4-c44e-4b2b-aacf-0c23d2032e4a Return\ and\ argument\ types\ of\ methods\ annotated\ with\ @PublicEvolving\ must\ be\ annotated\ with\ @Public(Evolving).=871721c9-4c5f-4523-b8f6-a419e8a0085f Classes\ in\ API\ packages\ should\ have\ at\ least\ one\ API\ visibility\ annotation.=54a3d1fc-24ac-4bdc-bf15-56e8d7831aed +Connector\ production\ code\ must\ depend\ only\ on\ public\ API\ when\ outside\ of\ connector\ packages=a6cee285-bdbf-4479-a652-8143c2bc1a69 diff --git a/flink-connector-rabbitmq/pom.xml b/flink-connector-rabbitmq/pom.xml index 22d0c3acbef..e3f9df33251 100644 --- a/flink-connector-rabbitmq/pom.xml +++ b/flink-connector-rabbitmq/pom.xml @@ -76,6 +76,21 @@ under the License. test + + org.apache.flink + flink-connector-test-utils + ${flink.version} + test + + + + org.apache.flink + flink-connector-base + ${flink.version} + test + test-jar + + org.testcontainers rabbitmq diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/Constants.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/Constants.java new file mode 100644 index 00000000000..59320a554ba --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/Constants.java @@ -0,0 +1,17 @@ +package org.apache.flink.connector.rabbitmq.common; + +import org.apache.flink.annotation.PublicEvolving; + +/** Constants for the RabbitMQ connector. */ +@PublicEvolving +public class Constants { + + /** The default RabbitMQ host Exchange used when exchange routing is disabled. */ + public static final String DEFAULT_EXCHANGE = ""; + + /** The default maximum number of inflight messages handled by SinkWriter at the same time. */ + public static final int DEFAULT_MAX_INFLIGHT = 100; + + /** The default behaviour of sink on failing to send elements. */ + public static final boolean DEFAULT_FAIL_ON_ERROR = false; +} diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/DefaultRabbitMQMessageConverter.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/DefaultRabbitMQMessageConverter.java new file mode 100644 index 00000000000..0ae6c15de41 --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/DefaultRabbitMQMessageConverter.java @@ -0,0 +1,23 @@ +package org.apache.flink.connector.rabbitmq.common; + +import org.apache.flink.annotation.PublicEvolving; + +import static org.apache.flink.connector.rabbitmq.common.Constants.DEFAULT_EXCHANGE; + +/** + * Default implementation of {@link RabbitMQMessageConverter}. + * + * @param type of the message to be converted + */ +@PublicEvolving +public class DefaultRabbitMQMessageConverter implements RabbitMQMessageConverter { + @Override + public RabbitMQMessage toRabbitMQMessage(T value) { + return RabbitMQMessage.builder().setMessage(value).setExchange(DEFAULT_EXCHANGE).build(); + } + + @Override + public boolean supportsExchangeRouting() { + return false; + } +} diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/RabbitMQConnectionConfig.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/RabbitMQConnectionConfig.java new file mode 100644 index 00000000000..c9a19998750 --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/RabbitMQConnectionConfig.java @@ -0,0 +1,465 @@ +package org.apache.flink.connector.rabbitmq.common; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.streaming.connectors.rabbitmq.RMQSource; +import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +/** Configuration class for RabbitMQ connections. */ +@PublicEvolving +public class RabbitMQConnectionConfig implements Serializable { + private static final long DEFAULT_DELIVERY_TIMEOUT = 30000; + + private String host; + private Integer port; + private String virtualHost; + private String username; + private String password; + private String uri; + + private Integer networkRecoveryInterval; + private Boolean automaticRecovery; + private Boolean topologyRecovery; + + private Integer connectionTimeout; + private Integer requestedChannelMax; + private Integer requestedFrameMax; + private Integer requestedHeartbeat; + + private Integer prefetchCount; + private final long deliveryTimeout; + + protected RabbitMQConnectionConfig(RabbitMQConnectionConfig.Builder builder) { + Preconditions.checkArgument( + builder.uri != null || (builder.host != null && builder.port != null), + "Either URI or host/port must be set"); + if (builder.uri == null) { + Preconditions.checkNotNull( + builder.username, "username can not be null when host/port is set"); + Preconditions.checkNotNull( + builder.password, "password can not be null when host/port is set"); + } + + Preconditions.checkArgument( + builder.deliveryTimeout == null || builder.deliveryTimeout > 0, + "deliveryTimeout must be positive"); + this.uri = builder.uri; + this.host = builder.host; + this.port = builder.port; + this.virtualHost = builder.virtualHost; + this.username = builder.username; + this.password = builder.password; + + this.networkRecoveryInterval = builder.networkRecoveryInterval; + this.automaticRecovery = builder.automaticRecovery; + this.topologyRecovery = builder.topologyRecovery; + this.connectionTimeout = builder.connectionTimeout; + this.requestedChannelMax = builder.requestedChannelMax; + this.requestedFrameMax = builder.requestedFrameMax; + this.requestedHeartbeat = builder.requestedHeartbeat; + this.prefetchCount = builder.prefetchCount; + this.deliveryTimeout = + Optional.ofNullable(builder.deliveryTimeout).orElse(DEFAULT_DELIVERY_TIMEOUT); + } + + /** + * @param uri the connection URI + * @param networkRecoveryInterval connection recovery interval in milliseconds + * @param automaticRecovery if automatic connection recovery + * @param topologyRecovery if topology recovery + * @param connectionTimeout connection timeout + * @param requestedChannelMax requested maximum channel number + * @param requestedFrameMax requested maximum frame size + * @param requestedHeartbeat requested heartbeat interval + * @param deliveryTimeout message delivery timeout in the queueing consumer + * @throws NullPointerException if URI is null + */ + protected RabbitMQConnectionConfig( + String uri, + Integer networkRecoveryInterval, + Boolean automaticRecovery, + Boolean topologyRecovery, + Integer connectionTimeout, + Integer requestedChannelMax, + Integer requestedFrameMax, + Integer requestedHeartbeat, + Integer prefetchCount, + Long deliveryTimeout) { + Preconditions.checkNotNull(uri, "Uri can not be null"); + Preconditions.checkArgument( + deliveryTimeout == null || deliveryTimeout > 0, "deliveryTimeout must be positive"); + this.uri = uri; + + this.networkRecoveryInterval = networkRecoveryInterval; + this.automaticRecovery = automaticRecovery; + this.topologyRecovery = topologyRecovery; + this.connectionTimeout = connectionTimeout; + this.requestedChannelMax = requestedChannelMax; + this.requestedFrameMax = requestedFrameMax; + this.requestedHeartbeat = requestedHeartbeat; + this.prefetchCount = prefetchCount; + this.deliveryTimeout = + Optional.ofNullable(deliveryTimeout).orElse(DEFAULT_DELIVERY_TIMEOUT); + } + + /** @return the host to use for connections */ + public String getHost() { + return host; + } + + /** @return the port to use for connections */ + public Integer getPort() { + return port; + } + + /** + * Retrieve the virtual host. + * + * @return the virtual host to use when connecting to the broker + */ + public String getVirtualHost() { + return virtualHost; + } + + /** + * Retrieve the user name. + * + * @return the AMQP user name to use when connecting to the broker + */ + public String getUsername() { + return username; + } + + /** + * Retrieve the password. + * + * @return the password to use when connecting to the broker + */ + public String getPassword() { + return password; + } + + /** + * Retrieve the URI. + * + * @return the connection URI when connecting to the broker + */ + public String getUri() { + return uri; + } + + /** + * Returns automatic connection recovery interval in milliseconds. + * + * @return how long will automatic recovery wait before attempting to reconnect, in ms; default + * is 5000 + */ + public Integer getNetworkRecoveryInterval() { + return networkRecoveryInterval; + } + + /** + * Returns true if automatic connection recovery is enabled, false otherwise. + * + * @return true if automatic connection recovery is enabled, false otherwise + */ + public Boolean isAutomaticRecovery() { + return automaticRecovery; + } + + /** + * Returns true if topology recovery is enabled, false otherwise. + * + * @return true if topology recovery is enabled, false otherwise + */ + public Boolean isTopologyRecovery() { + return topologyRecovery; + } + + /** + * Retrieve the connection timeout. + * + * @return the connection timeout, in milliseconds; zero for infinite + */ + public Integer getConnectionTimeout() { + return connectionTimeout; + } + + /** + * Retrieve the requested maximum channel number. + * + * @return the initially requested maximum channel number; zero for unlimited + */ + public Integer getRequestedChannelMax() { + return requestedChannelMax; + } + + /** + * Retrieve the requested maximum frame size. + * + * @return the initially requested maximum frame size, in octets; zero for unlimited + */ + public Integer getRequestedFrameMax() { + return requestedFrameMax; + } + + /** + * Retrieve the requested heartbeat interval. + * + * @return the initially requested heartbeat interval, in seconds; zero for none + */ + public Integer getRequestedHeartbeat() { + return requestedHeartbeat; + } + + /** + * Retrieve the channel prefetch count. + * + * @return an Optional of the prefetch count, if set, for the consumer channel + */ + public Optional getPrefetchCount() { + return Optional.ofNullable(prefetchCount); + } + + /** + * Retrieve the message delivery timeout used in the queueing consumer. If not specified + * explicitly, the default value of 30000 milliseconds will be returned. + * + * @return the message delivery timeout, in milliseconds + */ + public long getDeliveryTimeout() { + return deliveryTimeout; + } + + public static Builder> builder() { + return new Builder<>(); + } + + /** The Builder Class for {@link RMQConnectionConfig}. */ + public static class Builder> { + + private String host; + private Integer port; + private String virtualHost; + private String username; + private String password; + + private Integer networkRecoveryInterval; + private Boolean automaticRecovery; + private Boolean topologyRecovery; + + private Integer connectionTimeout; + private Integer requestedChannelMax; + private Integer requestedFrameMax; + private Integer requestedHeartbeat; + + // basicQos options for consumers + private Integer prefetchCount; + + private Long deliveryTimeout; + + private String uri; + + /** + * Set the target port. + * + * @param port the default port to use for connections + * @return the Builder + */ + public T setPort(int port) { + this.port = port; + return (T) this; + } + + /** + * @param host the default host to use for connections + * @return the Builder + */ + public T setHost(String host) { + this.host = host; + return (T) this; + } + + /** + * Set the virtual host. + * + * @param virtualHost the virtual host to use when connecting to the broker + * @return the Builder + */ + public T setVirtualHost(String virtualHost) { + this.virtualHost = virtualHost; + return (T) this; + } + + /** + * Set the user name. + * + * @param username the AMQP user name to use when connecting to the broker + * @return the Builder + */ + public T setUserName(String username) { + this.username = username; + return (T) this; + } + + /** + * Set the password. + * + * @param password the password to use when connecting to the broker + * @return the Builder + */ + public T setPassword(String password) { + this.password = password; + return (T) this; + } + + /** + * Convenience method for setting the fields in an AMQP URI: host, port, username, password + * and virtual host. If any part of the URI is omitted, the {@link + * com.rabbitmq.client.ConnectionFactory}'s corresponding variable is left unchanged. + * + * @param uri is the AMQP URI containing the data + * @return the Builder + */ + public T setUri(String uri) { + this.uri = uri; + return (T) this; + } + + /** + * Enables or disables topology recovery. + * + * @param topologyRecovery if true, enables topology recovery + * @return the Builder + */ + public T setTopologyRecoveryEnabled(boolean topologyRecovery) { + this.topologyRecovery = topologyRecovery; + return (T) this; + } + + /** + * Set the requested heartbeat. + * + * @param requestedHeartbeat the initially requested heartbeat interval, in seconds; zero + * for none + * @return the Builder + */ + public T setRequestedHeartbeat(int requestedHeartbeat) { + this.requestedHeartbeat = requestedHeartbeat; + return (T) this; + } + + /** + * Set the requested maximum frame size. + * + * @param requestedFrameMax initially requested maximum frame size, in octets; zero for + * unlimited + * @return the Builder + */ + public T setRequestedFrameMax(int requestedFrameMax) { + this.requestedFrameMax = requestedFrameMax; + return (T) this; + } + + /** + * Set the requested maximum channel number. + * + * @param requestedChannelMax initially requested maximum channel number; zero for unlimited + */ + public T setRequestedChannelMax(int requestedChannelMax) { + this.requestedChannelMax = requestedChannelMax; + return (T) this; + } + + /** + * Sets connection recovery interval. Default is 5000. + * + * @param networkRecoveryInterval how long will automatic recovery wait before attempting to + * reconnect, in ms + * @return the Builder + */ + public T setNetworkRecoveryInterval(int networkRecoveryInterval) { + this.networkRecoveryInterval = networkRecoveryInterval; + return (T) this; + } + + /** + * Set the connection timeout. + * + * @param connectionTimeout connection establishment timeout in milliseconds; zero for + * infinite + * @return the Builder + */ + public T setConnectionTimeout(int connectionTimeout) { + this.connectionTimeout = connectionTimeout; + return (T) this; + } + + /** + * Enables or disables automatic connection recovery. + * + * @param automaticRecovery if true, enables connection recovery + * @return the Builder + */ + public T setAutomaticRecovery(boolean automaticRecovery) { + this.automaticRecovery = automaticRecovery; + return (T) this; + } + + /** + * Enables setting basicQos for the consumer channel. Only applicable to the {@link + * RMQSource}. Set to 0 for unlimited, which is the default. + * + * @see Consumer Prefetch + * @see Channel + * Prefetch (QoS) + * @param prefetchCount the max number of messages to receive without acknowledgement. + * @return the Builder + */ + public T setPrefetchCount(int prefetchCount) { + this.prefetchCount = prefetchCount; + return (T) this; + } + + /** + * Enables setting the message delivery timeout in the queueing consumer. Only applicable to + * the {@link RMQSource}. If not set it will default to 30000. + * + * @param deliveryTimeout maximum wait time, in milliseconds, for the next message delivery + * @return the Builder + */ + public T setDeliveryTimeout(long deliveryTimeout) { + Preconditions.checkArgument(deliveryTimeout > 0, "deliveryTimeout must be positive"); + this.deliveryTimeout = deliveryTimeout; + return (T) this; + } + + /** + * Enables setting the message delivery timeout in the queueing consumer. Only applicable to + * the {@link RMQSource}. If not set it will default to 30 seconds. + * + * @param deliveryTimeout maximum wait time for the next message delivery + * @param unit deliveryTimeout unit + * @return the Builder + */ + public T setDeliveryTimeout(long deliveryTimeout, TimeUnit unit) { + return setDeliveryTimeout(unit.toMillis(deliveryTimeout)); + } + + /** + * The Builder method. + * + *

If URI is NULL we use host, port, vHost, username, password combination to initialize + * connection. using {@link RabbitMQConnectionConfig#RabbitMQConnectionConfig(Builder)}. + * + * @return RMQConnectionConfig + */ + public RabbitMQConnectionConfig build() { + return new RabbitMQConnectionConfig(this); + } + } +} diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/RabbitMQConnectionSetupException.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/RabbitMQConnectionSetupException.java new file mode 100644 index 00000000000..8bcdb42fa3e --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/RabbitMQConnectionSetupException.java @@ -0,0 +1,13 @@ +package org.apache.flink.connector.rabbitmq.common; + +import org.apache.flink.annotation.PublicEvolving; + +import java.io.IOException; + +/** Exception for RabbitMQ connection setup errors. */ +@PublicEvolving +public class RabbitMQConnectionSetupException extends IOException { + public RabbitMQConnectionSetupException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/RabbitMQMessage.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/RabbitMQMessage.java new file mode 100644 index 00000000000..8d67c0b6482 --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/RabbitMQMessage.java @@ -0,0 +1,144 @@ +package org.apache.flink.connector.rabbitmq.common; + +import org.apache.flink.annotation.PublicEvolving; + +import com.rabbitmq.client.AMQP.BasicProperties; + +import javax.annotation.Nullable; + +import java.io.Serializable; + +/** + * A message to be sent to RabbitMQ with publish options. + * + * @param type of the message to be sent + */ +@PublicEvolving +public class RabbitMQMessage implements Serializable { + private static final long serialVersionUID = 1L; + + private final T message; + + private final boolean isMandatory; + + private final boolean isImmediate; + + @Nullable private final String routingKey; + + @Nullable private final String exchange; + + private final BasicProperties messageProperties; + + private Long serializedSize; + + private RabbitMQMessage(Builder builder) { + this( + builder.message, + builder.routingKey, + builder.exchange, + builder.messageProperties, + builder.isMandatory, + builder.isImmediate); + } + + private RabbitMQMessage( + T message, + @Nullable String routingKey, + @Nullable String exchange, + BasicProperties messageProperties, + boolean isMandatory, + boolean isImmediate) { + this.message = message; + this.routingKey = routingKey; + this.exchange = exchange; + this.messageProperties = messageProperties; + this.isMandatory = isMandatory; + this.isImmediate = isImmediate; + } + + public T getMessage() { + return message; + } + + @Nullable + public String getRoutingKey() { + return routingKey; + } + + @Nullable + public String getExchange() { + return exchange; + } + + public BasicProperties getMessageProperties() { + return messageProperties; + } + + public boolean isMandatory() { + return isMandatory; + } + + public boolean isImmediate() { + return isImmediate; + } + + public Long getSerializedSize() { + return serializedSize; + } + + public void setSerializedSize(Long serializedSize) { + this.serializedSize = serializedSize; + } + + public static Builder builder() { + return new Builder<>(); + } + + /** + * Builder for {@link RabbitMQMessage}. + * + * @param type of the message to be sent + */ + public static class Builder { + private T message; + private boolean isMandatory = false; + private boolean isImmediate = false; + private String routingKey; + private String exchange; + private BasicProperties messageProperties; + + public Builder setMessage(T message) { + this.message = message; + return this; + } + + public Builder setMandatory(boolean mandatory) { + isMandatory = mandatory; + return this; + } + + public Builder setImmediate(boolean immediate) { + isImmediate = immediate; + return this; + } + + public Builder setRoutingKey(String routingKey) { + this.routingKey = routingKey; + return this; + } + + public Builder setExchange(String exchange) { + this.exchange = exchange; + return this; + } + + public Builder setMessageProperties(BasicProperties messageProperties) { + this.messageProperties = messageProperties; + return this; + } + + public RabbitMQMessage build() { + return new RabbitMQMessage<>(this); + } + } +} diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/RabbitMQMessageConverter.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/RabbitMQMessageConverter.java new file mode 100644 index 00000000000..07c17187987 --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/RabbitMQMessageConverter.java @@ -0,0 +1,18 @@ +package org.apache.flink.connector.rabbitmq.common; + +import org.apache.flink.annotation.PublicEvolving; + +import java.io.Serializable; + +/** + * Interface for converting messages to {@link RabbitMQMessage}. + * + * @param type of the message to be converted + */ +@PublicEvolving +public interface RabbitMQMessageConverter extends Serializable { + + RabbitMQMessage toRabbitMQMessage(T value); + + boolean supportsExchangeRouting(); +} diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/SerializableReturnListener.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/SerializableReturnListener.java new file mode 100644 index 00000000000..bc7d0a62d4c --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/SerializableReturnListener.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq.common; + +import org.apache.flink.annotation.PublicEvolving; + +import com.rabbitmq.client.ReturnListener; + +import java.io.Serializable; + +/** A serializable {@link ReturnListener}. */ +@PublicEvolving +public interface SerializableReturnListener extends Serializable, ReturnListener {} diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/util/RabbitMQConnectionUtil.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/util/RabbitMQConnectionUtil.java new file mode 100644 index 00000000000..a78796f21f3 --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/util/RabbitMQConnectionUtil.java @@ -0,0 +1,65 @@ +package org.apache.flink.connector.rabbitmq.common.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig; +import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionSetupException; + +import com.rabbitmq.client.ConnectionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URISyntaxException; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; + +/** + * Utility class for creating a {@link ConnectionFactory} from a {@link RabbitMQConnectionConfig}. + */ +@Internal +public class RabbitMQConnectionUtil { + private static final Logger LOG = LoggerFactory.getLogger(RabbitMQConnectionUtil.class); + + public static ConnectionFactory getConnectionFactory(RabbitMQConnectionConfig connectionConfig) + throws RabbitMQConnectionSetupException { + ConnectionFactory factory = new ConnectionFactory(); + if (connectionConfig.getUri() != null && !connectionConfig.getUri().isEmpty()) { + try { + factory.setUri(connectionConfig.getUri()); + } catch (URISyntaxException | KeyManagementException | NoSuchAlgorithmException e) { + LOG.error("Failed to parse uri", e); + throw new RabbitMQConnectionSetupException( + "Failed to create connection factory", e); + } + } else { + factory.setHost(connectionConfig.getHost()); + factory.setPort(connectionConfig.getPort()); + factory.setVirtualHost(connectionConfig.getVirtualHost()); + factory.setUsername(connectionConfig.getUsername()); + factory.setPassword(connectionConfig.getPassword()); + } + + if (connectionConfig.isAutomaticRecovery() != null) { + factory.setAutomaticRecoveryEnabled(connectionConfig.isAutomaticRecovery()); + } + if (connectionConfig.getConnectionTimeout() != null) { + factory.setConnectionTimeout(connectionConfig.getConnectionTimeout()); + } + if (connectionConfig.getNetworkRecoveryInterval() != null) { + factory.setNetworkRecoveryInterval(connectionConfig.getNetworkRecoveryInterval()); + } + if (connectionConfig.getRequestedHeartbeat() != null) { + factory.setRequestedHeartbeat(connectionConfig.getRequestedHeartbeat()); + } + if (connectionConfig.isTopologyRecovery() != null) { + factory.setTopologyRecoveryEnabled(connectionConfig.isTopologyRecovery()); + } + if (connectionConfig.getRequestedChannelMax() != null) { + factory.setRequestedChannelMax(connectionConfig.getRequestedChannelMax()); + } + if (connectionConfig.getRequestedFrameMax() != null) { + factory.setRequestedFrameMax(connectionConfig.getRequestedFrameMax()); + } + + return factory; + } +} diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSink.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSink.java new file mode 100644 index 00000000000..bbbf379dc1c --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSink.java @@ -0,0 +1,87 @@ +package org.apache.flink.connector.rabbitmq.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.connector.sink2.WriterInitContext; +import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig; +import org.apache.flink.connector.rabbitmq.common.RabbitMQMessageConverter; +import org.apache.flink.connector.rabbitmq.common.SerializableReturnListener; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +/** + * A RabbitMQ {@link Sink} to produce data into RabbitMQ. The sink uses the {@link + * RabbitMQConnectionConfig} to create a connection to RabbitMQ, uses {@link + * RabbitMQMessageConverter} to convert the input data to {@link + * org.apache.flink.connector.rabbitmq.common.RabbitMQMessage}, and uses {@link + * com.rabbitmq.client.Channel} to send messages to the specified queue. + * + * @param input type for the sink. + */ +@PublicEvolving +public class RabbitMQSink implements Sink { + + private final RabbitMQConnectionConfig connectionConfig; + + private final SerializationSchema serializationSchema; + + private final RabbitMQMessageConverter messageConverter; + + private final SerializableReturnListener returnListener; + + private final String queueName; + + private final int maximumInflightMessages; + + private final boolean failOnError; + + public RabbitMQSink( + RabbitMQConnectionConfig connectionConfig, + SerializationSchema serializationSchema, + RabbitMQMessageConverter messageConverter, + SerializableReturnListener returnListener, + String queueName, + int maximumInflightMessages, + boolean failOnError) { + Preconditions.checkNotNull(queueName, "queueName cannot be null"); + Preconditions.checkNotNull(messageConverter, "messageConverter cannot be null"); + Preconditions.checkNotNull(serializationSchema, "serializationSchema cannot be null"); + Preconditions.checkNotNull(connectionConfig, "connectionConfig cannot be null"); + Preconditions.checkArgument( + maximumInflightMessages > 0, "maximumInflightMessages must be greater than 0"); + + this.connectionConfig = connectionConfig; + this.serializationSchema = serializationSchema; + this.messageConverter = messageConverter; + this.returnListener = returnListener; + this.queueName = queueName; + this.maximumInflightMessages = maximumInflightMessages; + this.failOnError = failOnError; + } + + @Override + public SinkWriter createWriter(InitContext initContext) throws IOException { + throw new UnsupportedOperationException( + "Creating writer with Deprecated InitContext is not supported. Please use WriterInitContext."); + } + + @Override + public SinkWriter createWriter(WriterInitContext context) throws IOException { + return new RabbitMQSinkWriter<>( + context, + connectionConfig, + queueName, + messageConverter, + serializationSchema, + returnListener, + maximumInflightMessages, + failOnError); + } + + public static RabbitMQSinkBuilder builder() { + return new RabbitMQSinkBuilder(); + } +} diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSinkBuilder.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSinkBuilder.java new file mode 100644 index 00000000000..3ff5aab6807 --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSinkBuilder.java @@ -0,0 +1,113 @@ +package org.apache.flink.connector.rabbitmq.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.connector.rabbitmq.common.DefaultRabbitMQMessageConverter; +import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig; +import org.apache.flink.connector.rabbitmq.common.RabbitMQMessageConverter; +import org.apache.flink.connector.rabbitmq.common.SerializableReturnListener; + +import java.util.Optional; + +import static org.apache.flink.connector.rabbitmq.common.Constants.DEFAULT_FAIL_ON_ERROR; +import static org.apache.flink.connector.rabbitmq.common.Constants.DEFAULT_MAX_INFLIGHT; + +/** + * A builder for creating a {@link RabbitMQSink}. + * + *

The builder uses the following parameters to build a {@link RabbitMQSink}: + * + *

    + *
  • {@link RabbitMQConnectionConfig} for the connection to RabbitMQ. + *
  • {@link SerializationSchema} for serializing the input data. + *
  • {@link RabbitMQMessageConverter} for converting the input data to {@link + * org.apache.flink.connector.rabbitmq.common.RabbitMQMessage}. + *
  • {@link SerializableReturnListener} for handling returned messages. + *
  • {@code queueName} for the name of the queue to send messages to. + *
  • {@code maximumInflightMessages} for the maximum number of in-flight messages. + *
  • {@code failOnError} for whether to fail on an error. + *
+ * + *

It can be used as follows: + * + *

{@code
+ * RabbitMQSink rabbitMQSink = {@code RabbitMQSinkBuilder}.builder()
+ *     .setConnectionConfig(connectionConfig)
+ *     .setSerializationSchema(new SimpleStringSchema())
+ *     .setMessageConverter(new SimpleStringMessageConverter())
+ *     .setReturnListener(new SimpleReturnListener())
+ *     .setQueueName("queue")
+ *     .setMaximumInflightMessages(10)
+ *     .setFailOnError(true)
+ *     .build();
+ *
+ * }
+ * + * @param + */ +@PublicEvolving +public class RabbitMQSinkBuilder { + + private RabbitMQConnectionConfig connectionConfig; + + private SerializationSchema serializationSchema; + + private RabbitMQMessageConverter messageConverter; + + private SerializableReturnListener returnListener; + + private String queueName; + + private Integer maximumInflightMessages; + + private Boolean failOnError; + + public RabbitMQSinkBuilder setConnectionConfig(RabbitMQConnectionConfig connectionConfig) { + this.connectionConfig = connectionConfig; + return this; + } + + public RabbitMQSinkBuilder setSerializationSchema( + SerializationSchema serializationSchema) { + this.serializationSchema = serializationSchema; + return this; + } + + public RabbitMQSinkBuilder setMessageConverter( + RabbitMQMessageConverter messageConverter) { + this.messageConverter = messageConverter; + return this; + } + + public RabbitMQSinkBuilder setReturnListener(SerializableReturnListener returnListener) { + this.returnListener = returnListener; + return this; + } + + public RabbitMQSinkBuilder setQueueName(String queueName) { + this.queueName = queueName; + return this; + } + + public RabbitMQSinkBuilder setMaximumInflightMessages(int maximumInflightMessages) { + this.maximumInflightMessages = maximumInflightMessages; + return this; + } + + public RabbitMQSinkBuilder setFailOnError(boolean failOnError) { + this.failOnError = failOnError; + return this; + } + + public RabbitMQSink build() { + return new RabbitMQSink<>( + connectionConfig, + serializationSchema, + Optional.ofNullable(messageConverter) + .orElse(new DefaultRabbitMQMessageConverter<>()), + returnListener, + queueName, + Optional.ofNullable(maximumInflightMessages).orElse(DEFAULT_MAX_INFLIGHT), + Optional.ofNullable(failOnError).orElse(DEFAULT_FAIL_ON_ERROR)); + } +} diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSinkWriter.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSinkWriter.java new file mode 100644 index 00000000000..9fc5dd97b0f --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSinkWriter.java @@ -0,0 +1,258 @@ +package org.apache.flink.connector.rabbitmq.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.connector.sink2.WriterInitContext; +import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig; +import org.apache.flink.connector.rabbitmq.common.RabbitMQMessage; +import org.apache.flink.connector.rabbitmq.common.RabbitMQMessageConverter; +import org.apache.flink.connector.rabbitmq.common.SerializableReturnListener; +import org.apache.flink.connector.rabbitmq.common.util.RabbitMQConnectionUtil; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.ConfirmCallback; +import com.rabbitmq.client.Connection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.TreeMap; +import java.util.concurrent.TimeoutException; + +/** + * A {@link SinkWriter} to produce data into RabbitMQ. The sink uses the {@link + * RabbitMQConnectionConfig} to create a connection to RabbitMQ, uses {@link + * RabbitMQMessageConverter} to convert the input data to {@link + * org.apache.flink.connector.rabbitmq.common.RabbitMQMessage}, and uses {@link + * com.rabbitmq.client.Channel} to send messages to the specified queue. + * + *

The sink writer is stateless and blocks for new writes if the number of inflight messages + * exceeds the maximum number of inflight messages' parameter. The sink writer also blocks for + * inflight messages before taking snapshots. + * + * @param input type for the sink. + */ +@Internal +public class RabbitMQSinkWriter implements SinkWriter { + private static final Logger LOG = LoggerFactory.getLogger(RabbitMQSinkWriter.class); + + /** The name of the queue to send messages to. */ + private final String queueName; + + /** + * The message converter to convert the input data to {@link RabbitMQMessage}. + * + *

The MessageConverter also is responsible of defining the routing schema of message + * publishing, by implementing the {@link RabbitMQMessageConverter#supportsExchangeRouting()} + * method to signal if the converter supports exchange routing. + */ + private final RabbitMQMessageConverter messageConverter; + + private final SerializationSchema serializationSchema; + + private final SerializableReturnListener returnListener; + + /* Counter for number of bytes this sink has attempted to send to the destination. */ + private final Counter numBytesOutCounter; + + /* Counter for number of records this sink has attempted to send to the destination. */ + private final Counter numRecordsOutCounter; + + private final MailboxExecutor mailboxExecutor; + + /** + * The maximum number of inflight messages. The sink writer blocks for new writes if the number + * of inflight messages exceeds this value. + */ + private final int maximumInflightMessages; + + /* Flag to indicate if the sink should fail on error. */ + private final boolean failOnError; + + /** + * Map to hold inflightMessages, the {@code getSuccessConfirmCallback} and {@code + * getFailureConfirmCallback} are triggered using sequence numbers hence we keep the mapping of + * sequence number to the message. We are using a sorted map to evict all inflight requests with + * sequence number less than or equal to the sequence number of the message that was + * acknowledged if flagged to acknowledge all previous messages. + */ + private final TreeMap> inflightMessages; + + private Connection connection; + private Channel channel; + + public RabbitMQSinkWriter( + WriterInitContext context, + RabbitMQConnectionConfig connectionConfig, + String queueName, + RabbitMQMessageConverter messageConverter, + SerializationSchema serializationSchema, + SerializableReturnListener returnListener, + int maximumInflightMessages, + boolean failOnError) + throws IOException { + this( + context, + queueName, + messageConverter, + serializationSchema, + returnListener, + maximumInflightMessages, + failOnError); + Preconditions.checkNotNull(connectionConfig, "connectionConfig cannot be null"); + try { + Connection connection = + RabbitMQConnectionUtil.getConnectionFactory(connectionConfig).newConnection(); + initializeConnection(connection); + } catch (TimeoutException e) { + throw new IOException("Failed to create connection", e); + } + } + + @VisibleForTesting + RabbitMQSinkWriter( + WriterInitContext context, + String queueName, + RabbitMQMessageConverter messageConverter, + SerializationSchema serializationSchema, + SerializableReturnListener returnListener, + int maximumInflightMessages, + boolean failOnError) { + Preconditions.checkNotNull(context, "context cannot be null"); + + this.mailboxExecutor = context.getMailboxExecutor(); + this.maximumInflightMessages = maximumInflightMessages; + this.failOnError = failOnError; + + SinkWriterMetricGroup metricGroup = context.metricGroup(); + this.queueName = queueName; + this.messageConverter = messageConverter; + this.serializationSchema = serializationSchema; + this.returnListener = returnListener; + + this.numBytesOutCounter = metricGroup.getIOMetricGroup().getNumBytesOutCounter(); + this.numRecordsOutCounter = metricGroup.getIOMetricGroup().getNumRecordsOutCounter(); + this.inflightMessages = new TreeMap<>(); + } + + @VisibleForTesting + void initializeConnection(Connection connection) throws IOException { + this.connection = connection; + this.channel = connection.createChannel(); + channel.addReturnListener(returnListener); + channel.addConfirmListener(getSuccessConfirmCallback(), getFailureConfirmCallback()); + channel.confirmSelect(); + channel.queueDeclare(queueName, true, false, false, null); + } + + @Override + public void write(T t, Context context) throws IOException, InterruptedException { + awaitInflightMessagesBelow(maximumInflightMessages); + RabbitMQMessage recordMessage = messageConverter.toRabbitMQMessage(t); + publishMessage(recordMessage); + } + + @Override + public void flush(boolean endOfInput) throws IOException, InterruptedException { + awaitInflightMessagesBelow(1); + } + + @Override + public void close() throws Exception { + awaitInflightMessagesBelow(1); + channel.close(); + connection.close(); + } + + private void awaitInflightMessagesBelow(int maximumInflightMessages) + throws InterruptedException { + while (inflightMessages.size() >= maximumInflightMessages) { + mailboxExecutor.yield(); + } + } + + private void publishMessage(RabbitMQMessage recordMessage) throws IOException { + byte[] message = serializationSchema.serialize(recordMessage.getMessage()); + recordMessage.setSerializedSize((long) message.length); + Long sequenceNumber = channel.getNextPublishSeqNo(); + + if (messageConverter.supportsExchangeRouting()) { + Preconditions.checkArgument( + returnListener != null + || !(recordMessage.isImmediate() || recordMessage.isMandatory()), + "Return listener must be set if immediate or mandatory delivery is requested"); + + channel.basicPublish( + recordMessage.getExchange(), + recordMessage.getRoutingKey(), + recordMessage.isMandatory(), + recordMessage.isImmediate(), + recordMessage.getMessageProperties(), + message); + + } else { + channel.basicPublish( + recordMessage.getExchange(), + queueName, + recordMessage.getMessageProperties(), + message); + } + + inflightMessages.put(sequenceNumber, recordMessage); + } + + private ConfirmCallback getSuccessConfirmCallback() { + return (seqNo, acknowledgePrevious) -> + mailboxExecutor.execute( + () -> { + if (acknowledgePrevious) { + LOG.debug( + "Acknowledge all messages with sequence number less than or equal to {}", + seqNo); + while (!inflightMessages.isEmpty() + && inflightMessages.firstKey() <= (Long) seqNo) { + RabbitMQMessage message = + inflightMessages.remove(inflightMessages.firstKey()); + numBytesOutCounter.inc(message.getSerializedSize()); + numRecordsOutCounter.inc(); + } + } else { + LOG.debug("Acknowledge message with sequence number {}", seqNo); + RabbitMQMessage message = inflightMessages.remove(seqNo); + numBytesOutCounter.inc(message.getSerializedSize()); + numRecordsOutCounter.inc(); + } + }, + "Acknowledge message with sequence number " + seqNo); + } + + private ConfirmCallback getFailureConfirmCallback() { + return (seqNo, acknowledgePrevious) -> + mailboxExecutor.execute( + () -> { + if (failOnError) { + LOG.error( + "Failed to send message with sequence number {} and payload {}", + seqNo, + inflightMessages.get(seqNo).getMessage()); + throw new FlinkRuntimeException( + String.format( + "Failed to send message with sequence number %d and payload %s", + seqNo, inflightMessages.get(seqNo).getMessage())); + } + LOG.warn( + "Resending failed message with sequence number {} and payload {}", + seqNo, + inflightMessages.get(seqNo).getMessage()); + publishMessage(inflightMessages.remove(seqNo)); + }, + "Handle failure for message with sequence number " + seqNo); + } +} diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java index 6ebeb1f43c1..cebdab8512b 100644 --- a/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java @@ -41,6 +41,7 @@ * * @param */ +@Deprecated public class RMQSink extends RichSinkFunction { private static final long serialVersionUID = 1L; diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkPublishOptions.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkPublishOptions.java index 57c5d103804..cd67e1ab15d 100644 --- a/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkPublishOptions.java +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkPublishOptions.java @@ -17,7 +17,7 @@ package org.apache.flink.streaming.connectors.rabbitmq; -import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.Public; import com.rabbitmq.client.AMQP.BasicProperties; @@ -27,7 +27,8 @@ * * @param The type of the data used by the sink. */ -@PublicEvolving +@Public +@Deprecated public interface RMQSinkPublishOptions extends java.io.Serializable { /** diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/SerializableReturnListener.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/SerializableReturnListener.java index 416fa16c96f..cd212a34cbd 100644 --- a/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/SerializableReturnListener.java +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/SerializableReturnListener.java @@ -18,9 +18,11 @@ package org.apache.flink.streaming.connectors.rabbitmq; -import com.rabbitmq.client.ReturnListener; +import org.apache.flink.annotation.PublicEvolving; -import java.io.Serializable; +import com.rabbitmq.client.ReturnListener; /** A serializable {@link ReturnListener}. */ -public interface SerializableReturnListener extends Serializable, ReturnListener {} +@PublicEvolving +public interface SerializableReturnListener + extends org.apache.flink.connector.rabbitmq.common.SerializableReturnListener {} diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java index c93adaa8eb3..ea9a57acf93 100644 --- a/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java @@ -17,592 +17,50 @@ package org.apache.flink.streaming.connectors.rabbitmq.common; -import org.apache.flink.streaming.connectors.rabbitmq.RMQSource; -import org.apache.flink.util.Preconditions; +import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig; +import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionSetupException; +import org.apache.flink.connector.rabbitmq.common.util.RabbitMQConnectionUtil; import com.rabbitmq.client.ConnectionFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.io.Serializable; import java.net.URISyntaxException; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; -import java.util.Optional; -import java.util.concurrent.TimeUnit; /** - * Connection Configuration for RMQ. If {@link Builder#setUri(String)} has been set then {@link - * RMQConnectionConfig#RMQConnectionConfig(String, Integer, Boolean, Boolean, Integer, Integer, - * Integer, Integer, Integer, Long)} will be used for initialize the RMQ connection or {@link - * RMQConnectionConfig#RMQConnectionConfig(String, Integer, String, String, String, Integer, - * Boolean, Boolean, Integer, Integer, Integer, Integer, Integer, Long)} will be used for initialize - * the RMQ connection + * The configuration class extending {@link RabbitMQConnectionConfig} to offer backward + * compatibility to {@link org.apache.flink.streaming.connectors.rabbitmq.RMQSink} and {@link + * org.apache.flink.streaming.connectors.rabbitmq.RMQSource} users. */ -public class RMQConnectionConfig implements Serializable { +public class RMQConnectionConfig extends RabbitMQConnectionConfig { - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(RMQConnectionConfig.class); - - private static final long DEFAULT_DELIVERY_TIMEOUT = 30000; - - private String host; - private Integer port; - private String virtualHost; - private String username; - private String password; - private String uri; - - private Integer networkRecoveryInterval; - private Boolean automaticRecovery; - private Boolean topologyRecovery; - - private Integer connectionTimeout; - private Integer requestedChannelMax; - private Integer requestedFrameMax; - private Integer requestedHeartbeat; - - private Integer prefetchCount; - private final long deliveryTimeout; - - /** - * @param host host name - * @param port port - * @param virtualHost virtual host - * @param username username - * @param password password - * @param networkRecoveryInterval connection recovery interval in milliseconds - * @param automaticRecovery if automatic connection recovery - * @param topologyRecovery if topology recovery - * @param connectionTimeout connection timeout - * @param requestedChannelMax requested maximum channel number - * @param requestedFrameMax requested maximum frame size - * @param requestedHeartbeat requested heartbeat interval - * @param deliveryTimeout message delivery timeout in the queueing consumer - * @throws NullPointerException if host or virtual host or username or password is null - */ - private RMQConnectionConfig( - String host, - Integer port, - String virtualHost, - String username, - String password, - Integer networkRecoveryInterval, - Boolean automaticRecovery, - Boolean topologyRecovery, - Integer connectionTimeout, - Integer requestedChannelMax, - Integer requestedFrameMax, - Integer requestedHeartbeat, - Integer prefetchCount, - Long deliveryTimeout) { - Preconditions.checkNotNull(host, "host can not be null"); - Preconditions.checkNotNull(port, "port can not be null"); - Preconditions.checkNotNull(virtualHost, "virtualHost can not be null"); - Preconditions.checkNotNull(username, "username can not be null"); - Preconditions.checkNotNull(password, "password can not be null"); - Preconditions.checkArgument( - deliveryTimeout == null || deliveryTimeout > 0, "deliveryTimeout must be positive"); - this.host = host; - this.port = port; - this.virtualHost = virtualHost; - this.username = username; - this.password = password; - - this.networkRecoveryInterval = networkRecoveryInterval; - this.automaticRecovery = automaticRecovery; - this.topologyRecovery = topologyRecovery; - this.connectionTimeout = connectionTimeout; - this.requestedChannelMax = requestedChannelMax; - this.requestedFrameMax = requestedFrameMax; - this.requestedHeartbeat = requestedHeartbeat; - this.prefetchCount = prefetchCount; - this.deliveryTimeout = - Optional.ofNullable(deliveryTimeout).orElse(DEFAULT_DELIVERY_TIMEOUT); - } - - /** - * @param uri the connection URI - * @param networkRecoveryInterval connection recovery interval in milliseconds - * @param automaticRecovery if automatic connection recovery - * @param topologyRecovery if topology recovery - * @param connectionTimeout connection timeout - * @param requestedChannelMax requested maximum channel number - * @param requestedFrameMax requested maximum frame size - * @param requestedHeartbeat requested heartbeat interval - * @param deliveryTimeout message delivery timeout in the queueing consumer - * @throws NullPointerException if URI is null - */ - private RMQConnectionConfig( - String uri, - Integer networkRecoveryInterval, - Boolean automaticRecovery, - Boolean topologyRecovery, - Integer connectionTimeout, - Integer requestedChannelMax, - Integer requestedFrameMax, - Integer requestedHeartbeat, - Integer prefetchCount, - Long deliveryTimeout) { - Preconditions.checkNotNull(uri, "Uri can not be null"); - Preconditions.checkArgument( - deliveryTimeout == null || deliveryTimeout > 0, "deliveryTimeout must be positive"); - this.uri = uri; - - this.networkRecoveryInterval = networkRecoveryInterval; - this.automaticRecovery = automaticRecovery; - this.topologyRecovery = topologyRecovery; - this.connectionTimeout = connectionTimeout; - this.requestedChannelMax = requestedChannelMax; - this.requestedFrameMax = requestedFrameMax; - this.requestedHeartbeat = requestedHeartbeat; - this.prefetchCount = prefetchCount; - this.deliveryTimeout = - Optional.ofNullable(deliveryTimeout).orElse(DEFAULT_DELIVERY_TIMEOUT); - } - - /** @return the host to use for connections */ - public String getHost() { - return host; - } - - /** @return the port to use for connections */ - public int getPort() { - return port; - } - - /** - * Retrieve the virtual host. - * - * @return the virtual host to use when connecting to the broker - */ - public String getVirtualHost() { - return virtualHost; - } - - /** - * Retrieve the user name. - * - * @return the AMQP user name to use when connecting to the broker - */ - public String getUsername() { - return username; - } - - /** - * Retrieve the password. - * - * @return the password to use when connecting to the broker - */ - public String getPassword() { - return password; + private RMQConnectionConfig(Builder builder) { + super(builder); } - /** - * Retrieve the URI. - * - * @return the connection URI when connecting to the broker - */ - public String getUri() { - return uri; - } - - /** - * Returns automatic connection recovery interval in milliseconds. - * - * @return how long will automatic recovery wait before attempting to reconnect, in ms; default - * is 5000 - */ - public Integer getNetworkRecoveryInterval() { - return networkRecoveryInterval; - } - - /** - * Returns true if automatic connection recovery is enabled, false otherwise. - * - * @return true if automatic connection recovery is enabled, false otherwise - */ - public Boolean isAutomaticRecovery() { - return automaticRecovery; - } - - /** - * Returns true if topology recovery is enabled, false otherwise. - * - * @return true if topology recovery is enabled, false otherwise - */ - public Boolean isTopologyRecovery() { - return topologyRecovery; - } - - /** - * Retrieve the connection timeout. - * - * @return the connection timeout, in milliseconds; zero for infinite - */ - public Integer getConnectionTimeout() { - return connectionTimeout; - } - - /** - * Retrieve the requested maximum channel number. - * - * @return the initially requested maximum channel number; zero for unlimited - */ - public Integer getRequestedChannelMax() { - return requestedChannelMax; - } - - /** - * Retrieve the requested maximum frame size. - * - * @return the initially requested maximum frame size, in octets; zero for unlimited - */ - public Integer getRequestedFrameMax() { - return requestedFrameMax; - } - - /** - * Retrieve the requested heartbeat interval. - * - * @return the initially requested heartbeat interval, in seconds; zero for none - */ - public Integer getRequestedHeartbeat() { - return requestedHeartbeat; - } - - /** - * Retrieve the channel prefetch count. - * - * @return an Optional of the prefetch count, if set, for the consumer channel - */ - public Optional getPrefetchCount() { - return Optional.ofNullable(prefetchCount); - } - - /** - * Retrieve the message delivery timeout used in the queueing consumer. If not specified - * explicitly, the default value of 30000 milliseconds will be returned. - * - * @return the message delivery timeout, in milliseconds - */ - public long getDeliveryTimeout() { - return deliveryTimeout; - } - - /** - * @return Connection Factory for RMQ - * @throws URISyntaxException if Malformed URI has been passed - * @throws NoSuchAlgorithmException if the ssl factory could not be created - * @throws KeyManagementException if the ssl context could not be initialized - */ public ConnectionFactory getConnectionFactory() - throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException { - ConnectionFactory factory = new ConnectionFactory(); - if (this.uri != null && !this.uri.isEmpty()) { - try { - factory.setUri(this.uri); - } catch (URISyntaxException e) { - LOG.error("Failed to parse uri", e); - throw e; - } catch (KeyManagementException e) { - // this should never happen - LOG.error("Failed to initialize ssl context.", e); - throw e; - } catch (NoSuchAlgorithmException e) { - // this should never happen - LOG.error("Failed to setup ssl factory.", e); + throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException, + RabbitMQConnectionSetupException { + try { + return RabbitMQConnectionUtil.getConnectionFactory(this); + } catch (RabbitMQConnectionSetupException e) { + if (e.getCause() instanceof URISyntaxException) { + throw (URISyntaxException) e.getCause(); + } else if (e.getCause() instanceof NoSuchAlgorithmException) { + throw (NoSuchAlgorithmException) e.getCause(); + } else if (e.getCause() instanceof KeyManagementException) { + throw (KeyManagementException) e.getCause(); + } else { throw e; } - } else { - factory.setHost(this.host); - factory.setPort(this.port); - factory.setVirtualHost(this.virtualHost); - factory.setUsername(this.username); - factory.setPassword(this.password); - } - - if (this.automaticRecovery != null) { - factory.setAutomaticRecoveryEnabled(this.automaticRecovery); - } - if (this.connectionTimeout != null) { - factory.setConnectionTimeout(this.connectionTimeout); } - if (this.networkRecoveryInterval != null) { - factory.setNetworkRecoveryInterval(this.networkRecoveryInterval); - } - if (this.requestedHeartbeat != null) { - factory.setRequestedHeartbeat(this.requestedHeartbeat); - } - if (this.topologyRecovery != null) { - factory.setTopologyRecoveryEnabled(this.topologyRecovery); - } - if (this.requestedChannelMax != null) { - factory.setRequestedChannelMax(this.requestedChannelMax); - } - if (this.requestedFrameMax != null) { - factory.setRequestedFrameMax(this.requestedFrameMax); - } - - return factory; } /** The Builder Class for {@link RMQConnectionConfig}. */ - public static class Builder { - - private String host; - private Integer port; - private String virtualHost; - private String username; - private String password; - - private Integer networkRecoveryInterval; - private Boolean automaticRecovery; - private Boolean topologyRecovery; - - private Integer connectionTimeout; - private Integer requestedChannelMax; - private Integer requestedFrameMax; - private Integer requestedHeartbeat; - - // basicQos options for consumers - private Integer prefetchCount; - - private Long deliveryTimeout; - - private String uri; - - /** - * Set the target port. - * - * @param port the default port to use for connections - * @return the Builder - */ - public Builder setPort(int port) { - this.port = port; - return this; - } - - /** - * @param host the default host to use for connections - * @return the Builder - */ - public Builder setHost(String host) { - this.host = host; - return this; - } - - /** - * Set the virtual host. - * - * @param virtualHost the virtual host to use when connecting to the broker - * @return the Builder - */ - public Builder setVirtualHost(String virtualHost) { - this.virtualHost = virtualHost; - return this; - } - - /** - * Set the user name. - * - * @param username the AMQP user name to use when connecting to the broker - * @return the Builder - */ - public Builder setUserName(String username) { - this.username = username; - return this; - } - - /** - * Set the password. - * - * @param password the password to use when connecting to the broker - * @return the Builder - */ - public Builder setPassword(String password) { - this.password = password; - return this; - } - - /** - * Convenience method for setting the fields in an AMQP URI: host, port, username, password - * and virtual host. If any part of the URI is omitted, the ConnectionFactory's - * corresponding variable is left unchanged. - * - * @param uri is the AMQP URI containing the data - * @return the Builder - */ - public Builder setUri(String uri) { - this.uri = uri; - return this; - } - - /** - * Enables or disables topology recovery. - * - * @param topologyRecovery if true, enables topology recovery - * @return the Builder - */ - public Builder setTopologyRecoveryEnabled(boolean topologyRecovery) { - this.topologyRecovery = topologyRecovery; - return this; - } - - /** - * Set the requested heartbeat. - * - * @param requestedHeartbeat the initially requested heartbeat interval, in seconds; zero - * for none - * @return the Builder - */ - public Builder setRequestedHeartbeat(int requestedHeartbeat) { - this.requestedHeartbeat = requestedHeartbeat; - return this; - } - - /** - * Set the requested maximum frame size. - * - * @param requestedFrameMax initially requested maximum frame size, in octets; zero for - * unlimited - * @return the Builder - */ - public Builder setRequestedFrameMax(int requestedFrameMax) { - this.requestedFrameMax = requestedFrameMax; - return this; - } - - /** - * Set the requested maximum channel number. - * - * @param requestedChannelMax initially requested maximum channel number; zero for unlimited - */ - public Builder setRequestedChannelMax(int requestedChannelMax) { - this.requestedChannelMax = requestedChannelMax; - return this; - } - - /** - * Sets connection recovery interval. Default is 5000. - * - * @param networkRecoveryInterval how long will automatic recovery wait before attempting to - * reconnect, in ms - * @return the Builder - */ - public Builder setNetworkRecoveryInterval(int networkRecoveryInterval) { - this.networkRecoveryInterval = networkRecoveryInterval; - return this; - } - - /** - * Set the connection timeout. - * - * @param connectionTimeout connection establishment timeout in milliseconds; zero for - * infinite - * @return the Builder - */ - public Builder setConnectionTimeout(int connectionTimeout) { - this.connectionTimeout = connectionTimeout; - return this; - } - - /** - * Enables or disables automatic connection recovery. - * - * @param automaticRecovery if true, enables connection recovery - * @return the Builder - */ - public Builder setAutomaticRecovery(boolean automaticRecovery) { - this.automaticRecovery = automaticRecovery; - return this; - } - - /** - * Enables setting basicQos for the consumer channel. Only applicable to the {@link - * RMQSource}. Set to 0 for unlimited, which is the default. - * - * @see Consumer Prefetch - * @see Channel - * Prefetch (QoS) - * @param prefetchCount the max number of messages to receive without acknowledgement. - * @return the Builder - */ - public Builder setPrefetchCount(int prefetchCount) { - this.prefetchCount = prefetchCount; - return this; - } - - /** - * Enables setting the message delivery timeout in the queueing consumer. Only applicable to - * the {@link RMQSource}. If not set it will default to 30000. - * - * @param deliveryTimeout maximum wait time, in milliseconds, for the next message delivery - * @return the Builder - */ - public Builder setDeliveryTimeout(long deliveryTimeout) { - Preconditions.checkArgument(deliveryTimeout > 0, "deliveryTimeout must be positive"); - this.deliveryTimeout = deliveryTimeout; - return this; - } - - /** - * Enables setting the message delivery timeout in the queueing consumer. Only applicable to - * the {@link RMQSource}. If not set it will default to 30 seconds. - * - * @param deliveryTimeout maximum wait time for the next message delivery - * @param unit deliveryTimeout unit - * @return the Builder - */ - public Builder setDeliveryTimeout(long deliveryTimeout, TimeUnit unit) { - return setDeliveryTimeout(unit.toMillis(deliveryTimeout)); - } - - /** - * The Builder method. - * - *

If URI is NULL we use host, port, vHost, username, password combination to initialize - * connection. using {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, String, - * String, String, Integer, Boolean, Boolean, Integer, Integer, Integer, Integer, Integer, - * Long)}. - * - *

Otherwise the URI will be used to initialize the client connection {@link - * RMQConnectionConfig#RMQConnectionConfig(String, Integer, Boolean, Boolean, Integer, - * Integer, Integer, Integer, Integer, Long)} - * - * @return RMQConnectionConfig - */ + public static class Builder + extends RabbitMQConnectionConfig.Builder { public RMQConnectionConfig build() { - if (this.uri != null) { - return new RMQConnectionConfig( - this.uri, - this.networkRecoveryInterval, - this.automaticRecovery, - this.topologyRecovery, - this.connectionTimeout, - this.requestedChannelMax, - this.requestedFrameMax, - this.requestedHeartbeat, - this.prefetchCount, - this.deliveryTimeout); - } else { - return new RMQConnectionConfig( - this.host, - this.port, - this.virtualHost, - this.username, - this.password, - this.networkRecoveryInterval, - this.automaticRecovery, - this.topologyRecovery, - this.connectionTimeout, - this.requestedChannelMax, - this.requestedFrameMax, - this.requestedHeartbeat, - this.prefetchCount, - this.deliveryTimeout); - } + return new RMQConnectionConfig(this); } } } diff --git a/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/common/DefaultRabbitMQMessageConverterTest.java b/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/common/DefaultRabbitMQMessageConverterTest.java new file mode 100644 index 00000000000..7868e1a7e66 --- /dev/null +++ b/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/common/DefaultRabbitMQMessageConverterTest.java @@ -0,0 +1,27 @@ +package org.apache.flink.connector.rabbitmq.common; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +/** Tests for {@link DefaultRabbitMQMessageConverter}. */ +public class DefaultRabbitMQMessageConverterTest { + @Test + void defaultRabbitMQMessageConverterDoesNotSupportExchangeRouting() { + DefaultRabbitMQMessageConverter messageConverter = + new DefaultRabbitMQMessageConverter<>(); + Assertions.assertThat(messageConverter.supportsExchangeRouting()).isFalse(); + } + + @Test + public void convertToRabbitMQMessage() { + DefaultRabbitMQMessageConverter messageConverter = + new DefaultRabbitMQMessageConverter<>(); + RabbitMQMessage rabbitMQMessage = messageConverter.toRabbitMQMessage(1); + Assertions.assertThat(rabbitMQMessage.getMessage()).isEqualTo(1); + Assertions.assertThat(rabbitMQMessage.getRoutingKey()).isNull(); + Assertions.assertThat(rabbitMQMessage.getExchange()).isEqualTo(Constants.DEFAULT_EXCHANGE); + Assertions.assertThat(rabbitMQMessage.getMessageProperties()).isNull(); + Assertions.assertThat(rabbitMQMessage.isMandatory()).isFalse(); + Assertions.assertThat(rabbitMQMessage.isImmediate()).isFalse(); + } +} diff --git a/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/common/RabbitMQConnectionConfigTest.java b/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/common/RabbitMQConnectionConfigTest.java new file mode 100644 index 00000000000..9419d2ff40b --- /dev/null +++ b/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/common/RabbitMQConnectionConfigTest.java @@ -0,0 +1,147 @@ +package org.apache.flink.connector.rabbitmq.common; + +import org.apache.flink.connector.rabbitmq.common.util.RabbitMQConnectionUtil; + +import com.rabbitmq.client.ConnectionFactory; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.TimeUnit; + +/** Tests for the {@link RabbitMQConnectionConfig}. */ +public class RabbitMQConnectionConfigTest { + @Test + void shouldThrowNullPointExceptionIfHostIsNull() { + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + RabbitMQConnectionConfig.builder() + .setPort(1000) + .setUserName("guest") + .setPassword("guest") + .setVirtualHost("/") + .build(), + "Either URI or host/port must be set"); + } + + @Test + void shouldThrowNullPointExceptionIfPortIsNull() throws RabbitMQConnectionSetupException { + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + RabbitMQConnectionConfig.builder() + .setHost("localhost") + .setUserName("guest") + .setPassword("guest") + .setVirtualHost("/") + .build(), + "Either URI or host/port must be set"); + } + + @Test + void shouldSetDefaultValueIfConnectionTimeoutNotGiven() + throws RabbitMQConnectionSetupException { + RabbitMQConnectionConfig connectionConfig = + RabbitMQConnectionConfig.builder() + .setHost("localhost") + .setPort(5672) + .setUserName("guest") + .setPassword("guest") + .setVirtualHost("/") + .build(); + Assertions.assertEquals( + ConnectionFactory.DEFAULT_CONNECTION_TIMEOUT, + RabbitMQConnectionUtil.getConnectionFactory(connectionConfig) + .getConnectionTimeout()); + } + + @Test + void shouldSetProvidedValueIfConnectionTimeoutGiven() throws RabbitMQConnectionSetupException { + int connectionTimeout = 1000; + RabbitMQConnectionConfig connectionConfig = + RabbitMQConnectionConfig.builder() + .setHost("localhost") + .setUserName("guest") + .setPort(5672) + .setPassword("guest") + .setVirtualHost("/") + .setConnectionTimeout(connectionTimeout) + .build(); + Assertions.assertEquals( + connectionTimeout, + RabbitMQConnectionUtil.getConnectionFactory(connectionConfig) + .getConnectionTimeout()); + } + + @Test + void shouldSetOptionalPrefetchCount() { + int prefetchCount = 100; + RabbitMQConnectionConfig connectionConfig = + RabbitMQConnectionConfig.builder() + .setHost("localhost") + .setPort(5000) + .setUserName("guest") + .setPassword("guest") + .setVirtualHost("/") + .setPrefetchCount(prefetchCount) + .build(); + Assertions.assertTrue(connectionConfig.getPrefetchCount().isPresent()); + Assertions.assertEquals(prefetchCount, connectionConfig.getPrefetchCount().get()); + } + + @Test + void shouldNotSetOptional() { + RabbitMQConnectionConfig connectionConfig = + RabbitMQConnectionConfig.builder() + .setHost("localhost") + .setPort(5000) + .setUserName("guest") + .setPassword("guest") + .setVirtualHost("/") + .build(); + Assertions.assertFalse(connectionConfig.getPrefetchCount().isPresent()); + } + + @Test + void shouldSetDeliveryTimeout() { + int deliveryTimeout = 1000; + RabbitMQConnectionConfig connectionConfig = + RabbitMQConnectionConfig.builder() + .setHost("localhost") + .setPort(5000) + .setUserName("guest") + .setPassword("guest") + .setVirtualHost("/") + .setDeliveryTimeout(deliveryTimeout) + .build(); + Assertions.assertEquals(deliveryTimeout, connectionConfig.getDeliveryTimeout()); + } + + @Test + void shouldSetDeliveryTimeoutWithTimeUnit() { + int deliveryTimeout = 1000; + RabbitMQConnectionConfig connectionConfig = + RabbitMQConnectionConfig.builder() + .setHost("localhost") + .setPort(5000) + .setUserName("guest") + .setPassword("guest") + .setVirtualHost("/") + .setDeliveryTimeout(deliveryTimeout, TimeUnit.MILLISECONDS) + .build(); + Assertions.assertEquals(deliveryTimeout, connectionConfig.getDeliveryTimeout()); + } + + @Test + void shouldReturnDefaultDeliveryTimeout() { + RabbitMQConnectionConfig connectionConfig = + RabbitMQConnectionConfig.builder() + .setHost("localhost") + .setPort(5000) + .setUserName("guest") + .setPassword("guest") + .setVirtualHost("/") + .build(); + Assertions.assertEquals(30000, connectionConfig.getDeliveryTimeout()); + } +} diff --git a/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSinkBuilderTest.java b/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSinkBuilderTest.java new file mode 100644 index 00000000000..589a779f4d0 --- /dev/null +++ b/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSinkBuilderTest.java @@ -0,0 +1,92 @@ +package org.apache.flink.connector.rabbitmq.sink; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.rabbitmq.common.Constants; +import org.apache.flink.connector.rabbitmq.common.DefaultRabbitMQMessageConverter; +import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig; +import org.apache.flink.connector.rabbitmq.common.RabbitMQMessageConverter; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +/** Tests for the {@link RabbitMQSink}. */ +public class RabbitMQSinkBuilderTest { + + @Test + void buildSinkThrowsExceptionOnNullRabbitMQConnectionConfig() { + RabbitMQMessageConverter messageConverter = new DefaultRabbitMQMessageConverter<>(); + String queueName = "test-queue"; + + RabbitMQSinkBuilder sinkBuilder = new RabbitMQSinkBuilder<>(); + sinkBuilder + .setSerializationSchema(new SimpleStringSchema()) + .setMessageConverter(messageConverter) + .setQueueName(queueName); + + Assertions.assertThatExceptionOfType(NullPointerException.class) + .isThrownBy(sinkBuilder::build) + .withMessageContaining("connectionConfig cannot be null"); + } + + @Test + void buildSinkThrowsExceptionOnNullQueueName() { + RabbitMQMessageConverter messageConverter = new DefaultRabbitMQMessageConverter<>(); + + RabbitMQSinkBuilder sinkBuilder = new RabbitMQSinkBuilder<>(); + sinkBuilder + .setSerializationSchema(new SimpleStringSchema()) + .setMessageConverter(messageConverter) + .setConnectionConfig(getRabbitMQConnectionConfig()); + + Assertions.assertThatExceptionOfType(NullPointerException.class) + .isThrownBy(sinkBuilder::build) + .withMessageContaining("queueName cannot be null"); + } + + @Test + void buildSinkThrowsExceptionOnNullSerializationSchema() { + RabbitMQMessageConverter messageConverter = new DefaultRabbitMQMessageConverter<>(); + + RabbitMQSinkBuilder sinkBuilder = new RabbitMQSinkBuilder<>(); + sinkBuilder + .setQueueName("test-queue") + .setMessageConverter(messageConverter) + .setConnectionConfig(getRabbitMQConnectionConfig()); + + Assertions.assertThatExceptionOfType(NullPointerException.class) + .isThrownBy(sinkBuilder::build) + .withMessageContaining("serializationSchema cannot be null"); + } + + @Test + void buildSinkUsesDefaultValues() { + RabbitMQMessageConverter messageConverter = new DefaultRabbitMQMessageConverter<>(); + String queueName = "test-queue"; + RabbitMQSinkBuilder sinkBuilder = new RabbitMQSinkBuilder<>(); + + RabbitMQSink sink = + sinkBuilder + .setMessageConverter(messageConverter) + .setQueueName(queueName) + .setSerializationSchema(new SimpleStringSchema()) + .setConnectionConfig(getRabbitMQConnectionConfig()) + .build(); + + Assertions.assertThat(sink) + .hasFieldOrPropertyWithValue( + "maximumInflightMessages", Constants.DEFAULT_MAX_INFLIGHT); + + Assertions.assertThat(sink) + .hasFieldOrPropertyWithValue("failOnError", Constants.DEFAULT_FAIL_ON_ERROR); + } + + private RabbitMQConnectionConfig getRabbitMQConnectionConfig() { + return new RabbitMQConnectionConfig.Builder<>() + .setHost("test-host") + .setPort(5672) + .setVirtualHost("/") + .setUserName("guest") + .setPassword("guest") + .build(); + } +} diff --git a/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSinkWriterTest.java b/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSinkWriterTest.java new file mode 100644 index 00000000000..9f75acc1137 --- /dev/null +++ b/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSinkWriterTest.java @@ -0,0 +1,467 @@ +package org.apache.flink.connector.rabbitmq.sink; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.connector.sink2.WriterInitContext; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.connector.base.sink.writer.TestSinkInitContext; +import org.apache.flink.connector.base.sink.writer.TestSinkInitContextAnyThreadMailbox; +import org.apache.flink.connector.rabbitmq.common.DefaultRabbitMQMessageConverter; +import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig; +import org.apache.flink.connector.rabbitmq.common.SerializableReturnListener; +import org.apache.flink.connector.rabbitmq.sink.util.TestChannel; +import org.apache.flink.connector.rabbitmq.sink.util.TestConnection; +import org.apache.flink.connector.rabbitmq.sink.util.TestMessageConverter; +import org.apache.flink.metrics.Counter; +import org.apache.flink.util.FlinkRuntimeException; + +import com.rabbitmq.client.AMQP; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + +/** Test class for the {@link RabbitMQSinkWriter}. */ +public class RabbitMQSinkWriterTest { + private static final String QUEUE_NAME = "test-queue"; + + private static final Integer MAXIMUM_INFLIGHT_MESSAGES = 3; + + @Test + void testWriterInitializationDeclaresQueue() throws IOException { + TestConnection connection = getRabbitMQTestConnection(); + TestChannel channel = getRabbitMQTestChannel(); + connection.setChannelSupplier(() -> channel); + + RabbitMQSinkWriter writer = getDefaultWriter(null); + writer.initializeConnection(connection); + + Assertions.assertThat(channel.isQueueDeclared()).isTrue(); + } + + @Test + void writerInitializationDeclaresExchange() throws IOException { + TestConnection connection = getRabbitMQTestConnection(); + TestChannel channel = getRabbitMQTestChannel(); + connection.setChannelSupplier(() -> channel); + + RabbitMQSinkWriter writer = getDefaultWriter(null); + writer.initializeConnection(connection); + + Assertions.assertThat(channel.isOpen()).isTrue(); + Assertions.assertThat(connection.isOpen()).isTrue(); + } + + @Test + void writerInitializationAddsReturnListener() throws IOException { + TestConnection connection = getRabbitMQTestConnection(); + TestChannel channel = getRabbitMQTestChannel(); + connection.setChannelSupplier(() -> channel); + + SerializableReturnListener returnListener = (i, s, s1, s2, basicProperties, bytes) -> {}; + + RabbitMQSinkWriter writer = getDefaultWriter(returnListener); + writer.initializeConnection(connection); + + Assertions.assertThat(channel.getReturnListener()).isSameAs(returnListener); + } + + @Test + void writerInitializationAddsConfirmListeners() throws IOException { + TestConnection connection = getRabbitMQTestConnection(); + TestChannel channel = getRabbitMQTestChannel(); + connection.setChannelSupplier(() -> channel); + + RabbitMQSinkWriter writer = getDefaultWriter(null); + writer.initializeConnection(connection); + + Assertions.assertThat(channel.getOnSuccess()).isNotNull(); + Assertions.assertThat(channel.getOnFail()).isNotNull(); + } + + @Test + void writeMessageDeliversMessageWithChannel() throws IOException, InterruptedException { + TestConnection connection = getRabbitMQTestConnection(); + TestChannel channel = getRabbitMQTestChannel(); + connection.setChannelSupplier(() -> channel); + + RabbitMQSinkWriter writer = getDefaultWriter(null); + writer.initializeConnection(connection); + String message = "test-message"; + writeMessageToWriter(writer, message, channel); + writer.flush(false); + + Assertions.assertThat(channel.getDeliveredMessages()).containsExactly(message); + } + + @Test + void writeMessageDeliversMessageCallsReturnListener() throws IOException, InterruptedException { + TestConnection connection = getRabbitMQTestConnection(); + TestChannel channel = getRabbitMQTestChannel(); + connection.setChannelSupplier(() -> channel); + + AtomicReference returnedMessage = new AtomicReference<>(""); + SerializableReturnListener returnListener = + (i, s, s1, s2, basicProperties, bytes) -> { + returnedMessage.set("returned " + new String(bytes)); + }; + + RabbitMQSinkWriter writer = getDefaultWriter(returnListener); + writer.initializeConnection(connection); + String message = "test-message"; + writeMessageToWriter(writer, message, channel); + writer.flush(false); + + Assertions.assertThat(channel.getDeliveredMessages()).containsExactly(message); + Assertions.assertThat(returnedMessage.get()).isEqualTo("returned test-message"); + } + + @Test + void writeMessageIncrementsMetricsOnDelivery() throws IOException, InterruptedException { + TestConnection connection = getRabbitMQTestConnection(); + TestChannel channel = getRabbitMQTestChannel(); + connection.setChannelSupplier(() -> channel); + + WriterInitContext context = new TestSinkInitContext(); + Counter numBytesOutCounter = context.metricGroup().getNumBytesSendCounter(); + Counter numRecordsSendCounter = context.metricGroup().getNumRecordsSendCounter(); + RabbitMQSinkWriter writer = getDefaultWriter(context, null); + writer.initializeConnection(connection); + + String message = "test-message"; + writeMessageToWriter(writer, message, channel, false); + long recordsSentBeforeDelivery = numRecordsSendCounter.getCount(); + long bytesSentBeforeDelivery = numBytesOutCounter.getCount(); + + channel.deliverMessage(message, true); + writer.flush(false); + + Assertions.assertThat(recordsSentBeforeDelivery).isEqualTo(0); + Assertions.assertThat(bytesSentBeforeDelivery).isEqualTo(0); + Assertions.assertThat(numRecordsSendCounter.getCount()).isEqualTo(1); + Assertions.assertThat(numBytesOutCounter.getCount()).isEqualTo(message.length()); + } + + @Test + void writeMessageDoesNotBlockBeforeMaximumInflight() throws IOException, InterruptedException { + TestConnection connection = getRabbitMQTestConnection(); + TestChannel channel = getRabbitMQTestChannel(); + connection.setChannelSupplier(() -> channel); + + RabbitMQSinkWriter writer = + getDefaultWriter(new TestSinkInitContextAnyThreadMailbox(), 2, null); + writer.initializeConnection(connection); + String firstMessage = "first-message"; + String secondMessage = "second-message"; + CountDownLatch latch1 = new CountDownLatch(1); + CountDownLatch hasStarted1 = new CountDownLatch(1); + CountDownLatch latch2 = new CountDownLatch(1); + CountDownLatch hasStarted2 = new CountDownLatch(1); + + writeMessageToWriterAsync(writer, firstMessage, channel, hasStarted1, latch1); + writeMessageToWriterAsync(writer, secondMessage, channel, hasStarted2, latch2); + Boolean firstMessageAttemptSent = + hasStarted1.await(1, java.util.concurrent.TimeUnit.SECONDS); + Boolean secondMessageAttemptSent = + hasStarted2.await(1, java.util.concurrent.TimeUnit.SECONDS); + Boolean firstMessageSent = latch1.await(1, java.util.concurrent.TimeUnit.SECONDS); + Boolean secondMessageSent = latch2.await(1, java.util.concurrent.TimeUnit.SECONDS); + + channel.deliverAllMessages(true); + writer.flush(false); + + Assertions.assertThat(firstMessageAttemptSent).isTrue(); + Assertions.assertThat(secondMessageAttemptSent).isTrue(); + Assertions.assertThat(firstMessageSent).isTrue(); + Assertions.assertThat(secondMessageSent).isTrue(); + Assertions.assertThat(channel.getDeliveredMessages()) + .containsExactlyInAnyOrder(firstMessage, secondMessage); + } + + @Test + void writeMessageBlocksAfterMaximumInflight() throws IOException, InterruptedException { + TestConnection connection = getRabbitMQTestConnection(); + TestChannel channel = getRabbitMQTestChannel(); + connection.setChannelSupplier(() -> channel); + + RabbitMQSinkWriter writer = + getDefaultWriter(new TestSinkInitContextAnyThreadMailbox(), 1, null); + writer.initializeConnection(connection); + String firstMessage = "first-message"; + String secondMessage = "second-message"; + CountDownLatch latch1 = new CountDownLatch(1); + CountDownLatch hasStarted1 = new CountDownLatch(1); + CountDownLatch latch2 = new CountDownLatch(1); + CountDownLatch hasStarted2 = new CountDownLatch(1); + + writeMessageToWriterAsync(writer, firstMessage, channel, hasStarted1, latch1); + Boolean firstMessageAttemptSent = + hasStarted1.await(1, java.util.concurrent.TimeUnit.SECONDS); + Boolean firstMessageSent = latch1.await(1, java.util.concurrent.TimeUnit.SECONDS); + + writeMessageToWriterAsync(writer, secondMessage, channel, hasStarted2, latch2); + Boolean secondMessageAttemptSent = + hasStarted2.await(1, java.util.concurrent.TimeUnit.SECONDS); + Boolean secondMessageBlocked = !latch2.await(1, java.util.concurrent.TimeUnit.SECONDS); + + channel.deliverMessage(firstMessage, true); + + Boolean secondMessageSent = latch2.await(1, java.util.concurrent.TimeUnit.SECONDS); + + channel.deliverMessage(secondMessage, true); + writer.flush(false); + + Assertions.assertThat(firstMessageAttemptSent).isTrue(); + Assertions.assertThat(secondMessageAttemptSent).isTrue(); + Assertions.assertThat(firstMessageSent).isTrue(); + Assertions.assertThat(secondMessageBlocked).isTrue(); + Assertions.assertThat(secondMessageSent).isTrue(); + Assertions.assertThat(channel.getDeliveredMessages()) + .containsExactlyInAnyOrder(firstMessage, secondMessage); + } + + @Test + void writeWithExchangeSupportedConverter() throws IOException, InterruptedException { + TestConnection connection = getRabbitMQTestConnection(); + TestChannel channel = getRabbitMQTestChannel(); + connection.setChannelSupplier(() -> channel); + TestMessageConverter messageConverter = new TestMessageConverter(); + messageConverter.setExchange("test-exchange"); + SerializableReturnListener returnListener = (i, s, s1, s2, basicProperties, bytes) -> {}; + + RabbitMQSinkWriter writer = + new RabbitMQSinkWriter<>( + new TestSinkInitContext(), + QUEUE_NAME, + messageConverter, + new SimpleStringSchema(), + returnListener, + MAXIMUM_INFLIGHT_MESSAGES, + false); + writer.initializeConnection(connection); + + // write first message + String firstMessage = "test-message-1"; + messageConverter.setRoutingKey("test-routing-key-1"); + messageConverter.setMarkNextMessageAsImmediate(true); + writeMessageToWriter(writer, firstMessage, channel); + + // write second message + String secondMessage = "test-message-2"; + AMQP.BasicProperties secondBasicProperties = + new AMQP.BasicProperties.Builder().appId("appId-2").build(); + messageConverter.setRoutingKey("test-routing-key-2"); + messageConverter.setMarkNextMessageAsImmediate(false); + messageConverter.setMarkNextMessageAsMandatory(true); + messageConverter.setProperties(secondBasicProperties); + writeMessageToWriter(writer, secondMessage, channel); + + // write third message + String thirdMessage = "test-message-3"; + AMQP.BasicProperties thirdBasicProperties = + new AMQP.BasicProperties.Builder().appId("appId-3").build(); + messageConverter.setRoutingKey("test-routing-key-3"); + messageConverter.setMarkNextMessageAsImmediate(true); + messageConverter.setMarkNextMessageAsMandatory(true); + messageConverter.setProperties(thirdBasicProperties); + writeMessageToWriter(writer, thirdMessage, channel); + + writer.flush(false); + + Assertions.assertThat(channel.getPublishedMessageArguments(firstMessage)) + .isEqualTo(Tuple5.of("test-exchange", "test-routing-key-1", false, true, null)); + + Assertions.assertThat(channel.getPublishedMessageArguments(secondMessage)) + .isEqualTo( + Tuple5.of( + "test-exchange", + "test-routing-key-2", + true, + false, + secondBasicProperties)); + + Assertions.assertThat(channel.getPublishedMessageArguments(thirdMessage)) + .isEqualTo( + Tuple5.of( + "test-exchange", + "test-routing-key-3", + true, + true, + thirdBasicProperties)); + + Assertions.assertThat(channel.getDeliveredMessages()) + .containsExactlyInAnyOrder(firstMessage, secondMessage, thirdMessage); + } + + @Test + void writeWithExchangeSupportedConverterFailsIfReturnListenerIsNull() throws IOException { + TestConnection connection = getRabbitMQTestConnection(); + TestChannel channel = getRabbitMQTestChannel(); + connection.setChannelSupplier(() -> channel); + TestMessageConverter messageConverter = new TestMessageConverter(); + messageConverter.setExchange("test-exchange"); + messageConverter.setMarkNextMessageAsImmediate(true); + RabbitMQSinkWriter writer = + new RabbitMQSinkWriter<>( + new TestSinkInitContext(), + QUEUE_NAME, + messageConverter, + new SimpleStringSchema(), + null, + MAXIMUM_INFLIGHT_MESSAGES, + false); + writer.initializeConnection(connection); + + Assertions.assertThatIllegalArgumentException() + .isThrownBy(() -> writeMessageToWriter(writer, "test-message", channel)) + .withMessageContaining( + "Return listener must be set if immediate or mandatory delivery is requested"); + } + + @Test + void writerRetriesOnFailureIfFailOnErrorIsUnset() throws IOException, InterruptedException { + TestConnection connection = getRabbitMQTestConnection(); + TestChannel channel = getRabbitMQTestChannel(); + connection.setChannelSupplier(() -> channel); + + RabbitMQSinkWriter writer = getDefaultWriter(null); + writer.initializeConnection(connection); + String message = "fail-message"; + writeMessageToWriter(writer, message, channel); + writer.flush(false); + + Assertions.assertThat(channel.getDeliveredMessages()).containsExactly(message); + } + + @Test + void writerFailsOnErrorIfFailOnErrorIsSet() throws IOException, InterruptedException { + TestConnection connection = getRabbitMQTestConnection(); + TestChannel channel = getRabbitMQTestChannel(); + connection.setChannelSupplier(() -> channel); + + RabbitMQSinkWriter writer = + new RabbitMQSinkWriter<>( + new TestSinkInitContext(), + QUEUE_NAME, + new DefaultRabbitMQMessageConverter<>(), + new SimpleStringSchema(), + null, + MAXIMUM_INFLIGHT_MESSAGES, + true); + + writer.initializeConnection(connection); + String message = "fail-message"; + writeMessageToWriter(writer, message, channel); + + Assertions.assertThatExceptionOfType(FlinkRuntimeException.class) + .isThrownBy(() -> writer.flush(false)) + .withMessage( + "Failed to send message with sequence number 0 and payload fail-message"); + } + + @Test + void closeWriterClosesChannelAndConnection() throws Exception { + TestConnection connection = getRabbitMQTestConnection(); + TestChannel channel = getRabbitMQTestChannel(); + connection.setChannelSupplier(() -> channel); + + RabbitMQSinkWriter writer = getDefaultWriter(null); + writer.initializeConnection(connection); + writer.close(); + + Assertions.assertThat(channel.isOpen()).isFalse(); + Assertions.assertThat(connection.isOpen()).isFalse(); + } + + private void writeMessageToWriter( + RabbitMQSinkWriter writer, String message, TestChannel channel) + throws IOException, InterruptedException { + writeMessageToWriter(writer, message, channel, true); + } + + private void writeMessageToWriter( + RabbitMQSinkWriter writer, String message, TestChannel channel, boolean deliver) + throws IOException, InterruptedException { + channel.prepareMessage(message); + writer.write(message, null); + if (deliver) { + channel.deliverMessage(message); + } + } + + private void writeMessageToWriterAsync( + RabbitMQSinkWriter writer, + String message, + TestChannel channel, + CountDownLatch hasStarted, + CountDownLatch latch) + throws IOException, InterruptedException { + Thread thread = + new Thread( + () -> { + hasStarted.countDown(); + try { + writeMessageToWriter(writer, message, channel, false); + } catch (IOException | InterruptedException ignored) { + } + latch.countDown(); + }); + thread.start(); + } + + private TestConnection getRabbitMQTestConnection() { + return new TestConnection(getRabbitMQConnectionConfig()); + } + + private TestChannel getRabbitMQTestChannel() { + return new TestChannel((byte[] bytes) -> new String(bytes).startsWith("fail")); + } + + private RabbitMQConnectionConfig getRabbitMQConnectionConfig() { + return new RabbitMQConnectionConfig.Builder<>() + .setHost("test-host") + .setPort(5672) + .setVirtualHost("/") + .setUserName("guest") + .setPassword("guest") + .build(); + } + + private RabbitMQSinkWriter getDefaultWriter(SerializableReturnListener returnListener) { + return new RabbitMQSinkWriter<>( + new TestSinkInitContext(), + QUEUE_NAME, + new DefaultRabbitMQMessageConverter<>(), + new SimpleStringSchema(), + returnListener, + MAXIMUM_INFLIGHT_MESSAGES, + false); + } + + private RabbitMQSinkWriter getDefaultWriter( + WriterInitContext context, SerializableReturnListener returnListener) { + return new RabbitMQSinkWriter<>( + context, + QUEUE_NAME, + new DefaultRabbitMQMessageConverter<>(), + new SimpleStringSchema(), + returnListener, + MAXIMUM_INFLIGHT_MESSAGES, + false); + } + + private RabbitMQSinkWriter getDefaultWriter( + WriterInitContext context, + int maximumInflightMessages, + SerializableReturnListener returnListener) { + return new RabbitMQSinkWriter<>( + context, + QUEUE_NAME, + new DefaultRabbitMQMessageConverter<>(), + new SimpleStringSchema(), + returnListener, + maximumInflightMessages, + false); + } +} diff --git a/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/sink/util/TestChannel.java b/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/sink/util/TestChannel.java new file mode 100644 index 00000000000..2c47c5317cb --- /dev/null +++ b/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/sink/util/TestChannel.java @@ -0,0 +1,832 @@ +package org.apache.flink.connector.rabbitmq.sink.util; + +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.connector.rabbitmq.common.SerializableReturnListener; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.BuiltinExchangeType; +import com.rabbitmq.client.CancelCallback; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Command; +import com.rabbitmq.client.ConfirmCallback; +import com.rabbitmq.client.ConfirmListener; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.Consumer; +import com.rabbitmq.client.ConsumerShutdownSignalCallback; +import com.rabbitmq.client.DeliverCallback; +import com.rabbitmq.client.GetResponse; +import com.rabbitmq.client.Method; +import com.rabbitmq.client.ReturnCallback; +import com.rabbitmq.client.ReturnListener; +import com.rabbitmq.client.ShutdownListener; +import com.rabbitmq.client.ShutdownSignalException; +import com.rabbitmq.client.impl.AMQImpl; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeoutException; +import java.util.function.Predicate; + +/** + * TestChannel is a test implementation of the Channel class that allows for testing the + * RabbitMQSink. + */ +public class TestChannel implements Channel { + + private int seqNo = 0; + private boolean isOpen = true; + + private String queueName; + + private boolean isQueueDeclared = false; + + private boolean isQueueDurable = false; + + private boolean isQueueExclusive = false; + + private boolean isQueueAutoDelete = false; + + private Map queueArguments; + + private ConfirmCallback onSuccess; + private ConfirmCallback onFail; + + private SerializableReturnListener returnListener; + + private final Predicate shouldFail; + private final Set alreadyFailed = new HashSet<>(); + private final Map seqNoMap = new ConcurrentHashMap<>(); + + private final ConcurrentHashMap latchMap = new ConcurrentHashMap<>(); + + private final Map messageThreads = new ConcurrentHashMap<>(); + private final BlockingQueue deliveredMessages = new ArrayBlockingQueue<>(1000); + + private final Map> + publishedMessageArguments = new HashMap<>(); + + public TestChannel(Predicate shouldFail) { + this.shouldFail = shouldFail; + } + + @Override + public void basicPublish( + String exchange, String routingKey, AMQP.BasicProperties props, byte[] body) + throws IOException { + this.basicPublish(exchange, routingKey, false, props, body); + } + + @Override + public void basicPublish( + String exchange, + String routingKey, + boolean mandatory, + AMQP.BasicProperties props, + byte[] body) + throws IOException { + this.basicPublish(exchange, routingKey, mandatory, false, props, body); + } + + @Override + public void basicPublish( + String exchange, + String routingKey, + boolean mandatory, + boolean immediate, + AMQP.BasicProperties props, + byte[] body) + throws IOException { + Thread s = + new Thread( + () -> { + try { + latchMap.get(new String(body)).await(); + } catch (InterruptedException ignored) { + ignored.printStackTrace(); + } + if (returnListener != null) { + try { + returnListener.handleReturn( + 1, "200", "replyText", exchange, null, body); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + if (shouldFail.test(body) + && !alreadyFailed.contains(new String(body))) { + try { + alreadyFailed.add(new String(body)); + onFail.handle(seqNoMap.get(new String(body)), false); + } catch (IOException e) { + throw new RuntimeException(e); + } + } else { + try { + deliveredMessages.add(new String(body)); + onSuccess.handle(seqNoMap.get(new String(body)), true); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }); + + publishedMessageArguments.put( + new String(body), Tuple5.of(exchange, routingKey, mandatory, immediate, props)); + seqNoMap.put(new String(body), (long) seqNo++); + messageThreads.put(new String(body), s); + s.start(); + } + + public BlockingQueue getDeliveredMessages() { + return deliveredMessages; + } + + public Tuple5 + getPublishedMessageArguments(String key) { + return publishedMessageArguments.get(key); + } + + public void prepareMessage(String key) { + latchMap.put(key, new CountDownLatch(1)); + } + + public void deliverMessage(String key) { + deliverMessage(key, false); + } + + public void deliverMessage(String key, boolean await) { + CountDownLatch latch = latchMap.get(key); + if (latch != null) { + latch.countDown(); + } + + Thread t = messageThreads.get(key); + if (t != null && await) { + try { + t.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + @Override + public ConfirmListener addConfirmListener(ConfirmCallback onSuccess, ConfirmCallback onFail) { + this.onSuccess = onSuccess; + this.onFail = onFail; + return null; + } + + @Override + public AMQImpl.Queue.DeclareOk queueDeclare( + String queue, + boolean durable, + boolean exclusive, + boolean autoDelete, + Map arguments) + throws IOException { + this.queueName = queue; + this.isQueueDurable = durable; + this.isQueueExclusive = exclusive; + this.isQueueAutoDelete = autoDelete; + this.queueArguments = arguments; + this.isQueueDeclared = true; + return null; + } + + public boolean isOpen() { + return isOpen; + } + + public boolean isQueueDurable() { + return isQueueDurable; + } + + public boolean isQueueExclusive() { + return isQueueExclusive; + } + + public boolean isQueueAutoDelete() { + return isQueueAutoDelete; + } + + public Map getQueueArguments() { + return queueArguments; + } + + public String getQueueName() { + return queueName; + } + + public boolean isQueueDeclared() { + return isQueueDeclared; + } + + public void deliverAllMessages() { + deliverAllMessages(false); + } + + public void deliverAllMessages(boolean await) { + for (String message : messageThreads.keySet()) { + latchMap.get(message).countDown(); + } + if (await) { + for (Thread t : messageThreads.values()) { + try { + t.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + } + + @Override + public void close() throws IOException { + deliverAllMessages(); + for (Thread t : messageThreads.values()) { + try { + t.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + isOpen = false; + } + + public ReturnListener getReturnListener() { + return returnListener; + } + + public ConfirmCallback getOnSuccess() { + return onSuccess; + } + + public ConfirmCallback getOnFail() { + return onFail; + } + + /// The following methods are not implemented and are not needed for testing + + @Override + public AMQP.Exchange.DeclareOk exchangeDeclare(String s, String s1) throws IOException { + return null; + } + + @Override + public AMQP.Exchange.DeclareOk exchangeDeclare( + String s, BuiltinExchangeType builtinExchangeType) throws IOException { + return null; + } + + @Override + public AMQP.Exchange.DeclareOk exchangeDeclare(String s, String s1, boolean b) + throws IOException { + return null; + } + + @Override + public AMQP.Exchange.DeclareOk exchangeDeclare( + String s, BuiltinExchangeType builtinExchangeType, boolean b) throws IOException { + return null; + } + + @Override + public AMQP.Exchange.DeclareOk exchangeDeclare( + String s, String s1, boolean b, boolean b1, Map map) + throws IOException { + return null; + } + + @Override + public AMQP.Exchange.DeclareOk exchangeDeclare( + String s, + BuiltinExchangeType builtinExchangeType, + boolean b, + boolean b1, + Map map) + throws IOException { + return null; + } + + @Override + public AMQP.Exchange.DeclareOk exchangeDeclare( + String s, String s1, boolean b, boolean b1, boolean b2, Map map) + throws IOException { + return null; + } + + @Override + public AMQP.Exchange.DeclareOk exchangeDeclare( + String s, + BuiltinExchangeType builtinExchangeType, + boolean b, + boolean b1, + boolean b2, + Map map) + throws IOException { + return null; + } + + @Override + public void exchangeDeclareNoWait( + String s, String s1, boolean b, boolean b1, boolean b2, Map map) + throws IOException {} + + @Override + public void exchangeDeclareNoWait( + String s, + BuiltinExchangeType builtinExchangeType, + boolean b, + boolean b1, + boolean b2, + Map map) + throws IOException {} + + @Override + public AMQP.Exchange.DeclareOk exchangeDeclarePassive(String s) throws IOException { + return null; + } + + @Override + public AMQP.Exchange.DeleteOk exchangeDelete(String s, boolean b) throws IOException { + return null; + } + + @Override + public void exchangeDeleteNoWait(String s, boolean b) throws IOException {} + + @Override + public AMQP.Exchange.DeleteOk exchangeDelete(String s) throws IOException { + return null; + } + + @Override + public AMQP.Exchange.BindOk exchangeBind(String s, String s1, String s2) throws IOException { + return null; + } + + @Override + public AMQP.Exchange.BindOk exchangeBind( + String s, String s1, String s2, Map map) throws IOException { + return null; + } + + @Override + public void exchangeBindNoWait(String s, String s1, String s2, Map map) + throws IOException {} + + @Override + public AMQP.Exchange.UnbindOk exchangeUnbind(String s, String s1, String s2) + throws IOException { + return null; + } + + @Override + public AMQP.Exchange.UnbindOk exchangeUnbind( + String s, String s1, String s2, Map map) throws IOException { + return null; + } + + @Override + public void exchangeUnbindNoWait(String s, String s1, String s2, Map map) + throws IOException {} + + @Override + public AMQP.Queue.DeclareOk queueDeclare() throws IOException { + return null; + } + + @Override + public AMQImpl.Confirm.SelectOk confirmSelect() throws IOException { + return null; + } + + @Override + public void addReturnListener(ReturnListener listener) { + this.returnListener = (SerializableReturnListener) listener; + } + + @Override + public ReturnListener addReturnListener(ReturnCallback returnCallback) { + return null; + } + + @Override + public boolean removeReturnListener(ReturnListener returnListener) { + return false; + } + + @Override + public void clearReturnListeners() {} + + @Override + public void addConfirmListener(ConfirmListener confirmListener) {} + + @Override + public boolean removeConfirmListener(ConfirmListener confirmListener) { + return false; + } + + @Override + public void clearConfirmListeners() {} + + @Override + public Consumer getDefaultConsumer() { + return null; + } + + @Override + public void setDefaultConsumer(Consumer consumer) {} + + @Override + public void basicQos(int i, int i1, boolean b) throws IOException {} + + @Override + public void basicQos(int i, boolean b) throws IOException {} + + @Override + public void basicQos(int i) throws IOException {} + + @Override + public void queueDeclareNoWait( + String s, boolean b, boolean b1, boolean b2, Map map) + throws IOException {} + + @Override + public AMQP.Queue.DeclareOk queueDeclarePassive(String s) throws IOException { + return null; + } + + @Override + public AMQP.Queue.DeleteOk queueDelete(String s) throws IOException { + return null; + } + + @Override + public AMQP.Queue.DeleteOk queueDelete(String s, boolean b, boolean b1) throws IOException { + return null; + } + + @Override + public void queueDeleteNoWait(String s, boolean b, boolean b1) throws IOException {} + + @Override + public AMQP.Queue.BindOk queueBind(String s, String s1, String s2) throws IOException { + return null; + } + + @Override + public AMQP.Queue.BindOk queueBind(String s, String s1, String s2, Map map) + throws IOException { + return null; + } + + @Override + public void queueBindNoWait(String s, String s1, String s2, Map map) + throws IOException {} + + @Override + public AMQP.Queue.UnbindOk queueUnbind(String s, String s1, String s2) throws IOException { + return null; + } + + @Override + public AMQP.Queue.UnbindOk queueUnbind(String s, String s1, String s2, Map map) + throws IOException { + return null; + } + + @Override + public AMQP.Queue.PurgeOk queuePurge(String s) throws IOException { + return null; + } + + @Override + public GetResponse basicGet(String s, boolean b) throws IOException { + return null; + } + + @Override + public void basicAck(long l, boolean b) throws IOException {} + + @Override + public void basicNack(long l, boolean b, boolean b1) throws IOException {} + + @Override + public void basicReject(long l, boolean b) throws IOException {} + + @Override + public String basicConsume(String s, Consumer consumer) throws IOException { + return null; + } + + @Override + public String basicConsume( + String s, DeliverCallback deliverCallback, CancelCallback cancelCallback) + throws IOException { + return null; + } + + @Override + public String basicConsume( + String s, + DeliverCallback deliverCallback, + ConsumerShutdownSignalCallback consumerShutdownSignalCallback) + throws IOException { + return null; + } + + @Override + public String basicConsume( + String s, + DeliverCallback deliverCallback, + CancelCallback cancelCallback, + ConsumerShutdownSignalCallback consumerShutdownSignalCallback) + throws IOException { + return null; + } + + @Override + public String basicConsume(String s, boolean b, Consumer consumer) throws IOException { + return null; + } + + @Override + public String basicConsume( + String s, boolean b, DeliverCallback deliverCallback, CancelCallback cancelCallback) + throws IOException { + return null; + } + + @Override + public String basicConsume( + String s, + boolean b, + DeliverCallback deliverCallback, + ConsumerShutdownSignalCallback consumerShutdownSignalCallback) + throws IOException { + return null; + } + + @Override + public String basicConsume( + String s, + boolean b, + DeliverCallback deliverCallback, + CancelCallback cancelCallback, + ConsumerShutdownSignalCallback consumerShutdownSignalCallback) + throws IOException { + return null; + } + + @Override + public String basicConsume(String s, boolean b, Map map, Consumer consumer) + throws IOException { + return null; + } + + @Override + public String basicConsume( + String s, + boolean b, + Map map, + DeliverCallback deliverCallback, + CancelCallback cancelCallback) + throws IOException { + return null; + } + + @Override + public String basicConsume( + String s, + boolean b, + Map map, + DeliverCallback deliverCallback, + ConsumerShutdownSignalCallback consumerShutdownSignalCallback) + throws IOException { + return null; + } + + @Override + public String basicConsume( + String s, + boolean b, + Map map, + DeliverCallback deliverCallback, + CancelCallback cancelCallback, + ConsumerShutdownSignalCallback consumerShutdownSignalCallback) + throws IOException { + return null; + } + + @Override + public String basicConsume(String s, boolean b, String s1, Consumer consumer) + throws IOException { + return null; + } + + @Override + public String basicConsume( + String s, + boolean b, + String s1, + DeliverCallback deliverCallback, + CancelCallback cancelCallback) + throws IOException { + return null; + } + + @Override + public String basicConsume( + String s, + boolean b, + String s1, + DeliverCallback deliverCallback, + ConsumerShutdownSignalCallback consumerShutdownSignalCallback) + throws IOException { + return null; + } + + @Override + public String basicConsume( + String s, + boolean b, + String s1, + DeliverCallback deliverCallback, + CancelCallback cancelCallback, + ConsumerShutdownSignalCallback consumerShutdownSignalCallback) + throws IOException { + return null; + } + + @Override + public String basicConsume( + String s, + boolean b, + String s1, + boolean b1, + boolean b2, + Map map, + Consumer consumer) + throws IOException { + return null; + } + + @Override + public String basicConsume( + String s, + boolean b, + String s1, + boolean b1, + boolean b2, + Map map, + DeliverCallback deliverCallback, + CancelCallback cancelCallback) + throws IOException { + return null; + } + + @Override + public String basicConsume( + String s, + boolean b, + String s1, + boolean b1, + boolean b2, + Map map, + DeliverCallback deliverCallback, + ConsumerShutdownSignalCallback consumerShutdownSignalCallback) + throws IOException { + return null; + } + + @Override + public String basicConsume( + String s, + boolean b, + String s1, + boolean b1, + boolean b2, + Map map, + DeliverCallback deliverCallback, + CancelCallback cancelCallback, + ConsumerShutdownSignalCallback consumerShutdownSignalCallback) + throws IOException { + return null; + } + + @Override + public void basicCancel(String s) throws IOException {} + + @Override + public AMQP.Basic.RecoverOk basicRecover() throws IOException { + return null; + } + + @Override + public AMQP.Basic.RecoverOk basicRecover(boolean b) throws IOException { + return null; + } + + @Override + public AMQP.Tx.SelectOk txSelect() throws IOException { + return null; + } + + @Override + public AMQP.Tx.CommitOk txCommit() throws IOException { + return null; + } + + @Override + public AMQP.Tx.RollbackOk txRollback() throws IOException { + return null; + } + + @Override + public int getChannelNumber() { + return 0; + } + + @Override + public Connection getConnection() { + return null; + } + + @Override + public void close(int i, String s) throws IOException, TimeoutException {} + + @Override + public void abort() throws IOException {} + + @Override + public void abort(int i, String s) throws IOException {} + + @Override + public long getNextPublishSeqNo() { + return seqNo; + } + + @Override + public boolean waitForConfirms() throws InterruptedException { + return false; + } + + @Override + public boolean waitForConfirms(long l) throws InterruptedException, TimeoutException { + return false; + } + + @Override + public void waitForConfirmsOrDie() throws IOException, InterruptedException {} + + @Override + public void waitForConfirmsOrDie(long l) + throws IOException, InterruptedException, TimeoutException {} + + @Override + public void asyncRpc(Method method) throws IOException {} + + @Override + public Command rpc(Method method) throws IOException { + return null; + } + + @Override + public long messageCount(String s) throws IOException { + return 0; + } + + @Override + public long consumerCount(String s) throws IOException { + return 0; + } + + @Override + public CompletableFuture asyncCompletableRpc(Method method) throws IOException { + return null; + } + + @Override + public void addShutdownListener(ShutdownListener shutdownListener) {} + + @Override + public void removeShutdownListener(ShutdownListener shutdownListener) {} + + @Override + public ShutdownSignalException getCloseReason() { + return null; + } + + @Override + public void notifyListeners() {} +} diff --git a/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/sink/util/TestConnection.java b/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/sink/util/TestConnection.java new file mode 100644 index 00000000000..fc45ab7196a --- /dev/null +++ b/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/sink/util/TestConnection.java @@ -0,0 +1,222 @@ +package org.apache.flink.connector.rabbitmq.sink.util; + +import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig; + +import com.rabbitmq.client.BlockedCallback; +import com.rabbitmq.client.BlockedListener; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ExceptionHandler; +import com.rabbitmq.client.ShutdownListener; +import com.rabbitmq.client.ShutdownSignalException; +import com.rabbitmq.client.UnblockedCallback; +import com.rabbitmq.client.impl.AMQConnection; +import com.rabbitmq.client.impl.Frame; +import com.rabbitmq.client.impl.FrameHandler; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.SocketException; +import java.util.Map; +import java.util.function.Supplier; + +/** Test connection for RabbitMQ. */ +public class TestConnection implements Connection { + + private final RabbitMQConnectionConfig connectionConfig; + private boolean isOpen = true; + + private Supplier channelSupplier; + + public TestConnection(RabbitMQConnectionConfig connectionConfig) { + this.connectionConfig = connectionConfig; + } + + public void setChannelSupplier(Supplier channelSupplier) { + this.channelSupplier = channelSupplier; + } + + @Override + public InetAddress getAddress() { + return null; + } + + @Override + public int getPort() { + return connectionConfig.getPort(); + } + + @Override + public int getChannelMax() { + return connectionConfig.getRequestedChannelMax(); + } + + @Override + public int getFrameMax() { + return connectionConfig.getRequestedFrameMax(); + } + + @Override + public int getHeartbeat() { + return 0; + } + + @Override + public Map getClientProperties() { + return null; + } + + @Override + public String getClientProvidedName() { + return null; + } + + @Override + public Map getServerProperties() { + return null; + } + + @Override + public Channel createChannel() throws IOException { + return channelSupplier.get(); + } + + @Override + public Channel createChannel(int i) throws IOException { + return channelSupplier.get(); + } + + @Override + public void close() throws IOException { + isOpen = false; + } + + @Override + public void close(int i, String s) throws IOException { + isOpen = false; + } + + @Override + public void close(int i) throws IOException { + isOpen = false; + } + + @Override + public void close(int i, String s, int i1) throws IOException { + isOpen = false; + } + + @Override + public void abort() {} + + @Override + public void abort(int i, String s) {} + + @Override + public void abort(int i) {} + + @Override + public void abort(int i, String s, int i1) {} + + @Override + public void addBlockedListener(BlockedListener blockedListener) {} + + @Override + public BlockedListener addBlockedListener( + BlockedCallback blockedCallback, UnblockedCallback unblockedCallback) { + return null; + } + + @Override + public boolean removeBlockedListener(BlockedListener blockedListener) { + return false; + } + + @Override + public void clearBlockedListeners() {} + + @Override + public ExceptionHandler getExceptionHandler() { + return null; + } + + @Override + public String getId() { + return null; + } + + @Override + public void setId(String s) {} + + @Override + public void addShutdownListener(ShutdownListener shutdownListener) {} + + @Override + public void removeShutdownListener(ShutdownListener shutdownListener) {} + + @Override + public ShutdownSignalException getCloseReason() { + return null; + } + + @Override + public void notifyListeners() {} + + @Override + public boolean isOpen() { + return isOpen; + } + + /** No-op frame handler for RabbitMQ. */ + public static class TestFrameHandler implements FrameHandler { + public TestFrameHandler() {} + + @Override + public void setTimeout(int i) throws SocketException {} + + @Override + public int getTimeout() throws SocketException { + return 0; + } + + @Override + public void sendHeader() throws IOException {} + + @Override + public void initialize(AMQConnection amqConnection) {} + + @Override + public Frame readFrame() throws IOException { + return null; + } + + @Override + public void writeFrame(Frame frame) throws IOException {} + + @Override + public void flush() throws IOException {} + + @Override + public void close() {} + + @Override + public InetAddress getLocalAddress() { + return null; + } + + @Override + public int getLocalPort() { + return 0; + } + + @Override + public InetAddress getAddress() { + return null; + } + + @Override + public int getPort() { + return 0; + } + } +} diff --git a/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/sink/util/TestMessageConverter.java b/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/sink/util/TestMessageConverter.java new file mode 100644 index 00000000000..9d51011f86b --- /dev/null +++ b/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/sink/util/TestMessageConverter.java @@ -0,0 +1,56 @@ +package org.apache.flink.connector.rabbitmq.sink.util; + +import org.apache.flink.connector.rabbitmq.common.RabbitMQMessage; +import org.apache.flink.connector.rabbitmq.common.RabbitMQMessageConverter; + +import com.rabbitmq.client.AMQP; + +/** Test message converter for RabbitMQ. */ +public class TestMessageConverter implements RabbitMQMessageConverter { + + private boolean markNextMessageAsMandatory = false; + private boolean markNextMessageAsImmediate = false; + + private String exchange = null; + + private String routingKey = null; + + private AMQP.BasicProperties properties = null; + + public void setProperties(AMQP.BasicProperties properties) { + this.properties = properties; + } + + public void setExchange(String exchange) { + this.exchange = exchange; + } + + public void setRoutingKey(String routingKey) { + this.routingKey = routingKey; + } + + public void setMarkNextMessageAsMandatory(boolean markNextMessageAsMandatory) { + this.markNextMessageAsMandatory = markNextMessageAsMandatory; + } + + public void setMarkNextMessageAsImmediate(boolean markNextMessageAsImmediate) { + this.markNextMessageAsImmediate = markNextMessageAsImmediate; + } + + @Override + public RabbitMQMessage toRabbitMQMessage(String value) { + return RabbitMQMessage.builder() + .setMessage(value) + .setExchange(exchange) + .setMandatory(markNextMessageAsMandatory) + .setImmediate(markNextMessageAsImmediate) + .setRoutingKey(routingKey) + .setMessageProperties(properties) + .build(); + } + + @Override + public boolean supportsExchangeRouting() { + return true; + } +} diff --git a/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java b/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java index 84731182e38..44c83898716 100644 --- a/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java +++ b/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java @@ -17,6 +17,8 @@ package org.apache.flink.streaming.connectors.rabbitmq.common; +import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionSetupException; + import com.rabbitmq.client.ConnectionFactory; import org.junit.Test; @@ -33,9 +35,10 @@ /** Tests for the {@link RMQConnectionConfig}. */ public class RMQConnectionConfigTest { - @Test(expected = NullPointerException.class) - public void shouldThrowNullPointExceptionIfHostIsNull() - throws NoSuchAlgorithmException, KeyManagementException, URISyntaxException { + @Test(expected = IllegalArgumentException.class) + public void shouldThrowIllegalArgumentExceptionIfHostIsNull() + throws RabbitMQConnectionSetupException, URISyntaxException, NoSuchAlgorithmException, + KeyManagementException { RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() .setPort(1000) @@ -46,22 +49,20 @@ public void shouldThrowNullPointExceptionIfHostIsNull() connectionConfig.getConnectionFactory(); } - @Test(expected = NullPointerException.class) - public void shouldThrowNullPointExceptionIfPortIsNull() - throws NoSuchAlgorithmException, KeyManagementException, URISyntaxException { - RMQConnectionConfig connectionConfig = - new RMQConnectionConfig.Builder() - .setHost("localhost") - .setUserName("guest") - .setPassword("guest") - .setVirtualHost("/") - .build(); - connectionConfig.getConnectionFactory(); + @Test(expected = IllegalArgumentException.class) + public void shouldThrowIllegalArgumentExceptionIfPortIsNull() { + new RMQConnectionConfig.Builder() + .setHost("localhost") + .setUserName("guest") + .setPassword("guest") + .setVirtualHost("/") + .build(); } - @Test(expected = NullPointerException.class) + @Test(expected = IllegalArgumentException.class) public void shouldSetDefaultValueIfConnectionTimeoutNotGiven() - throws NoSuchAlgorithmException, KeyManagementException, URISyntaxException { + throws RabbitMQConnectionSetupException, URISyntaxException, NoSuchAlgorithmException, + KeyManagementException { RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() .setHost("localhost") @@ -75,7 +76,8 @@ public void shouldSetDefaultValueIfConnectionTimeoutNotGiven() @Test public void shouldSetProvidedValueIfConnectionTimeoutNotGiven() - throws NoSuchAlgorithmException, KeyManagementException, URISyntaxException { + throws RabbitMQConnectionSetupException, URISyntaxException, NoSuchAlgorithmException, + KeyManagementException { RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() .setHost("localhost") diff --git a/pom.xml b/pom.xml index 23c5fd7a3e5..0d9f32aea20 100644 --- a/pom.xml +++ b/pom.xml @@ -48,7 +48,7 @@ under the License. - 1.16.0 + 1.19.0 5.10.1 3.23.1