Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-21373] Add RabbitMQ SinkV2 Implementation, Port Flink version to Flink 1.19 #29

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions .github/workflows/push_pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,8 @@ jobs:
compile_and_test:
strategy:
matrix:
flink: [ 1.18-SNAPSHOT ]
jdk: [ '8, 11, 17' ]
include:
- flink: 1.19-SNAPSHOT
jdk: '8, 11, 17, 21'
flink: [ 1.19-SNAPSHOT ]
jdk: [ '8, 11, 17, 21' ]
uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
with:
flink_version: ${{ matrix.flink }}
Expand Down
4 changes: 0 additions & 4 deletions .github/workflows/weekly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ jobs:
strategy:
matrix:
flink_branches: [{
flink: 1.18-SNAPSHOT,
jdk: '8, 11, 17',
branch: main
}, {
flink: 1.19-SNAPSHOT,
jdk: '8, 11, 17, 21',
branch: main
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Method <org.apache.flink.streaming.connectors.rabbitmq.RMQSource$RMQCollectorImpl.setMessageIdentifiers(java.lang.String, long)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (RMQSource.java:415)
Method <org.apache.flink.streaming.connectors.rabbitmq.RMQSource.close()> calls method <org.apache.flink.util.ExceptionUtils.firstOrSuppressed(java.lang.Throwable, java.lang.Throwable)> in (RMQSource.java:303)
Method <org.apache.flink.streaming.connectors.rabbitmq.RMQSource.close()> calls method <org.apache.flink.util.IOUtils.closeAll([Ljava.lang.AutoCloseable;)> in (RMQSource.java:300)
Method <org.apache.flink.streaming.connectors.rabbitmq.RMQSource.open(org.apache.flink.configuration.Configuration)> calls method <org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters.deserializationAdapter(org.apache.flink.api.common.functions.RuntimeContext, java.util.function.Function)> in (RMQSource.java:275)
Method <org.apache.flink.streaming.connectors.rabbitmq.RMQSource.open(org.apache.flink.configuration.Configuration)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.isCheckpointingEnabled()> in (RMQSource.java:254)
Method <org.apache.flink.streaming.connectors.rabbitmq.RMQSource.open(org.apache.flink.configuration.Configuration)> calls method <org.apache.flink.util.IOUtils.closeAllQuietly([Ljava.lang.AutoCloseable;)> in (RMQSource.java:266)
Method <org.apache.flink.streaming.connectors.rabbitmq.RMQSource.open(org.apache.flink.configuration.Configuration)> checks instanceof <org.apache.flink.streaming.api.operators.StreamingRuntimeContext> in (RMQSource.java:253)
Method <org.apache.flink.streaming.connectors.rabbitmq.RMQSource.setupConnection()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (RMQSource.java:0)
Method <org.apache.flink.streaming.connectors.rabbitmq.RMQSource.setupQueue()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (RMQSource.java:0)
3 changes: 2 additions & 1 deletion flink-connector-rabbitmq/archunit-violations/stored.rules
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
#Mon Apr 03 14:18:47 CEST 2023
#Thu May 02 14:30:30 BST 2024
Return\ and\ argument\ types\ of\ methods\ annotated\ with\ @Public\ must\ be\ annotated\ with\ @Public.=f67f70fc-4a24-448c-a247-354e7ce69167
Connector\ production\ code\ must\ not\ depend\ on\ non-public\ API\ outside\ of\ connector\ packages=deb59a69-6a64-49f2-8aa3-84985ee63d70
ITCASE\ tests\ should\ use\ a\ MiniCluster\ resource\ or\ extension=6fdbfe74-a937-4a8a-8e1b-9f0a3391f3fe
Expand All @@ -8,3 +8,4 @@ Options\ for\ connectors\ and\ formats\ should\ reside\ in\ a\ consistent\ packa
Tests\ inheriting\ from\ AbstractTestBase\ should\ have\ name\ ending\ with\ ITCase=675cade4-c44e-4b2b-aacf-0c23d2032e4a
Return\ and\ argument\ types\ of\ methods\ annotated\ with\ @PublicEvolving\ must\ be\ annotated\ with\ @Public(Evolving).=871721c9-4c5f-4523-b8f6-a419e8a0085f
Classes\ in\ API\ packages\ should\ have\ at\ least\ one\ API\ visibility\ annotation.=54a3d1fc-24ac-4bdc-bf15-56e8d7831aed
Connector\ production\ code\ must\ depend\ only\ on\ public\ API\ when\ outside\ of\ connector\ packages=a6cee285-bdbf-4479-a652-8143c2bc1a69
15 changes: 15 additions & 0 deletions flink-connector-rabbitmq/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,21 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>rabbitmq</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.apache.flink.connector.rabbitmq.common;

import org.apache.flink.annotation.PublicEvolving;

/** Constants for the RabbitMQ connector. */
@PublicEvolving
public class Constants {
vahmed-hamdy marked this conversation as resolved.
Show resolved Hide resolved

/** The default RabbitMQ host Exchange used when exchange routing is disabled. */
public static final String DEFAULT_EXCHANGE = "";

/** The default maximum number of inflight messages handled by SinkWriter at the same time. */
public static final int DEFAULT_MAX_INFLIGHT = 100;

/** The default behaviour of sink on failing to send elements. */
public static final boolean DEFAULT_FAIL_ON_ERROR = false;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.apache.flink.connector.rabbitmq.common;

import org.apache.flink.annotation.PublicEvolving;

import static org.apache.flink.connector.rabbitmq.common.Constants.DEFAULT_EXCHANGE;

/**
* Default implementation of {@link RabbitMQMessageConverter}.
*
* @param <T> type of the message to be converted
*/
@PublicEvolving
public class DefaultRabbitMQMessageConverter<T> implements RabbitMQMessageConverter<T> {
@Override
public RabbitMQMessage<T> toRabbitMQMessage(T value) {
return RabbitMQMessage.<T>builder().setMessage(value).setExchange(DEFAULT_EXCHANGE).build();
}

@Override
public boolean supportsExchangeRouting() {
return false;
}
}
Loading