Skip to content

Commit

Permalink
[FLINK-21373] Add RabbitMQ SinkV2 Implementation, Port Flink version …
Browse files Browse the repository at this point in the history
…to 1.19
  • Loading branch information
vahmed-hamdy committed May 3, 2024
1 parent 66e323a commit cdb6c0d
Show file tree
Hide file tree
Showing 29 changed files with 3,135 additions and 599 deletions.
7 changes: 2 additions & 5 deletions .github/workflows/push_pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,8 @@ jobs:
compile_and_test:
strategy:
matrix:
flink: [ 1.18-SNAPSHOT ]
jdk: [ '8, 11, 17' ]
include:
- flink: 1.19-SNAPSHOT
jdk: '8, 11, 17, 21'
flink: [ 1.19-SNAPSHOT ]
jdk: [ '8, 11, 17, 21' ]
uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
with:
flink_version: ${{ matrix.flink }}
Expand Down
4 changes: 0 additions & 4 deletions .github/workflows/weekly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ jobs:
strategy:
matrix:
flink_branches: [{
flink: 1.18-SNAPSHOT,
jdk: '8, 11, 17',
branch: main
}, {
flink: 1.19-SNAPSHOT,
jdk: '8, 11, 17, 21',
branch: main
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Method <org.apache.flink.streaming.connectors.rabbitmq.RMQSource$RMQCollectorImpl.setMessageIdentifiers(java.lang.String, long)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (RMQSource.java:415)
Method <org.apache.flink.streaming.connectors.rabbitmq.RMQSource.close()> calls method <org.apache.flink.util.ExceptionUtils.firstOrSuppressed(java.lang.Throwable, java.lang.Throwable)> in (RMQSource.java:303)
Method <org.apache.flink.streaming.connectors.rabbitmq.RMQSource.close()> calls method <org.apache.flink.util.IOUtils.closeAll([Ljava.lang.AutoCloseable;)> in (RMQSource.java:300)
Method <org.apache.flink.streaming.connectors.rabbitmq.RMQSource.open(org.apache.flink.configuration.Configuration)> calls method <org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters.deserializationAdapter(org.apache.flink.api.common.functions.RuntimeContext, java.util.function.Function)> in (RMQSource.java:275)
Method <org.apache.flink.streaming.connectors.rabbitmq.RMQSource.open(org.apache.flink.configuration.Configuration)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.isCheckpointingEnabled()> in (RMQSource.java:254)
Method <org.apache.flink.streaming.connectors.rabbitmq.RMQSource.open(org.apache.flink.configuration.Configuration)> calls method <org.apache.flink.util.IOUtils.closeAllQuietly([Ljava.lang.AutoCloseable;)> in (RMQSource.java:266)
Method <org.apache.flink.streaming.connectors.rabbitmq.RMQSource.open(org.apache.flink.configuration.Configuration)> checks instanceof <org.apache.flink.streaming.api.operators.StreamingRuntimeContext> in (RMQSource.java:253)
Method <org.apache.flink.streaming.connectors.rabbitmq.RMQSource.setupConnection()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (RMQSource.java:0)
Method <org.apache.flink.streaming.connectors.rabbitmq.RMQSource.setupQueue()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (RMQSource.java:0)
3 changes: 2 additions & 1 deletion flink-connector-rabbitmq/archunit-violations/stored.rules
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
#Mon Apr 03 14:18:47 CEST 2023
#Thu May 02 14:30:30 BST 2024
Return\ and\ argument\ types\ of\ methods\ annotated\ with\ @Public\ must\ be\ annotated\ with\ @Public.=f67f70fc-4a24-448c-a247-354e7ce69167
Connector\ production\ code\ must\ not\ depend\ on\ non-public\ API\ outside\ of\ connector\ packages=deb59a69-6a64-49f2-8aa3-84985ee63d70
ITCASE\ tests\ should\ use\ a\ MiniCluster\ resource\ or\ extension=6fdbfe74-a937-4a8a-8e1b-9f0a3391f3fe
Expand All @@ -8,3 +8,4 @@ Options\ for\ connectors\ and\ formats\ should\ reside\ in\ a\ consistent\ packa
Tests\ inheriting\ from\ AbstractTestBase\ should\ have\ name\ ending\ with\ ITCase=675cade4-c44e-4b2b-aacf-0c23d2032e4a
Return\ and\ argument\ types\ of\ methods\ annotated\ with\ @PublicEvolving\ must\ be\ annotated\ with\ @Public(Evolving).=871721c9-4c5f-4523-b8f6-a419e8a0085f
Classes\ in\ API\ packages\ should\ have\ at\ least\ one\ API\ visibility\ annotation.=54a3d1fc-24ac-4bdc-bf15-56e8d7831aed
Connector\ production\ code\ must\ depend\ only\ on\ public\ API\ when\ outside\ of\ connector\ packages=a6cee285-bdbf-4479-a652-8143c2bc1a69
15 changes: 15 additions & 0 deletions flink-connector-rabbitmq/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,21 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>rabbitmq</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.apache.flink.connector.rabbitmq.common;

import org.apache.flink.annotation.PublicEvolving;

/** Constants for the RabbitMQ connector. */
@PublicEvolving
public class Constants {
public static final String DEFAULT_EXCHANGE = "";

public static final int DEFAULT_MAX_INFLIGHT = 100;

public static final boolean DEFAULT_FAIL_ON_ERROR = false;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.apache.flink.connector.rabbitmq.common;

import org.apache.flink.annotation.PublicEvolving;

import static org.apache.flink.connector.rabbitmq.common.Constants.DEFAULT_EXCHANGE;

/**
* Default implementation of {@link RabbitMQMessageConverter}.
*
* @param <T> type of the message to be converted
*/
@PublicEvolving
public class DefaultRabbitMQMessageConverter<T> implements RabbitMQMessageConverter<T> {
@Override
public RabbitMQMessage<T> toRabbitMQMessage(T value) {
return RabbitMQMessage.<T>builder().setMessage(value).setExchange(DEFAULT_EXCHANGE).build();
}

@Override
public boolean supportsExchangeRouting() {
return false;
}
}
Loading

0 comments on commit cdb6c0d

Please sign in to comment.