diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 14be77d..dd5ecbc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -72,7 +72,7 @@ jobs: run: docker-compose -f docker-compose.yml up -d - name: Build project - run: sbt ++${{ matrix.scala }} compile scalafmtCheckAll plugin/test + run: sbt ++${{ matrix.scala }} compile scalafmtCheckAll javafmtCheckAll plugin/test - name: Test Consumer run: sbt ++${{ matrix.scala }} consumer/test diff --git a/.scalafmt.conf b/.scalafmt.conf index a5b6eff..9a22970 100644 --- a/.scalafmt.conf +++ b/.scalafmt.conf @@ -1,5 +1,5 @@ # **Note** that config order in this file is important since what comes afterwards takes precedence -version = 3.7.1 +version = 3.7.3 project.git = true runner.dialect = scala213 align = true diff --git a/docs/modules/ROOT/pages/getting_started.adoc b/docs/modules/ROOT/pages/getting_started.adoc index 7a885d9..957a885 100644 --- a/docs/modules/ROOT/pages/getting_started.adoc +++ b/docs/modules/ROOT/pages/getting_started.adoc @@ -42,7 +42,7 @@ mkdir -p ~/.pact/plugins/avro-{version} + [source,shell,subs=attributes] ---- -wget -c https://github.com/austek/pact-avro-plugin/releases/download/v-{version}/pact-avro-plugin-{version}.tgz -O ~/.pact/plugins/avro-{version}/pact-avro-plugin.tgz +wget -c https://github.com/austek/pact-avro-plugin/releases/download/v{version}/pact-avro-plugin-{version}.tgz -O ~/.pact/plugins/avro-{version}/pact-avro-plugin.tgz ---- . Unpack the plugin executable: diff --git a/github-actions.sbt b/github-actions.sbt index 74a2eb1..deb3193 100644 --- a/github-actions.sbt +++ b/github-actions.sbt @@ -6,7 +6,7 @@ ThisBuild / githubWorkflowBuild := Seq( ), WorkflowStep.Sbt( name = Some("Build project"), - commands = List("compile", "scalafmtCheckAll", "plugin/test") + commands = List("compile", "scalafmtCheckAll", "javafmtCheckAll", "plugin/test") ), WorkflowStep.Sbt( name = Some("Test Consumer"), diff --git a/modules/examples/consumer/src/main/resources/avro/order-v1.avsc b/modules/examples/consumer/src/main/resources/avro/order-v1.avsc new file mode 100644 index 0000000..47a47fa --- /dev/null +++ b/modules/examples/consumer/src/main/resources/avro/order-v1.avsc @@ -0,0 +1,79 @@ +[ + { + "type": "record", + "name": "OrderNewEvent", + "namespace": "com.collibra.event.client.examples.showcase.schema", + "fields": [ + { + "name": "createdOn", + "type": [ + "null", + { + "type": "long", + "logicalType": "timestamp-micros" + } + ] + }, + { + "name": "items", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "OrderItem", + "namespace": "com.collibra.event.client.examples.showcase.domain", + "fields": [ + { + "name": "itemId", + "type": [ + "null", + { + "type": "string", + "logicalType": "uuid" + } + ] + }, + { + "name": "quantity", + "type": "int" + } + ] + } + } + ] + }, + { + "name": "orderId", + "type": [ + "null", + { + "type": "string", + "logicalType": "uuid" + } + ] + }, + { + "name": "userId", + "type": [ + "null", + { + "type": "string", + "logicalType": "uuid" + } + ] + }, + { + "name": "walletId", + "type": [ + "null", + { + "type": "string", + "logicalType": "uuid" + } + ] + } + ] + } +] diff --git a/modules/examples/consumer/src/main/resources/avro/orders.avsc b/modules/examples/consumer/src/main/resources/avro/orders.avsc index 7f33869..7328e6e 100644 --- a/modules/examples/consumer/src/main/resources/avro/orders.avsc +++ b/modules/examples/consumer/src/main/resources/avro/orders.avsc @@ -82,6 +82,16 @@ "type": "array", "items": "com.github.austek.example.Item" } + }, + { + "name": "userId", + "type": [ + "null", + { + "type": "string", + "logicalType": "uuid" + } + ] } ] } diff --git a/modules/examples/consumer/src/test/java/com/github/austek/example/pulsar/avro/OrderV1ConsumerTest.java b/modules/examples/consumer/src/test/java/com/github/austek/example/pulsar/avro/OrderV1ConsumerTest.java new file mode 100644 index 0000000..002c2a5 --- /dev/null +++ b/modules/examples/consumer/src/test/java/com/github/austek/example/pulsar/avro/OrderV1ConsumerTest.java @@ -0,0 +1,102 @@ +package com.github.austek.example.pulsar.avro; + +import static com.github.austek.example.pulsar.avro.PactPulsarConsumerTest.arrayByteToAvroRecord; +import static org.assertj.core.api.Assertions.assertThat; + +import au.com.dius.pact.consumer.dsl.PactBuilder; +import au.com.dius.pact.consumer.junit5.PactConsumerTestExt; +import au.com.dius.pact.consumer.junit5.PactTestFor; +import au.com.dius.pact.consumer.junit5.ProviderType; +import au.com.dius.pact.core.model.ContentTypeHint; +import au.com.dius.pact.core.model.PactSpecVersion; +import au.com.dius.pact.core.model.V4Interaction; +import au.com.dius.pact.core.model.V4Pact; +import au.com.dius.pact.core.model.annotations.Pact; +import au.com.dius.pact.core.model.matchingrules.MatchingRule; +import au.com.dius.pact.core.model.matchingrules.MatchingRuleCategory; +import au.com.dius.pact.core.model.matchingrules.MatchingRuleGroup; +import au.com.dius.pact.core.model.matchingrules.MatchingRulesImpl; +import au.com.dius.pact.core.model.v4.MessageContents; +import com.collibra.event.client.examples.showcase.domain.OrderItem; +import com.collibra.event.client.examples.showcase.schema.OrderNewEvent; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(PactConsumerTestExt.class) +@PactTestFor( + pactVersion = PactSpecVersion.V4, + providerType = ProviderType.ASYNCH, + providerName = "OrderTopicV1") +class OrderV1ConsumerTest { + private final String schemasPath = + Objects.requireNonNull(getClass().getResource("/avro/order-v1.avsc")).getPath(); + + @Pact(consumer = "OrderTopicConsumer") + V4Pact configureRecordWithDependantRecord(PactBuilder builder) { + var messageBody = + Map.of( + "message.contents", + Map.ofEntries( + Map.entry("pact:avro", schemasPath), + Map.entry("pact:record-name", "OrderNewEvent"), + Map.entry("pact:content-type", "avro/binary"), + Map.entry("orderId", "notEmpty('0c7cbb5a-9a9a-4088-9713-c0c88475c903')"), + Map.entry("userId", "notEmpty('20bef962-8cbd-4b8c-8337-97ae385ac45d')"), + Map.entry( + "items", + List.of( + Map.of( + "itemId", "notEmpty('e41c5f30-fa8e-4cfd-989d-95ca5a04037f')", + "quantity", "notEmpty('1')"), + Map.of( + "itemId", "notEmpty('8a62474a-7157-4c67-9126-c6dcecb1df08')", + "quantity", "notEmpty('2')"))))); + return builder + .usingPlugin("avro") + .expectsToReceive("Order Created", "core/interaction/message") + .with(messageBody) + .toPact(); + } + + @Test + @PactTestFor(pactMethod = "configureRecordWithDependantRecord") + void consumerRecordWithDependantRecord(V4Interaction.AsynchronousMessage message) + throws IOException { + MessageContents messageContents = message.getContents(); + List orders = + arrayByteToAvroRecord(OrderNewEvent.class, messageContents.getContents().getValue()); + OrderNewEvent order = assertFirstOrder(orders); + + assertThat(messageContents.getContents().getContentType()) + .hasToString("avro/binary; record=OrderNewEvent"); + assertThat(messageContents.getContents().getContentTypeHint()) + .isEqualTo(ContentTypeHint.BINARY); + + Map ruleCategoryMap = + ((MatchingRulesImpl) messageContents.getMatchingRules()).getRules(); + assertThat(ruleCategoryMap).hasSize(1); + Map rules = ruleCategoryMap.get("body").getMatchingRules(); + List idRules = rules.get("$.userId").getRules(); + assertThat(idRules).hasSize(1); + assertThat(idRules.get(0)).extracting("name").isEqualTo("not-empty"); + } + + private static OrderNewEvent assertFirstOrder(List orders) { + assertThat(orders).hasSize(1); + OrderNewEvent order = orders.get(0); + assertThat(order.getOrderId()).hasToString("0c7cbb5a-9a9a-4088-9713-c0c88475c903"); + assertThat(order.getUserId()).hasToString("20bef962-8cbd-4b8c-8337-97ae385ac45d"); + assertThat(order.getItems()).hasSize(2); + OrderItem item1 = order.getItems().get(0); + assertThat(item1.getItemId()).hasToString("e41c5f30-fa8e-4cfd-989d-95ca5a04037f"); + assertThat(item1.getQuantity()).isEqualTo(1); + OrderItem item2 = order.getItems().get(1); + assertThat(item2.getItemId()).hasToString("8a62474a-7157-4c67-9126-c6dcecb1df08"); + assertThat(item2.getQuantity()).isEqualTo(2L); + return order; + } +} diff --git a/modules/examples/consumer/src/test/java/com/github/austek/example/pulsar/avro/PactPulsarConsumerTest.java b/modules/examples/consumer/src/test/java/com/github/austek/example/pulsar/avro/PactPulsarConsumerTest.java index 23f2082..4be3b6c 100644 --- a/modules/examples/consumer/src/test/java/com/github/austek/example/pulsar/avro/PactPulsarConsumerTest.java +++ b/modules/examples/consumer/src/test/java/com/github/austek/example/pulsar/avro/PactPulsarConsumerTest.java @@ -1,5 +1,8 @@ package com.github.austek.example.pulsar.avro; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; + import au.com.dius.pact.consumer.dsl.PactBuilder; import au.com.dius.pact.consumer.junit5.PactConsumerTestExt; import au.com.dius.pact.consumer.junit5.PactTestFor; @@ -17,121 +20,127 @@ import com.github.austek.example.Item; import com.github.austek.example.Order; import com.github.austek.example.Status; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.*; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.specific.SpecificDatumReader; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Objects; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; - @ExtendWith(PactConsumerTestExt.class) -@PactTestFor(providerName = "order-provider", providerType = ProviderType.ASYNCH, pactVersion = PactSpecVersion.V4) +@PactTestFor( + providerName = "order-provider", + providerType = ProviderType.ASYNCH, + pactVersion = PactSpecVersion.V4) class PactPulsarConsumerTest { - private final String schemasPath = Objects.requireNonNull(getClass().getResource("/avro/orders.avsc")).getPath(); - private final OrderService orderService = new OrderService(); + private final String schemasPath = + Objects.requireNonNull(getClass().getResource("/avro/orders.avsc")).getPath(); + private final OrderService orderService = new OrderService(); - @Pact(consumer = "avro-consumer") - V4Pact configureRecordWithDependantRecord(PactBuilder builder) { - // tag::configuration[] - return builder - .usingPlugin("avro") - .expectsToReceive("Order Created", "core/interaction/message") - .with(Map.of( - "message.contents", Map.ofEntries( - Map.entry("pact:avro", schemasPath), - Map.entry("pact:record-name", "Order"), - Map.entry("pact:content-type", "avro/binary"), - Map.entry("id", "notEmpty('100')"), - Map.entry("names", "notEmpty('name-1')"), - Map.entry("enabled", "matching(boolean, true)"), - Map.entry("height", "matching(decimal, 15.8)"), - Map.entry("width", "matching(decimal, 1.8)"), - Map.entry("status", "matching(equalTo, 'CREATED')"), - Map.entry("address", Map.of( - "no", "matching(integer, 121)", - "street", "matching(equalTo, 'street name')" - )), - Map.entry("items", List.of( - Map.of( - "name", "notEmpty('Item-1')", - "id", "notEmpty('1')" - ), - Map.of( - "name", "notEmpty('Item-2')", - "id", "notEmpty('2')" - ) - )) - ) - )) - .toPact(); - // end::configuration[] - } + @Pact(consumer = "avro-consumer") + V4Pact configureRecordWithDependantRecord(PactBuilder builder) { + // tag::configuration[] + return builder + .usingPlugin("avro") + .expectsToReceive("Order Created", "core/interaction/message") + .with( + Map.of( + "message.contents", + Map.ofEntries( + Map.entry("pact:avro", schemasPath), + Map.entry("pact:record-name", "Order"), + Map.entry("pact:content-type", "avro/binary"), + Map.entry("id", "notEmpty('100')"), + Map.entry("names", "notEmpty('name-1')"), + Map.entry("enabled", "matching(boolean, true)"), + Map.entry("height", "matching(decimal, 15.8)"), + Map.entry("width", "matching(decimal, 1.8)"), + Map.entry("status", "matching(equalTo, 'CREATED')"), + Map.entry( + "address", + Map.of( + "no", "matching(integer, 121)", + "street", "matching(equalTo, 'street name')")), + Map.entry( + "items", + List.of( + Map.of( + "name", "notEmpty('Item-1')", + "id", "notEmpty('1')"), + Map.of( + "name", "notEmpty('Item-2')", + "id", "notEmpty('2')"))), + Map.entry("userId", "notEmpty('20bef962-8cbd-4b8c-8337-97ae385ac45d')")))) + .toPact(); + // end::configuration[] + } - // tag::consumer_test[] - @Test - @PactTestFor(pactMethod = "configureRecordWithDependantRecord") - void consumerRecordWithDependantRecord(V4Interaction.AsynchronousMessage message) throws IOException { - MessageContents messageContents = message.getContents(); - List orders = arrayByteToAvroRecord(Order.class, messageContents.getContents().getValue()); - Order order = assertFirstOrder(orders); + // tag::consumer_test[] + @Test + @PactTestFor(pactMethod = "configureRecordWithDependantRecord") + void consumerRecordWithDependantRecord(V4Interaction.AsynchronousMessage message) + throws IOException { + MessageContents messageContents = message.getContents(); + List orders = + arrayByteToAvroRecord(Order.class, messageContents.getContents().getValue()); + Order order = assertFirstOrder(orders); - assertThat(messageContents.getContents().getContentType()).hasToString("avro/binary; record=Order"); - assertThat(messageContents.getContents().getContentTypeHint()).isEqualTo(ContentTypeHint.BINARY); + assertThat(messageContents.getContents().getContentType()) + .hasToString("avro/binary; record=Order"); + assertThat(messageContents.getContents().getContentTypeHint()) + .isEqualTo(ContentTypeHint.BINARY); - Map ruleCategoryMap = ((MatchingRulesImpl) messageContents.getMatchingRules()).getRules(); - assertThat(ruleCategoryMap).hasSize(1); - Map rules = ruleCategoryMap.get("body").getMatchingRules(); - List idRules = rules.get("$.id").getRules(); - assertThat(idRules).hasSize(1); - assertThat(idRules.get(0)).extracting("name").isEqualTo("not-empty"); - List name0Rules = rules.get("$.names").getRules(); - assertThat(name0Rules).hasSize(1); - assertThat(name0Rules.get(0)).extracting("name").isEqualTo("not-empty"); + Map ruleCategoryMap = + ((MatchingRulesImpl) messageContents.getMatchingRules()).getRules(); + assertThat(ruleCategoryMap).hasSize(1); + Map rules = ruleCategoryMap.get("body").getMatchingRules(); + List idRules = rules.get("$.id").getRules(); + assertThat(idRules).hasSize(1); + assertThat(idRules.get(0)).extracting("name").isEqualTo("not-empty"); + List name0Rules = rules.get("$.names").getRules(); + assertThat(name0Rules).hasSize(1); + assertThat(name0Rules.get(0)).extracting("name").isEqualTo("not-empty"); - assertDoesNotThrow(() -> orderService.process(order)); - } + assertThat(order.getUserId()) + .isEqualTo(UUID.fromString("20bef962-8cbd-4b8c-8337-97ae385ac45d")); - private List arrayByteToAvroRecord(Class c, byte[] bytes) throws IOException { - SpecificDatumReader datumReader = new SpecificDatumReader<>(c); - List records = new ArrayList<>(); + assertDoesNotThrow(() -> orderService.process(order)); + } - try (ByteArrayInputStream in = new ByteArrayInputStream(bytes)) { - BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null); - while (!decoder.isEnd()) records.add(datumReader.read(null, decoder)); - } + public static List arrayByteToAvroRecord(Class c, byte[] bytes) throws IOException { + SpecificDatumReader datumReader = new SpecificDatumReader<>(c); + List records = new ArrayList<>(); - return records; + try (ByteArrayInputStream in = new ByteArrayInputStream(bytes)) { + BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null); + while (!decoder.isEnd()) records.add(datumReader.read(null, decoder)); } - // end::consumer_test[] - private static Order assertFirstOrder(List orders) { - assertThat(orders).hasSize(1); - Order order = orders.get(0); - assertThat(order.getId()).isEqualTo(100); - assertThat(order.getNames()).hasToString("name-1"); - assertThat(order.getEnabled()).isTrue(); - assertThat(order.getHeight()).isEqualTo(15.8F); - assertThat(order.getWidth()).isEqualTo(1.8D); - assertThat(order.getStatus()).isEqualTo(Status.CREATED); - assertThat(order.getAddress().getNo()).isEqualTo(121); - assertThat(order.getAddress().getStreet()).hasToString("street name"); - assertThat(order.getAddress().getZipcode()).isNull(); - assertThat(order.getItems()).hasSize(2); - Item item1 = order.getItems().get(0); - assertThat(item1.getName()).hasToString("Item-1"); - assertThat(item1.getId()).isEqualTo(1L); - Item item2 = order.getItems().get(1); - assertThat(item2.getName()).hasToString("Item-2"); - assertThat(item2.getId()).isEqualTo(2L); - return order; - } + return records; + } + // end::consumer_test[] + + private static Order assertFirstOrder(List orders) { + assertThat(orders).hasSize(1); + Order order = orders.get(0); + assertThat(order.getId()).isEqualTo(100); + assertThat(order.getNames()).hasToString("name-1"); + assertThat(order.getEnabled()).isTrue(); + assertThat(order.getHeight()).isEqualTo(15.8F); + assertThat(order.getWidth()).isEqualTo(1.8D); + assertThat(order.getStatus()).isEqualTo(Status.CREATED); + assertThat(order.getAddress().getNo()).isEqualTo(121); + assertThat(order.getAddress().getStreet()).hasToString("street name"); + assertThat(order.getAddress().getZipcode()).isNull(); + assertThat(order.getItems()).hasSize(2); + Item item1 = order.getItems().get(0); + assertThat(item1.getName()).hasToString("Item-1"); + assertThat(item1.getId()).isEqualTo(1L); + Item item2 = order.getItems().get(1); + assertThat(item2.getName()).hasToString("Item-2"); + assertThat(item2.getId()).isEqualTo(2L); + return order; + } } diff --git a/modules/examples/provider/src/main/avro/orders.avsc b/modules/examples/provider/src/main/avro/orders.avsc index 7f33869..7328e6e 100644 --- a/modules/examples/provider/src/main/avro/orders.avsc +++ b/modules/examples/provider/src/main/avro/orders.avsc @@ -82,6 +82,16 @@ "type": "array", "items": "com.github.austek.example.Item" } + }, + { + "name": "userId", + "type": [ + "null", + { + "type": "string", + "logicalType": "uuid" + } + ] } ] } diff --git a/modules/examples/provider/src/main/scala/com/github/austek/example/OrderProviderApplication.scala b/modules/examples/provider/src/main/scala/com/github/austek/example/OrderProviderApplication.scala index 8e8c1fd..3ef46dc 100644 --- a/modules/examples/provider/src/main/scala/com/github/austek/example/OrderProviderApplication.scala +++ b/modules/examples/provider/src/main/scala/com/github/austek/example/OrderProviderApplication.scala @@ -11,13 +11,11 @@ object OrderProviderApplication { private val controller = new OrderProducerController(config) - sys.addShutdownHook { - println("Closing producer and pulsar client..") - controller.close() - } - def main(args: Array[String]): Unit = { - controller.producerOrders(config.orderProducer.numberOfOrders) - () + val _ = sys.addShutdownHook { + println("Closing producer and pulsar client..") + controller.close() + } + val _ = controller.producerOrders(config.orderProducer.numberOfOrders) } } diff --git a/modules/examples/provider/src/main/scala/com/github/austek/example/pulsar/avro/OrderRandomization.scala b/modules/examples/provider/src/main/scala/com/github/austek/example/pulsar/avro/OrderRandomization.scala index 665eaec..ee71665 100644 --- a/modules/examples/provider/src/main/scala/com/github/austek/example/pulsar/avro/OrderRandomization.scala +++ b/modules/examples/provider/src/main/scala/com/github/austek/example/pulsar/avro/OrderRandomization.scala @@ -5,6 +5,7 @@ import org.scalacheck.{Arbitrary, Gen} import util.randomization.Randomization._ import java.nio.ByteBuffer +import java.util.UUID import scala.jdk.CollectionConverters._ object OrderRandomization { @@ -38,7 +39,8 @@ object OrderRandomization { randomDouble, random[Status], random[MailAddress], - random[Item](2).asJava + random[Item](2).asJava, + random[UUID] ) } diff --git a/modules/examples/provider/src/test/java/com/github/austek/example/pulsar/avro/PactPulsarProducerTest.java b/modules/examples/provider/src/test/java/com/github/austek/example/pulsar/avro/PactPulsarProducerTest.java index 81e6c4a..5c1b467 100644 --- a/modules/examples/provider/src/test/java/com/github/austek/example/pulsar/avro/PactPulsarProducerTest.java +++ b/modules/examples/provider/src/test/java/com/github/austek/example/pulsar/avro/PactPulsarProducerTest.java @@ -1,8 +1,6 @@ package com.github.austek.example.pulsar.avro; import au.com.dius.pact.core.model.ContentTypeHint; -import au.com.dius.pact.core.model.Interaction; -import au.com.dius.pact.core.model.Pact; import au.com.dius.pact.provider.MessageAndMetadata; import au.com.dius.pact.provider.PactVerifyProvider; import au.com.dius.pact.provider.junit5.MessageTestTarget; @@ -14,6 +12,11 @@ import com.github.austek.example.MailAddress; import com.github.austek.example.Order; import com.github.austek.example.Status; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.UUID; import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.EncoderFactory; import org.apache.avro.specific.SpecificDatumWriter; @@ -21,60 +24,54 @@ import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.List; -import java.util.Map; - @Provider("order-provider") @PactBroker(url = "http://localhost:9292") class PactPulsarProducerTest { - private static final String AVRO_CONTENT_TYPE = "avro/binary; record=Order"; - private static final String KEY_CONTENT_TYPE = "contentType"; - private static final String KEY_CONTENT_TYPE_HINT = "contentTypeHint"; - private static final ContentTypeHint CONTENT_TYPE_HINT = ContentTypeHint.BINARY; + private static final String AVRO_CONTENT_TYPE = "avro/binary; record=Order"; + private static final String KEY_CONTENT_TYPE = "contentType"; + private static final String KEY_CONTENT_TYPE_HINT = "contentTypeHint"; + private static final ContentTypeHint CONTENT_TYPE_HINT = ContentTypeHint.BINARY; - @TestTemplate - @ExtendWith(PactVerificationInvocationContextProvider.class) - void testTemplate(Pact pact, Interaction interaction, PactVerificationContext context) { - context.verifyInteraction(); - } + @TestTemplate + @ExtendWith(PactVerificationInvocationContextProvider.class) + void testTemplate(PactVerificationContext context) { + context.verifyInteraction(); + } - @BeforeEach - void setupTest(PactVerificationContext context) { - context.setTarget(new MessageTestTarget()); - } + @SuppressWarnings("JUnitMalformedDeclaration") + @BeforeEach + void setupTest(PactVerificationContext context) { + context.setTarget(new MessageTestTarget()); + } - @PactVerifyProvider("Order Created") - public MessageAndMetadata orderCreatedEvent() throws IOException { - Order order = new Order( - 100L, - "name-1", - true, - 15.8F, - 1.8D, Status.CREATED, new MailAddress(121, "street name", null), - List.of( - new Item("Item-1", 1L), - new Item("Item-2", 2L) - ) - ); + @PactVerifyProvider("Order Created") + public MessageAndMetadata orderCreatedEvent() throws IOException { + Order order = + new Order( + 100L, + "name-1", + true, + 15.8F, + 1.8D, + Status.CREATED, + new MailAddress(121, "street name", null), + List.of(new Item("Item-1", 1L), new Item("Item-2", 2L)), + UUID.fromString("20bef962-8cbd-4b8c-8337-97ae385ac45d")); - return new MessageAndMetadata( - serialise(order), - Map.of( - KEY_CONTENT_TYPE, AVRO_CONTENT_TYPE, - KEY_CONTENT_TYPE_HINT, CONTENT_TYPE_HINT - ) - ); - } + return new MessageAndMetadata( + serialise(order), + Map.of( + KEY_CONTENT_TYPE, AVRO_CONTENT_TYPE, + KEY_CONTENT_TYPE_HINT, CONTENT_TYPE_HINT)); + } - private byte[] serialise(Order record) throws IOException { - SpecificDatumWriter writer = new SpecificDatumWriter<>(Order.class); - try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { - BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null); - writer.write(record, encoder); - encoder.flush(); - return outputStream.toByteArray(); - } + private byte[] serialise(Order record) throws IOException { + SpecificDatumWriter writer = new SpecificDatumWriter<>(Order.class); + try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { + BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null); + writer.write(record, encoder); + encoder.flush(); + return outputStream.toByteArray(); } + } } diff --git a/modules/plugin/src/main/resources/logback.xml b/modules/plugin/src/main/resources/logback.xml index 98c7954..26130cf 100644 --- a/modules/plugin/src/main/resources/logback.xml +++ b/modules/plugin/src/main/resources/logback.xml @@ -10,7 +10,7 @@ logs/plugin.log - logs/plugin.%d{yyyy-MM-dd}.%i.html + logs/plugin.%d{yyyy-MM-dd}.%i.log 50MB diff --git a/modules/plugin/src/main/scala/com/github/austek/plugin/avro/AvroRecord.scala b/modules/plugin/src/main/scala/com/github/austek/plugin/avro/AvroRecord.scala index a98d5c1..195c260 100644 --- a/modules/plugin/src/main/scala/com/github/austek/plugin/avro/AvroRecord.scala +++ b/modules/plugin/src/main/scala/com/github/austek/plugin/avro/AvroRecord.scala @@ -17,6 +17,7 @@ import org.apache.avro.io.EncoderFactory import java.io.ByteArrayOutputStream import java.nio.charset.StandardCharsets import java.util +import scala.annotation.tailrec import scala.jdk.CollectionConverters._ import scala.util.{Failure, Success, Try, Using} @@ -118,11 +119,12 @@ object Avro { case LONG => Try(AvroLong(path, fieldName, fieldValue.asInstanceOf[Long], rules)).toEither case NULL => Right(AvroNull(path, fieldName)) case STRING => Try(AvroString(path, fieldName, fieldValue.asInstanceOf[String], rules)).toEither - case ARRAY => Left(new UnsupportedOperationException(s"'ARRAY' not support as AvroValue: '$fieldValue'")) - case MAP => Left(new UnsupportedOperationException(s"'MAP' not support as AvroValue: '$fieldValue'")) - case RECORD => Left(new UnsupportedOperationException(s"'RECORD' not support as AvroValue: '$fieldValue'")) - case UNION => Left(new UnsupportedOperationException(s"'UNION' not support as AvroValue: '$fieldValue'")) - case t => Left(new UnsupportedOperationException(s"Unknown type '$t' not support as AvroValue: '$fieldValue'")) + case ARRAY => Left(new UnsupportedOperationException(s"'ARRAY' type is not supported for field: '${fieldName.value}' with value: '$fieldValue'")) + case MAP => Left(new UnsupportedOperationException(s"'MAP' type is not supported for field: '${fieldName.value}' with value: '$fieldValue'")) + case RECORD => Left(new UnsupportedOperationException(s"'RECORD' type is not supported for field: '${fieldName.value}' with value: '$fieldValue'")) + case UNION => Left(new UnsupportedOperationException(s"'UNION' type is not supported for field: '${fieldName.value}' with value: '$fieldValue'")) + case t => + Left(new UnsupportedOperationException(s"Unknown type '$t' type is not supported for field: '${fieldName.value}' with value: '$fieldValue'")) }).left.map(e => PluginErrorException(e)) } } @@ -145,11 +147,11 @@ object Avro { case LONG => Try(fieldValue.toLong).map(v => AvroLong(path, fieldName, v, rules)).toEither case NULL => Right(AvroNull(path, fieldName)) case STRING => Right(AvroString(path, fieldName, fieldValue, rules)) - case ARRAY => Left(new UnsupportedOperationException(s"'ARRAY' not support as AvroValue: '$fieldValue'")) - case MAP => Left(new UnsupportedOperationException(s"'MAP' not support as AvroValue: '$fieldValue'")) - case RECORD => Left(new UnsupportedOperationException(s"'RECORD' not support as AvroValue: '$fieldValue'")) - case UNION => Left(new UnsupportedOperationException(s"'UNION' not support as AvroValue: '$fieldValue'")) - case t => Left(new UnsupportedOperationException(s"Unknown type '$t' not support as AvroValue: '$fieldValue'")) + case ARRAY => Left(new UnsupportedOperationException(s"'ARRAY' type is not supported for field: '${fieldName.value}' with value: '$fieldValue'")) + case MAP => Left(new UnsupportedOperationException(s"'MAP' type is not supported for field: '${fieldName.value}' with value: '$fieldValue'")) + case RECORD => Left(new UnsupportedOperationException(s"'RECORD' type is not supported for field: '${fieldName.value}' with value: '$fieldValue'")) + case UNION => Left(new UnsupportedOperationException(s"'UNION' type is not supported for field: '${fieldName.value}' with value: '$fieldValue'")) + case t => Left(new UnsupportedOperationException(s"Unknown type '$t' type is not supported for field: '${fieldName.value}' with value: '$fieldValue'")) }).left.map(e => PluginErrorException(e)) } } @@ -212,8 +214,7 @@ object Avro { } object AvroArray { - def apply(rootPath: PactFieldPath, schemaField: Schema.Field, inValue: Value): Either[Seq[PluginError[_]], AvroArray] = { - val fieldName = AvroFieldName(schemaField.name()) + def apply(rootPath: PactFieldPath, fieldName: AvroFieldName, schema: Schema, inValue: Value): Either[Seq[PluginError[_]], AvroArray] = { inValue.kind match { case Empty => Right(AvroArray(rootPath, fieldName)) case NullValue(_) => Right(AvroArray(rootPath, fieldName)) @@ -221,10 +222,10 @@ object Avro { val arrayBasePath = rootPath :+ fieldName listValue.values.zipWithIndex .map { case (singleValue, index) => - schemaField.schema().getElementType.getType match { - case RECORD => AvroValue(arrayBasePath :+ index, fieldName, schemaField.schema(), singleValue, appendPath = false) + schema.getElementType.getType match { + case RECORD => AvroValue(arrayBasePath :+ index, fieldName, schema, singleValue, appendPath = false) case _ => - AvroValue(rootPath, fieldName, schemaField.schema(), singleValue).map { + AvroValue(rootPath, fieldName, schema, singleValue).map { case v: AvroNull => v.copy(path = v.path :+ index) case v: AvroString => v.copy(path = v.path :+ index) case v: AvroEnum => v.copy(path = v.path :+ index) @@ -272,21 +273,21 @@ object Avro { } object AvroMap { - def apply(rootPath: PactFieldPath, schemaField: Schema.Field, inValue: Value): Either[Seq[PluginError[_]], AvroMap] = { - val fieldName = AvroFieldName(schemaField.name()) + def apply(rootPath: PactFieldPath, fieldName: AvroFieldName, schema: Schema, inValue: Value): Either[Seq[PluginError[_]], AvroMap] = { + val path = rootPath :+ fieldName inValue.kind match { - case Empty => Right(AvroMap(rootPath, fieldName)) - case NullValue(_) => Right(AvroMap(rootPath, fieldName)) + case Empty => Right(AvroMap(path, fieldName)) + case NullValue(_) => Right(AvroMap(path, fieldName)) case StructValue(structValue) => structValue.fields .map { case (key, singleValue) => - AvroValue(rootPath, key.toFieldName, schemaField.schema(), singleValue).map { v => + AvroValue(path, key.toFieldName, schema, singleValue).map { v => key.toPactPath -> v } } .partitionMap(identity) match { case (errors, _) if errors.nonEmpty => Left(errors.toSeq.flatten) - case (_, fields) => Right(AvroMap(rootPath, fieldName, fields.toMap)) + case (_, fields) => Right(AvroMap(path, fieldName, fields.toMap)) } case _ => Left(Seq(PluginErrorMessage(s"Expected map value for field '${fieldName.value}' but got '${inValue.kind}'"))) } @@ -328,38 +329,7 @@ object Avro { if (null != value) { val field = schema.getField(key) val fieldSchema = field.schema() - fieldSchema.getType match { - case ENUM => - record.put(key, new GenericData.EnumSymbol(fieldSchema, value)) - case ARRAY if fieldSchema.getElementType.getType == RECORD => - record.put( - key, - value - .asInstanceOf[java.util.List[AvroRecord]] - .asScala - .map { item => - item.toGenericRecord(fieldSchema.getElementType) - } - .asJava - ) - case MAP if fieldSchema.getValueType.getType == RECORD => - record.put( - key, - value - .asInstanceOf[util.Map[String, AvroRecord]] - .asScala - .map { case (key, item) => - key -> item.toGenericRecord(fieldSchema.getValueType) - } - .asJava - ) - case RECORD => - record.put(key, value.asInstanceOf[AvroRecord].toGenericRecord(fieldSchema)) - case FIXED => - record.put(key, new GenericData.Fixed(fieldSchema, value.asInstanceOf[String].getBytes)) - case _ => - record.put(key, value) - } + fieldToRecord(record, key, value, fieldSchema) } else { record.put(key, value) } @@ -367,6 +337,44 @@ object Avro { record } + @tailrec + private def fieldToRecord(record: GenericRecord, key: String, value: Any, fieldSchema: Schema): Unit = { + fieldSchema.getType match { + case ENUM => + record.put(key, new GenericData.EnumSymbol(fieldSchema, value)) + case ARRAY if fieldSchema.getElementType.getType == RECORD => + record.put( + key, + value + .asInstanceOf[util.List[AvroRecord]] + .asScala + .map { item => + item.toGenericRecord(fieldSchema.getElementType) + } + .asJava + ) + case MAP if fieldSchema.getValueType.getType == RECORD => + record.put( + key, + value + .asInstanceOf[util.Map[String, AvroRecord]] + .asScala + .map { case (key, item) => + key -> item.toGenericRecord(fieldSchema.getValueType) + } + .asJava + ) + case RECORD => + record.put(key, value.asInstanceOf[AvroRecord].toGenericRecord(fieldSchema)) + case FIXED => + record.put(key, new GenericData.Fixed(fieldSchema, value.asInstanceOf[String].getBytes)) + case UNION => + fieldToRecord(record, key, value, fieldSchema.getTypes.asScala.filterNot(_.getType == NULL).head) + case _ => + record.put(key, value) + } + } + def toByteString(schema: Schema): Either[PluginErrorException, ByteString] = { val datumWriter = new GenericDatumWriter[GenericRecord](schema) Using(new ByteArrayOutputStream()) { os => @@ -396,23 +404,7 @@ object Avro { val fieldName = AvroFieldName(schemaField.name()) configFields.get(schemaField.name()) match { case Some(configValue) => - schemaField.schema().getType match { - case STRING => AvroValue(rootPath, fieldName, schemaField.schema(), configValue) - case INT => AvroValue(rootPath, fieldName, schemaField.schema(), configValue) - case LONG => AvroValue(rootPath, fieldName, schemaField.schema(), configValue) - case FLOAT => AvroValue(rootPath, fieldName, schemaField.schema(), configValue) - case DOUBLE => AvroValue(rootPath, fieldName, schemaField.schema(), configValue) - case BOOLEAN => AvroValue(rootPath, fieldName, schemaField.schema(), configValue) - case ENUM => AvroValue(rootPath, fieldName, schemaField.schema(), configValue) - case FIXED => AvroValue(rootPath, fieldName, schemaField.schema(), configValue) - case BYTES => AvroValue(rootPath, fieldName, schemaField.schema(), configValue) - case NULL => AvroValue(rootPath, fieldName, schemaField.schema(), configValue) - case RECORD => AvroRecord(rootPath :+ schemaField.name(), fieldName, schemaField.schema(), configValue.getStructValue.fields) - case ARRAY => AvroArray(rootPath, schemaField, configValue) - case MAP => AvroMap(rootPath :+ schemaField.name(), schemaField, configValue) - case UNION => Left(Seq(PluginErrorException(new UnsupportedOperationException("'UNION' not support as AvroValue")))) - case t => Left(Seq(PluginErrorException(new UnsupportedOperationException(s"Unknown type '$t' not support as AvroValue")))) - } + selectField(rootPath, schemaField, fieldName, configValue) case None => if (schemaField.hasDefaultValue) { AvroValue(rootPath :+ fieldName, fieldName, schemaField.schema().getType, schemaField.defaultVal(), Seq.empty).left.map(e => Seq(e)) @@ -429,5 +421,87 @@ object Avro { } } + private def selectField( + rootPath: PactFieldPath, + schemaField: Schema.Field, + fieldName: AvroFieldName, + configValue: Value + ): Either[Seq[PluginError[_]], AvroValue] = { + schemaField.schema().getType match { + case STRING => AvroValue(rootPath, fieldName, schemaField.schema(), configValue) + case INT => AvroValue(rootPath, fieldName, schemaField.schema(), configValue) + case LONG => AvroValue(rootPath, fieldName, schemaField.schema(), configValue) + case FLOAT => AvroValue(rootPath, fieldName, schemaField.schema(), configValue) + case DOUBLE => AvroValue(rootPath, fieldName, schemaField.schema(), configValue) + case BOOLEAN => AvroValue(rootPath, fieldName, schemaField.schema(), configValue) + case ENUM => AvroValue(rootPath, fieldName, schemaField.schema(), configValue) + case FIXED => AvroValue(rootPath, fieldName, schemaField.schema(), configValue) + case BYTES => AvroValue(rootPath, fieldName, schemaField.schema(), configValue) + case NULL => AvroValue(rootPath, fieldName, schemaField.schema(), configValue) + case RECORD => AvroRecord(rootPath :+ fieldName, fieldName, schemaField.schema(), configValue.getStructValue.fields) + case ARRAY => AvroArray(rootPath, fieldName, schemaField.schema(), configValue) + case MAP => AvroMap(rootPath, fieldName, schemaField.schema(), configValue) + case UNION => + val subTypes = schemaField.schema().getTypes.asScala + if (subTypes.size == 2 && subTypes.exists(_.getType == NULL)) { + subTypes.filterNot(_.getType == NULL).headOption match { + case Some(schema) => selectNullableField(rootPath, fieldName, configValue, schema) + case None => + Left( + Seq( + PluginErrorException( + new UnsupportedOperationException(s"A valid schema wasn't find for field: '${fieldName.value}' with value: '$configValue'") + ) + ) + ) + } + } else { + Left( + Seq( + PluginErrorException( + new UnsupportedOperationException( + s"'UNION' type is only supported to make field nullable, field: '${fieldName.value}' with value: '$configValue'" + ) + ) + ) + ) + } + case t => + Left( + Seq( + PluginErrorException( + new UnsupportedOperationException(s"Unknown type '$t' type is not supported for field: '${fieldName.value}' with value: '$configValue'") + ) + ) + ) + } + } + + private def selectNullableField(rootPath: PactFieldPath, fieldName: AvroFieldName, configValue: Value, schema: Schema) = { + schema.getType match { + case STRING => AvroValue(rootPath, fieldName, schema, configValue) + case INT => AvroValue(rootPath, fieldName, schema, configValue) + case LONG => AvroValue(rootPath, fieldName, schema, configValue) + case FLOAT => AvroValue(rootPath, fieldName, schema, configValue) + case DOUBLE => AvroValue(rootPath, fieldName, schema, configValue) + case BOOLEAN => AvroValue(rootPath, fieldName, schema, configValue) + case ENUM => AvroValue(rootPath, fieldName, schema, configValue) + case FIXED => AvroValue(rootPath, fieldName, schema, configValue) + case BYTES => AvroValue(rootPath, fieldName, schema, configValue) + case NULL => AvroValue(rootPath, fieldName, schema, configValue) + case RECORD => AvroRecord(rootPath :+ fieldName, fieldName, schema, configValue.getStructValue.fields) + case ARRAY => + AvroArray(rootPath, fieldName, schema, configValue) + case MAP => AvroMap(rootPath, fieldName, schema, configValue) + case t => + Left( + Seq( + PluginErrorException( + new UnsupportedOperationException(s"$t is not a supported type for field: '${fieldName.value}' with value: '$configValue'") + ) + ) + ) + } + } } } diff --git a/modules/plugin/src/main/scala/com/github/austek/plugin/avro/PactAvroPluginServer.scala b/modules/plugin/src/main/scala/com/github/austek/plugin/avro/PactAvroPluginServer.scala index 3f06bfd..7a64159 100644 --- a/modules/plugin/src/main/scala/com/github/austek/plugin/avro/PactAvroPluginServer.scala +++ b/modules/plugin/src/main/scala/com/github/austek/plugin/avro/PactAvroPluginServer.scala @@ -6,7 +6,6 @@ import io.pact.plugin.pact_plugin.PactPluginGrpc import java.util.UUID import java.util.UUID.randomUUID import scala.concurrent.ExecutionContext -import scala.sys.ShutdownHookThread object PactAvroPluginServer { def main(args: Array[String]): Unit = { @@ -24,7 +23,7 @@ class PactAvroPluginServer( private[this] var server: Option[Server] = None - private def start(): ShutdownHookThread = { + private def start(): Unit = { server = Option( ServerBuilder .forPort(port) @@ -33,7 +32,7 @@ class PactAvroPluginServer( .start ) println(s"""{\"port":$port, "serverKey":"$serverKey"}""") - sys.addShutdownHook { + val _ = sys.addShutdownHook { System.err.println("*** shutting down gRPC server since JVM is shutting down") self.stop() System.err.println("*** server shut down") diff --git a/project/BuildSettings.scala b/project/BuildSettings.scala index 0f1c21a..561c1db 100644 --- a/project/BuildSettings.scala +++ b/project/BuildSettings.scala @@ -16,7 +16,11 @@ object BuildSettings { scalaVersion := scala213, resolvers += Resolver.mavenLocal, resolvers ++= Resolver.sonatypeOssRepos("releases"), - javacOptions ++= Seq("-encoding", "UTF-8"), + javacOptions ++= Seq( + "-encoding", "UTF-8", + "-source", s"$javaVersion", + "-target", s"$javaVersion" + ), Test / fork := true, scalacOptions --= { if (sys.env.contains("CI")) @@ -24,6 +28,12 @@ object BuildSettings { else Seq("-Xfatal-warnings") }, + scalacOptions ++= Seq( + "-Wconf:src=src_managed/.*:silent" + ), + Test / scalacOptions := Seq( + "-Ywarn-value-discard" + ), initialize := { val _ = initialize.value val javaVersionFound = sys.props("java.specification.version").toDouble diff --git a/project/Dependencies.scala b/project/Dependencies.scala index abb09c4..073fb2a 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -5,9 +5,8 @@ object Dependencies extends DependencyUtils { object Versions { val avro = "1.11.1" - val grpc = "1.53.0" val logback = "1.2.11" - val pact = "4.5.4" + val pact = "4.5.5" val pulsar4sVersion = "2.9.0" val scalaTest = "3.2.15" } @@ -44,7 +43,7 @@ object Dependencies extends DependencyUtils { val scalaTest: ModuleID = "org.scalatest" %% "scalatest" % Versions.scalaTest // Overrides - val grpcApi: ModuleID = "io.grpc" % "grpc-api" % Versions.grpc - val grpcCore: ModuleID = "io.grpc" % "grpc-core" % Versions.grpc - val grpcNetty: ModuleID = "io.grpc" % "grpc-netty" % Versions.grpc + val grpcApi: ModuleID = "io.grpc" % "grpc-api" % scalapb.compiler.Version.grpcJavaVersion + val grpcCore: ModuleID = "io.grpc" % "grpc-core" % scalapb.compiler.Version.grpcJavaVersion + val grpcNetty: ModuleID = "io.grpc" % "grpc-netty" % scalapb.compiler.Version.grpcJavaVersion } diff --git a/project/plugins.sbt b/project/plugins.sbt index 270f59d..65d0060 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,6 +1,7 @@ addSbtPlugin("com.codecommit" % "sbt-github-actions" % "0.14.2") addSbtPlugin("com.github.sbt" % "sbt-git" % "2.0.1") -addSbtPlugin("com.github.sbt" % "sbt-native-packager" % "1.9.13") -addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.3.1") +addSbtPlugin("com.github.sbt" % "sbt-native-packager" % "1.9.16") +addSbtPlugin("com.lightbend.sbt" % "sbt-java-formatter" % "0.8.0") +addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.4.2") addSbtPlugin("net.aichler" % "sbt-jupiter-interface" % "0.11.1") -addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6") +addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.0") diff --git a/project/sbt-ghpages.sbt b/project/sbt-ghpages.sbt index 1fd40c3..7ac7797 100644 --- a/project/sbt-ghpages.sbt +++ b/project/sbt-ghpages.sbt @@ -1 +1 @@ -addSbtPlugin("io.kevinlee" % "sbt-github-pages" % "0.11.0") +addSbtPlugin("io.kevinlee" % "sbt-github-pages" % "0.12.0") diff --git a/project/sbt-grpc.sbt b/project/sbt-grpc.sbt index 8357be7..8363432 100644 --- a/project/sbt-grpc.sbt +++ b/project/sbt-grpc.sbt @@ -1,3 +1,3 @@ -addSbtPlugin("com.thesamet" % "sbt-protoc" % "1.0.3") +addSbtPlugin("com.thesamet" % "sbt-protoc" % "1.0.6") libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.11.11"