A Clojure library that enables applications to consume messages from Amazon's Simple Queue Service (SQS) with minimal ceremony while achieving superior resilience and performance properties. Leverages core.async to construct an efficient processing machine behind the scenes and provides a simple, high-level API.
Uses Cognitect's AWS client for a minimal library footprint and true non-blocking I/O via an asynchronous Jetty client.
Uses core.async for the internal machinery, but as a consumer you are free to perform blocking I/O if you need to. Blocking consumers are the default, but you can opt into non-blocking consumers.
If your consumer is still working on a message as it nears its visibility timeout, piped will extend the visibility timeout for you instead of risking that another worker will start processing the same message.
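The decision of when to extend can be sketched as a small predicate. This is only an illustration: `needs-extension?`, `:deadline-ms`, and the buffer value are hypothetical names and numbers, not part of Piped's API. In SQS itself, the extension is performed with the ChangeMessageVisibility action.

```clojure
;; Hypothetical helper for illustration; not part of piped's API.
(defn needs-extension?
  "True when the message's visibility deadline is within buffer-ms of now-ms."
  [{:keys [deadline-ms]} now-ms buffer-ms]
  (<= (- deadline-ms now-ms) buffer-ms))

;; Deadline is 20s away with a 5s buffer: no extension needed yet.
(needs-extension? {:deadline-ms 30000} 10000 5000) ; => false

;; Deadline is 4s away with a 5s buffer: time to extend.
(needs-extension? {:deadline-ms 30000} 26000 5000) ; => true
```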
Piped implements backpressure between the processing of a message and the receiving of messages from SQS so that you're never grabbing items off the queue that you aren't ready to process in a timely manner.
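The general mechanism behind this kind of backpressure can be shown with a bounded core.async channel. This is a minimal sketch, not Piped's internals: the channel name `pipe` and the buffer size of 2 are chosen only for the demonstration.

```clojure
(require '[clojure.core.async :as a])

;; A bounded channel with room for two messages stands in for the pipe.
(def pipe (a/chan 2))

;; offer! never blocks: it returns true when the value is accepted
;; and nil when the buffer is full.
(def accepted
  (mapv #(boolean (a/offer! pipe %)) [:m1 :m2 :m3]))
;; accepted => [true true false]

;; Once a consumer takes a message off the channel there is room again.
(def taken (a/poll! pipe))                       ; => :m1
(def retried (boolean (a/offer! pipe :m3)))      ; => true
```

A producer that sees its puts rejected (or delayed) knows the consumers are saturated and can hold off on requesting more work.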
Messages are read and acked in batches which decreases your costs and improves throughput. Ack batches are accumulated until the max batch size is met or one of the messages in the batch is near expiry and therefore needs to be acked promptly rather than waiting for the batch to finish filling up.
During shutdown, new messages stop being requested, in-flight messages are given a chance to finish, and final acks and nacks are sent. You must allow time after the SIGTERM for this process to complete before issuing a SIGKILL. You can see an actual shutdown sequence here.
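One way to tie this to process signals is a JVM shutdown hook, which the JVM runs when it receives SIGTERM. The sketch below is an assumption about how you might wire it up in your own application; `register-shutdown-hook!` is a hypothetical helper, and `stop-fn` is whatever stops your processor.

```clojure
;; Hypothetical helper for illustration; not part of piped's API.
(defn register-shutdown-hook!
  "Runs stop-fn when the JVM begins shutting down (for example on SIGTERM).
  Returns the hook thread so it can be removed later if needed."
  [stop-fn]
  (let [hook (Thread. ^Runnable stop-fn)]
    (.addShutdownHook (Runtime/getRuntime) hook)
    hook))

;; Usage, assuming `processor` was created with piped/processor:
;; (register-shutdown-hook! #(piped/stop processor))
```

Because the stop call blocks until in-flight messages are done, the hook keeps the JVM alive while the final acks and nacks go out, provided the orchestrator does not send SIGKILL first.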
Most users should only need to provide two things: the queue-url and the consumer-parallelism for message processing. Piped automatically chooses the number of SQS polling processes needed to saturate your consumers.
(require '[piped.core :as piped])
(def opts
  {; *required*
   ; the full url to the sqs queue
   :queue-url "http://localhost:4576/queue/piped-test-queue17184"

   ; *required*
   ; the function that gets called to process each message
   ; it may throw or return :nack in order to redrive the message
   ; (or DLQ depending on redrive policy). otherwise you may return
   ; :ack (or anything else) to result in an ack of the message
   ; you may also return a channel that eventually delivers either
   ; :ack or :nack (intended for non-blocking mode)
   :consumer-fn (fn [msg] (println msg))

   ; *optional* - defaults to 10
   ; maximum number of messages that should be processed concurrently
   ; you should tune this according to your available resources and
   ; desired throughput.
   :consumer-parallelism 10

   ; *optional* - defaults to identity
   ; a pure function to preprocess the message before it arrives at
   ; your consumer-fn. intended for parsing the message body into data
   :transform-fn identity

   ; *optional* - defaults to {}
   ; configuration passed to the aws-api client in case you need to customize
   ; things like the credentials provider
   :client-opts {}

   ; *optional* - defaults to true
   ; whether to create dedicated threads for processing each message
   ; or if the processing should share the core.async dispatch thread
   ; pool (only set this to false if you're doing non-blocking)
   :blocking-consumers true})
; registers a processor with the above config
(def processor (piped/processor opts))
; start polling and processing messages
(piped/start processor)
; stop the system. blocks until in-flight messages
; are done and nacks any messages that have been
; received but haven't started to be processed
(piped/stop processor)
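For the non-blocking mode described in the option comments, a consumer function can return a channel that eventually delivers :ack or :nack. The following is a sketch under assumptions: `lookup-async` is a hypothetical stand-in for a real non-blocking call, and only the option keys come from the configuration shown here.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical non-blocking operation that returns a channel.
(defn lookup-async [msg]
  (a/go {:ok? (some? (:Body msg))}))

(def non-blocking-opts
  {:queue-url          "http://localhost:4576/queue/piped-test-queue17184"
   ;; share the core.async dispatch pool instead of dedicated threads
   :blocking-consumers false
   ;; returns a channel that eventually delivers :ack or :nack
   :consumer-fn        (fn [msg]
                         (a/go
                           (if (:ok? (a/<! (lookup-async msg)))
                             :ack
                             :nack)))})
```

The important property is that nothing inside the go block parks a dispatch thread on blocking I/O; every wait goes through a channel operation.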
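(require '[clojure.edn :as edn])
; assumption: log/error and log/warn below come from clojure.tools.logging
(require '[clojure.tools.logging :as log])
(require '[piped.sweet :refer [defmultiprocessor]])
(require '[piped.core :as piped])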
; defines a clojure multimethod and a piped processor system attached to the var
; the map of options is the same as seen in the earlier example but you may omit
; the consumer-fn (the multimethod satisfies that).
(defmultiprocessor my-processor [{:keys [Body]}]
  {:queue-url            "http://localhost:4576/queue/piped-test-queue17184"
   :consumer-parallelism 50
   :transform-fn         #(update % :Body edn/read-string)}
  (get Body :kind))
; define normal clojure defmethods to process message variants
; there is already a :default clause defined to log and nack any
; message for which there is no matching defmethod
(defmethod my-processor :alert [message]
  (log/error (get-in message [:Body :message])))
(defmethod my-processor :warn [message]
  (log/warn (get-in message [:Body :message])))
; start the processor by invoking protocol function on the var
(piped/start #'my-processor)
; stop the processor by invoking protocol function on the var
(piped/stop #'my-processor)
Casual testing easily reaches between 4000 (local) and 5500 (EC2) messages per second of throughput (received, printed, and acked) when using a consumer parallelism above 500. Real-world throughput will depend heavily on what you do with each message, but it's unlikely Piped will become your bottleneck.
The results below were measured processing 100,000 messages on a mid-range 2019 Macbook Pro over 70 down / 15 up wi-fi and traversing the public internet from Chicago to Virginia (us-east-1).
The pipe is a core.async channel that connects producers and consumers.
Producers long poll SQS for messages and put them onto the pipe. If the pipe doesn't accept their messages in a timely manner, they start asking SQS for fewer messages at a time to match the consumption rate.
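The idea of shrinking the request size when the pipe is slow to accept messages can be sketched as a pure function. The policy below (halve on a slow hand-off, grow by one otherwise) is a hypothetical illustration and not Piped's actual algorithm; the cap of 10 reflects the SQS limit on messages per ReceiveMessage call.

```clojure
;; Hypothetical policy for illustration; not piped's actual algorithm.
(def max-batch 10) ; SQS returns at most 10 messages per ReceiveMessage call

(defn next-batch-size
  "Halve the request size after a slow hand-off, otherwise grow by one."
  [current slow-handoff?]
  (if slow-handoff?
    (max 1 (quot current 2))
    (min max-batch (inc current))))

;; Two slow hand-offs followed by two fast ones:
(reductions next-batch-size 10 [true true false false])
;; => (10 5 2 3 4)
```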
Consumers read SQS messages from the pipe and hand them to your message processing function. Consumers supervise the message processing in order to extend the visibility timeout and eventually route messages to be acked or nacked.
Non-blocking consumers run your processing function on one of the go-block dispatch threads and should only be used for cpu-bound tasks. Blocking consumers run your processing function on a dedicated thread and may be used for blocking io. Blocking consumers are the default mode.
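The difference between the two modes maps onto two core.async primitives: `thread` runs its body on a separate thread, while `go` runs on the shared dispatch pool. The sketch below only illustrates that distinction; `blocking-work` and `cpu-work` are hypothetical processing functions.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical processing functions for illustration.
(defn blocking-work [msg]
  (Thread/sleep 50) ; stands in for blocking io
  :ack)

(defn cpu-work [msg]
  (if (even? (count (:Body msg))) :ack :nack))

;; Blocking work runs on its own thread; a/thread returns a channel
;; that delivers the body's result.
(def blocking-result (a/thread (blocking-work {:Body "abc"})))

;; CPU-bound work can safely share the go-block dispatch threads.
(def cpu-result (a/go (cpu-work {:Body "abcd"})))
```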
Ackers collect messages into batches and ack them. They'll ack a batch as soon as the batch is full or one of the messages in the pending batch is about to exceed its visibility timeout.
Nackers collect messages into batches and nack them. They'll nack a batch as soon as the batch is full or the batch has been accumulating for longer than 5 seconds.
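The two flush rules can be written as predicates over a pending batch. This is a sketch with assumed names: `flush-acks?`, `flush-nacks?`, `:deadline-ms`, and the expiry buffer are hypothetical, while the 5 second limit comes from the description above and the batch size of 10 is the SQS limit for batch requests.

```clojure
;; Hypothetical predicates for illustration; not piped's internals.
(def max-batch-size 10) ; SQS batch requests accept at most 10 entries

(defn flush-acks?
  "Flush when the batch is full or any message is within
  expiry-buffer-ms of its visibility deadline."
  [batch now-ms expiry-buffer-ms]
  (boolean
    (or (>= (count batch) max-batch-size)
        (some #(<= (- (:deadline-ms %) now-ms) expiry-buffer-ms) batch))))

(defn flush-nacks?
  "Flush when the batch is full or a non-empty batch has been
  accumulating for longer than 5 seconds."
  [batch batch-started-ms now-ms]
  (boolean
    (or (>= (count batch) max-batch-size)
        (and (seq batch)
             (> (- now-ms batch-started-ms) 5000)))))

(flush-acks? [{:deadline-ms 30000}] 10000 2000)                      ; => false
(flush-acks? [{:deadline-ms 30000} {:deadline-ms 11000}] 10000 2000) ; => true
(flush-nacks? [{}] 0 6000)                                           ; => true
```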
The message processing function is the code that you write. It receives a message and can do whatever it wants with it. If :ack or :nack is returned, the message will be acked or nacked accordingly. If an exception is thrown, the message will be nacked. Any other return value results in an ack. If you have multiple kinds of messages in your queue, a multimethod is a good choice.
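These return-value rules can be summarized in a few lines. The wrapper below is a hypothetical illustration of the rules as stated, not Piped's implementation, and it deliberately ignores the case where the function returns a channel.

```clojure
;; Hypothetical wrapper for illustration; not piped's implementation.
(defn outcome
  "Runs consumer-fn on msg and maps the result to :ack or :nack."
  [consumer-fn msg]
  (try
    (if (= :nack (consumer-fn msg)) :nack :ack)
    (catch Exception _ :nack)))

(outcome (fn [_] :ack) {})                         ; => :ack
(outcome (fn [_] nil) {})                          ; => :ack
(outcome (fn [_] :nack) {})                        ; => :nack
(outcome (fn [_] (throw (ex-info "boom" {}))) {})  ; => :nack
```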
A processor is a set of the above abstractions that can be started and stopped.
I made this library because of perceived deficiencies in Squeedo. Squeedo performs blocking io from go blocks when receiving and acking messages which can lead to poor performance. It is my opinion that Squeedo also doesn't provide enough leverage over the raw AWS SDK. YMMV.
- Performing a blocking ReceiveMessage call on a go thread
- Performing blocking acks/nacks on a go thread
If you're unaware of the dangers of mixing blocking-io and go blocks, please read this excellent post.
- Konrad Tallman for teaching me about SQS these last months and helping kick the tires.
This project is licensed under the MIT license.