Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error writing messages: broker appears to be expecting TLS #265

Open
dnaby opened this issue Dec 26, 2023 · 4 comments
Open

Error writing messages: broker appears to be expecting TLS #265

dnaby opened this issue Dec 26, 2023 · 4 comments

Comments

@dnaby
Copy link

dnaby commented Dec 26, 2023

import { check } from "k6";
import { Writer, Reader, SchemaRegistry, SCHEMA_TYPE_JSON, TLS_1_3 } from "k6/x/kafka";

const tlsConfig = {
  enableTls: true,
  insecureSkipTlsVerify: false,
  minVersion: TLS_1_3,

  clientCertPem: "/Users/mac/go/bin/script_kafka/super-user/user.crt",
  clientKeyPem: "/Users/mac/go/bin/script_kafka/super-user/user.key",
  serverCaPem: "/Users/mac/go/bin/script_kafka/super-user/ca.crt"
};

const writer = new Writer({
  brokers: ["x.x.x.x:9094", "x.x.x.x:9094", "x.x.x.x:9094"],
  topic: "xk6-kafka-test-topic",
  tlsConfig: tlsConfig,
});

const reader = new Reader({
  brokers: ["x.x.x.x:9094", "x.x.x.x:9094", "x.x.x.x:9094"],
  topic: "xk6-kafka-test-topic",
  tlsConfig: tlsConfig,
});

const schemaRegistry = new SchemaRegistry();

export const options = {
  thresholds: {
    kafka_writer_error_count: ["count == 0"],
    kafka_reader_error_count: ["count == 0"],
  },
};

export default function () {
  for (let index = 0; index < 100; index++) {
    let messages = [
      {
        // The data type of the key is JSON
        key: schemaRegistry.serialize({
          data: {
            correlationId: "test-id-abc-" + index,
          },
          schemaType: SCHEMA_TYPE_JSON,
        }),
        // The data type of the value is JSON
        value: schemaRegistry.serialize({
          data: {
            name: "xk6-kafka",
            version: "0.9.0",
            author: "Mouhamadou Naby DIA",
            description:
              "k6 extension to load test Apache Kafka",
            index: index,
          },
          schemaType: SCHEMA_TYPE_JSON,
        }),
        headers: {
          key: "value",
        },
        offset: index,
        partition: 0,
        time: new Date(), // Will be converted to timestamp automatically
      },
      {
        key: schemaRegistry.serialize({
          data: {
            correlationId: "test-id-def-" + index,
          },
          schemaType: SCHEMA_TYPE_JSON,
        }),
        value: schemaRegistry.serialize({
          data: {
            name: "xk6-kafka",
            version: "0.9.0",
            author: "Mouhamadou Naby DIA",
            description:
              "k6 extension to load test Apache Kafka",
            index: index,
          },
          schemaType: SCHEMA_TYPE_JSON,
        }),
        headers: {
          key: "value",
        },
      },
    ];

    writer.produce({ messages: messages });
  }

  // Read 10 messages only
  let messages = reader.consume({ limit: 10 });

  check(messages, {
    "10 messages are received": (messages) => messages.length == 10,
  });

  check(messages[0], {
    "Topic equals to xk6_kafka_json_topic": (msg) => msg["topic"] == "xk6-kafka-test-topic",
    "Key contains key/value and is JSON": (msg) =>
      schemaRegistry
        .deserialize({ data: msg.key, schemaType: SCHEMA_TYPE_JSON })
        .correlationId.startsWith("test-id-"),
    "Value contains key/value and is JSON": (msg) =>
      typeof schemaRegistry.deserialize({
        data: msg.value,
        schemaType: SCHEMA_TYPE_JSON,
      }) == "object" &&
      schemaRegistry.deserialize({
        data: msg.value,
        schemaType: SCHEMA_TYPE_JSON,
      }).name == "xk6-kafka",
    "Header equals {'key': 'value'}": (msg) =>
      "key" in msg.headers &&
      String.fromCharCode(...msg.headers["key"]) == "value",
    "Time is past": (msg) => new Date(msg["time"]) < new Date(),
    "Partition is zero": (msg) => msg["partition"] == 0,
    "Offset is gte zero": (msg) => msg["offset"] >= 0,
    "High watermark is gte zero": (msg) => msg["highWaterMark"] >= 0,
  });
}

export function teardown(data) {
  writer.close();
  reader.close();
}

When trying to run the below xk6-kafka script, I got the following error:

❯ ./k6 run script_kafka/topic.js                                                                                                                                                           ─╯

          /\      |‾‾| /‾‾/   /‾‾/   
     /\  /  \     |  |/  /   /  /    
    /  \/    \    |     (   /   ‾‾\  
   /          \   |  |\  \ |  (‾)  | 
  / __________ \  |__| \__\ \_____/ .io

  execution: local
     script: script_kafka/topic.js
     output: -

  scenarios: (100.00%) 1 scenario, 1 max VUs, 10m30s max duration (incl. graceful stop):
           * default: 1 iterations for each of 1 VUs (maxDuration: 10m0s, gracefulStop: 30s)

ERRO[0003] Error writing messages., OriginalError: unexpected EOF: broker appears to be expecting TLS  error="Error writing messages., OriginalError: unexpected EOF: broker appears to be expecting TLS"m03.0s), 1/1 VUs, 0 complete and 0 interrupted iterations
ERRO[0003] GoError: Error writing messages., OriginalError: unexpected EOF: broker appears to be expecting TLS
        at github.com/mostafa/xk6-kafka.(*Kafka).writerClass.func1 (native)
        at file:///Users/mac/go/bin/script_kafka/topic.js:89:19(104)  executor=per-vu-iterations scenario=default source=stacktrace

     █ teardown

     data_received......................: 0 B   0 B/s
     data_sent..........................: 0 B   0 B/s
     iteration_duration.................: avg=1.5s min=3.58µs med=1.5s max=3.01s p(90)=2.71s p(95)=2.86s
     iterations.........................: 1     0.331717/s
   ✓ kafka_reader_error_count...........: 0     0/s
     kafka_writer_acks_required.........: 0     min=0      max=0 
     kafka_writer_async.................: 0.00% ✓ 0        ✗ 1   
     kafka_writer_attempts_max..........: 0     min=0      max=0 
     kafka_writer_batch_bytes...........: 0 B   0 B/s
     kafka_writer_batch_max.............: 1     min=1      max=1 
     kafka_writer_batch_queue_seconds...: avg=0s   min=0s     med=0s   max=0s    p(90)=0s    p(95)=0s   
     kafka_writer_batch_seconds.........: avg=0s   min=0s     med=0s   max=0s    p(90)=0s    p(95)=0s   
     kafka_writer_batch_size............: 0     0/s
     kafka_writer_batch_timeout.........: 0s    min=0s     max=0s
   ✓ kafka_writer_error_count...........: 0     0/s
     kafka_writer_message_bytes.........: 0 B   0 B/s
     kafka_writer_message_count.........: 0     0/s
     kafka_writer_read_timeout..........: 0s    min=0s     max=0s
     kafka_writer_retries_count.........: 0     0/s
     kafka_writer_wait_seconds..........: avg=0s   min=0s     med=0s   max=0s    p(90)=0s    p(95)=0s   
     kafka_writer_write_count...........: 0     0/s
     kafka_writer_write_seconds.........: avg=0s   min=0s     med=0s   max=0s    p(90)=0s    p(95)=0s   
     kafka_writer_write_timeout.........: 0s    min=0s     max=0s
     vus................................: 1     min=1      max=1 
     vus_max............................: 1     min=1      max=1 


running (00m03.0s), 0/1 VUs, 1 complete and 0 interrupted iterations
default ✓ [======================================] 1 VUs  00m03.0s/10m0s  1/1 iters, 1 per VU

I setup my Kafka cluster on a kubernetes cluster using strimzi operator and I have enabled mTLS(mutual TLS).

@mostafa
Copy link
Owner

mostafa commented Dec 27, 2023

Hey @dnaby,

Are your certificates self-signed? Have you tried setting insecureSkipTlsVerify to true to see if it works? Or even downgrading the TLS version?

Update:
I released a new version (v0.22.0) just now. Try that as well to see if it is fixed.

@dnaby
Copy link
Author

dnaby commented Dec 27, 2023

Hey @mostafa,

I am using certificates coming from strimzi. They offer an automated certificate management system.

I have tried all these but it's not working. Also the cluster on which I installed kafka is on premise. And lately there has been added a security that block remote ssh connection to the nodes(if the access has not been granted). So I am waiting to get the authorization in order to see if the problem is not coming from there.

@mostafa
Copy link
Owner

mostafa commented Dec 27, 2023

@dnaby Can you connect the logger to both instances of Writer (in WriterConfig.connectLogger) and Reader (in ReaderConfig.connectLogger) to see what's happening behind the scenes?

@dnaby
Copy link
Author

dnaby commented Dec 27, 2023

❯ ./k6 run script_kafka/kafka.js                                                                                                                                                           ─╯

          /\      |‾‾| /‾‾/   /‾‾/   
     /\  /  \     |  |/  /   /  /    
    /  \/    \    |     (   /   ‾‾\  
   /          \   |  |\  \ |  (‾)  | 
  / __________ \  |__| \__\ \_____/ .io

  execution: local
     script: script_kafka/kafka.js
     output: -

  scenarios: (100.00%) 1 scenario, 1 max VUs, 10m30s max duration (incl. graceful stop):
           * default: 1 iterations for each of 1 VUs (maxDuration: 10m0s, gracefulStop: 30s)

INFO[0000] {"This":{"addr":[{},{},{}],"topic":"xk6_kafka_json_topic","balancer":{},"max_attempts":0,"write_backoff_min":0,"write_backoff_max":0,"batch_size":1,"batch_bytes":0,"batch_timeout":0,"read_timeout":0,"write_timeout":0,"required_acks":0,"async":false,"compression":0,"logger":{"out":{},"hooks":{},"formatter":{"force_colors":false,"disable_colors":false,"force_quote":false,"disable_quote":false,"environment_override_colors":false,"disable_timestamp":false,"full_timestamp":false,"timestamp_format":"","disable_sorting":false,"disable_level_truncation":false,"pad_level_text":false,"quote_empty_fields":false,"field_map":{}},"report_caller":false,"level":4,"buffer_pool":null},"error_logger":null,"transport":{"dial_timeout":0,"idle_timeout":0,"metadata_ttl":0,"metadata_topics":[],"client_id":"","tls":null,"s_a_s_l":null,"resolver":null,"context":null},"allow_auto_topic_creation":false}}  source=console
INFO[0000] {"This":{}}                                   source=console
ERRO[0003] Error writing messages., OriginalError: unexpected EOF: broker appears to be expecting TLS  error="Error writing messages., OriginalError: unexpected EOF: broker appears to be expecting TLS"m03.0s), 1/1 VUs, 0 complete and 0 interrupted iterations
ERRO[0003] GoError: Error writing messages., OriginalError: unexpected EOF: broker appears to be expecting TLS
        at github.com/mostafa/xk6-kafka.(*Kafka).writerClass.func1 (native)
        at file:///Users/mac/go/bin/script_kafka/kafka.js:92:19(114)  executor=per-vu-iterations scenario=default source=stacktrace

     █ teardown

     data_received......................: 0 B         0 B/s
     data_sent..........................: 0 B         0 B/s
     iteration_duration.................: avg=1.5s min=15.2µs med=1.5s max=3.01s p(90)=2.71s p(95)=2.86s
     iterations.........................: 1           0.33155/s
   ✓ kafka_reader_error_count...........: 0           0/s
     kafka_writer_acks_required.........: 0           min=0           max=0          
     kafka_writer_async.................: 0.00%       ✓ 0             ✗ 1            
     kafka_writer_attempts_max..........: 10          min=10          max=10         
     kafka_writer_batch_bytes...........: 0 B         0 B/s
     kafka_writer_batch_max.............: 1           min=1           max=1          
     kafka_writer_batch_queue_seconds...: avg=0s   min=0s     med=0s   max=0s    p(90)=0s    p(95)=0s   
     kafka_writer_batch_seconds.........: avg=0s   min=0s     med=0s   max=0s    p(90)=0s    p(95)=0s   
     kafka_writer_batch_size............: 0           0/s
     kafka_writer_batch_timeout.........: 277h46m40s  min=277h46m40s  max=277h46m40s 
   ✓ kafka_writer_error_count...........: 0           0/s
     kafka_writer_message_bytes.........: 0 B         0 B/s
     kafka_writer_message_count.........: 0           0/s
     kafka_writer_read_timeout..........: 2777h46m40s min=2777h46m40s max=2777h46m40s
     kafka_writer_retries_count.........: 0           0/s
     kafka_writer_wait_seconds..........: avg=0s   min=0s     med=0s   max=0s    p(90)=0s    p(95)=0s   
     kafka_writer_write_count...........: 0           0/s
     kafka_writer_write_seconds.........: avg=0s   min=0s     med=0s   max=0s    p(90)=0s    p(95)=0s   
     kafka_writer_write_timeout.........: 2777h46m40s min=2777h46m40s max=2777h46m40s
     vus................................: 1           min=1           max=1          
     vus_max............................: 1           min=1           max=1          


running (00m03.0s), 0/1 VUs, 1 complete and 0 interrupted iterations
default ✓ [======================================] 1 VUs  00m03.0s/10m0s  1/1 iters, 1 per VU

Here is the output when I set the connectLogger to true and I log the writer and reader.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants