Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Base64 decoding the received records #2

Open
sashman opened this issue Mar 5, 2021 · 2 comments
Open

Base64 decoding the received records #2

sashman opened this issue Mar 5, 2021 · 2 comments

Comments

@sashman
Copy link
Contributor

sashman commented Mar 5, 2021

Hey, I've noticed that the incoming messages are base64 encoded, which is still usable 👍 , however, the kinesis adapter defines a parser https://github.com/ex-aws/ex_aws_kinesis/blob/v2.0.1/lib/ex_aws/kinesis.ex#L68 which I assumed should be invoked somewhere?

I'm not 100% familiar with ex_aws conventions, and I could not find where ex_aws uses the parser, but given that it is available at the stage here https://github.com/uberbrodt/kcl_ex/blob/master/lib/kinesis_client/kinesis.ex#L66, in the ExAws.Operation.JSON struct, should the messages be decoded there too?

@uberbrodt
Copy link
Owner

Are you sure it's not decoding? They should be decoded when ExAWS makes the request?

@sashman
Copy link
Contributor Author

sashman commented Mar 10, 2021

Screen.Recording.2021-03-10.at.16.29.14.mov

My consumer hanlder looks like this:

  def handle_message(_processor_name, message, _context) do
    message
    |> Message.update_data(&process_data/1)
  end

  defp process_data(raw_data) do
    IO.inspect(raw_data |> inspect(), label: "Raw:")

    IO.inspect(raw_data |> Base.decode64!() |> inspect(), label: "base64 decoded:")
  end

and my config

    opts = [
      stream_name: stream_name,
      app_name: app_name,
      shard_consumer: KinesisConsumer.Consumer,
      processors: [
        default: [
          concurrency: 1,
          min_demand: 100,
          max_demand: 1000
        ]
      ],
      batchers: [
        default: [
          batch_timeout: 5_000
        ]
      ]
    ]

    children = [
      {KinesisClient.Stream, opts}
    ]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants