Redix.Stream
is an extension to redix supporting Redis streams. This project allows you to stream and consume data from redis streams.
Redis streams are similar to Kafka, nats.io and other "distributed commit log" software. The core idea is that the stream is an append-only log and any number of consumers can read from that stream, each keeping track of its position in that log. This allows for high-troughput processing of messages in the log. Streams can be used for analytics, queues, etc. based on how they are consumed.
** Note: redis streams are currently in the 5.0 release candidate. See Installation
below for details. **
If available in Hex, the package can be installed
by adding redix_stream
to your list of dependencies in mix.exs
:
def deps do
[
{:redix_stream, "~> 0.1.3"}
]
end
As of writing, redis streams are currently available in the 5.0 release candidates. You can install from the official downloads page (or directly from the unstable.tar.gz), use the 5.0-rc docker image or install from source.
If you are using Homebrew on macOS, you can simply run run install redis --head
.
First, you will need to start redix
, e.g.
{:ok, redix} = Redix.start_link("redis://localhost:6379")
Redix can also be started in the supervision tree as a named process.
Next, you should start a consumer to a stream specifying a callback function to run for each message:
Redix.Stream.Consumer.start_link(redix, "my_topic", fn stream, msg -> Logger.info("Got message #{inspect msg} from stream #{stream}") end)
The callback function can be in {module, function, args}
format as well:
Redix.Stream.Consumer.start_link(redix, "my_topic", {MyModule, :my_func, []})
Consumers can also be started as part of the Supervision tree:
def MyApp.Application do
use Application
def start(_type, _args) do
# List all child processes to be supervised
children = [
worker(Redix, [[], [name: :redix]]),
Redix.Stream.consumer(:redix, "my_topic", {MyModule, :my_func, []})
]
# See https://hexdocs.pm/elixir/Supervisor.html
# for other strategies and supported options
opts = [strategy: :one_for_one, name: Blocks.Supervisor]
Supervisor.start_link(children, opts)
end
From there, you will be able to effectively stream messages.
Redis Streams have the concept of consumer groups. Consumer groups allow multiple consumers to work on the same stream, guaranteeing that messages are only processed by one consumer.
Starting a Consumer as part of a group is similar to starting a normal stream. You need to provide the additional group_name
and consumer_name
options:
def MyApp.Application do
use Application
def start(_type, _args) do
# List all child processes to be supervised
children = [
worker(Redix, [[], [name: :redix]]),
Redix.Stream.consumer(:redix, "my_topic", {MyModule, :my_func, [group_name: "my_group", consumer_name: "consumer1"]})
]
# See https://hexdocs.pm/elixir/Supervisor.html
# for other strategies and supported options
opts = [strategy: :one_for_one, name: Blocks.Supervisor]
Supervisor.start_link(children, opts)
end
To contribute, please feel free to open an issue or pull request. Here are a few topics which we know need to be addressed:
- Callbacks are run in the stream consumer process. If the callback fails, it will crash the consumer process. The callbacks also block all processing until each finishes.