From f07390e8b1f5a6789065750a494035904e38e014 Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Mon, 29 Apr 2024 11:14:37 +0200 Subject: [PATCH] Raise error when the reader cannot consume anymore (#287) * Raise an error to be caught by JS when the reader cannot consume anymore due to errors * Add try/catch to test_json.js for demo * Fix tests to reflect changes * Bump golang.org/x/net from 0.22.0 to 0.23.0 Bumps [golang.org/x/net](https://github.com/golang/net) from 0.22.0 to 0.23.0. - [Commits](https://github.com/golang/net/compare/v0.22.0...v0.23.0) --- updated-dependencies: - dependency-name: golang.org/x/net dependency-type: indirect ... Signed-off-by: dependabot[bot] --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- .golanci.yaml | 2 +- go.mod | 2 +- go.sum | 4 +-- reader.go | 4 +-- reader_test.go | 4 +-- scripts/test_json.js | 60 +++++++++++++++++++++++--------------------- 6 files changed, 40 insertions(+), 36 deletions(-) diff --git a/.golanci.yaml b/.golanci.yaml index 53a4418..dcbbb6a 100644 --- a/.golanci.yaml +++ b/.golanci.yaml @@ -15,4 +15,4 @@ linters: - exhaustruct - gocognit - gochecknoinits - - gocyclo \ No newline at end of file + - gocyclo diff --git a/go.mod b/go.mod index c2bef79..d97d9ce 100644 --- a/go.mod +++ b/go.mod @@ -65,7 +65,7 @@ require ( go.opentelemetry.io/otel/trace v1.24.0 // indirect go.opentelemetry.io/proto/otlp v1.1.0 // indirect golang.org/x/crypto v0.21.0 // indirect - golang.org/x/net v0.22.0 // indirect + golang.org/x/net v0.23.0 // indirect golang.org/x/sync v0.6.0 // indirect golang.org/x/sys v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect diff --git a/go.sum b/go.sum index cfb4351..070a66e 100644 --- a/go.sum +++ b/go.sum @@ -230,8 +230,8 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= -golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc= -golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= +golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= +golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/reader.go b/reader.go index f339569..09ee5d0 100644 --- a/reader.go +++ b/reader.go @@ -336,7 +336,7 @@ func (k *Kafka) consume( err = NewXk6KafkaError(noMoreMessages, "No more messages.", nil) logger.WithField("error", err).Info(err) - return messages + common.Throw(k.vu.Runtime(), err) } if err != nil { @@ -344,7 +344,7 @@ func (k *Kafka) consume( err = NewXk6KafkaError(failedReadMessage, "Unable to read messages.", nil) logger.WithField("error", err).Error(err) - return messages + common.Throw(k.vu.Runtime(), err) } var messageTime string diff --git a/reader_test.go b/reader_test.go index eac05e6..655d8ea 100644 --- a/reader_test.go +++ b/reader_test.go @@ -33,7 +33,7 @@ func TestConsumerMaxWaitExceeded(t *testing.T) { require.NoError(t, test.moveToVUCode()) // Consume a message in the VU function. - assert.NotPanics(t, func() { + assert.Panics(t, func() { messages := test.module.Kafka.consume(reader, &ConsumeConfig{Limit: 1}) assert.Empty(t, messages) }) @@ -231,7 +231,7 @@ func TestConsumerContextCancelled(t *testing.T) { test.cancelContext() // Consume a message in the VU function. - assert.NotPanics(t, func() { + assert.Panics(t, func() { messages := test.module.Kafka.consume(reader, &ConsumeConfig{Limit: 1}) assert.Empty(t, messages) }) diff --git a/scripts/test_json.js b/scripts/test_json.js index b038bf3..01ec9c1 100644 --- a/scripts/test_json.js +++ b/scripts/test_json.js @@ -114,36 +114,40 @@ export default function () { writer.produce({ messages: messages }); } - // Read 10 messages only - let messages = reader.consume({ limit: 10 }); + try { + // Read 10 messages only + let messages = reader.consume({ limit: 10 }); - check(messages, { - "10 messages are received": (messages) => messages.length == 10, - }); + check(messages, { + "10 messages are received": (messages) => messages.length == 10, + }); - check(messages[0], { - "Topic equals to xk6_kafka_json_topic": (msg) => msg["topic"] == topic, - "Key contains key/value and is JSON": (msg) => - schemaRegistry - .deserialize({ data: msg.key, schemaType: SCHEMA_TYPE_JSON }) - .correlationId.startsWith("test-id-"), - "Value contains key/value and is JSON": (msg) => - typeof schemaRegistry.deserialize({ - data: msg.value, - schemaType: SCHEMA_TYPE_JSON, - }) == "object" && - schemaRegistry.deserialize({ - data: msg.value, - schemaType: SCHEMA_TYPE_JSON, - }).name == "xk6-kafka", - "Header equals {'mykey': 'myvalue'}": (msg) => - "mykey" in msg.headers && - String.fromCharCode(...msg.headers["mykey"]) == "myvalue", - "Time is past": (msg) => new Date(msg["time"]) < new Date(), - "Partition is zero": (msg) => msg["partition"] == 0, - "Offset is gte zero": (msg) => msg["offset"] >= 0, - "High watermark is gte zero": (msg) => msg["highWaterMark"] >= 0, - }); + check(messages[0], { + "Topic equals to xk6_kafka_json_topic": (msg) => msg["topic"] == topic, + "Key contains key/value and is JSON": (msg) => + schemaRegistry + .deserialize({ data: msg.key, schemaType: SCHEMA_TYPE_JSON }) + .correlationId.startsWith("test-id-"), + "Value contains key/value and is JSON": (msg) => + typeof schemaRegistry.deserialize({ + data: msg.value, + schemaType: SCHEMA_TYPE_JSON, + }) == "object" && + schemaRegistry.deserialize({ + data: msg.value, + schemaType: SCHEMA_TYPE_JSON, + }).name == "xk6-kafka", + "Header equals {'mykey': 'myvalue'}": (msg) => + "mykey" in msg.headers && + String.fromCharCode(...msg.headers["mykey"]) == "myvalue", + "Time is past": (msg) => new Date(msg["time"]) < new Date(), + "Partition is zero": (msg) => msg["partition"] == 0, + "Offset is gte zero": (msg) => msg["offset"] >= 0, + "High watermark is gte zero": (msg) => msg["highWaterMark"] >= 0, + }); + } catch (error) { + console.error(error); + } } export function teardown(data) {