diff --git a/.gitignore b/.gitignore index 17edac4..536e6af 100644 --- a/.gitignore +++ b/.gitignore @@ -6,9 +6,11 @@ pom.xml pom.xml.asc *.jar *.class +/.clj-kondo /.lein-* +/.lsp /.nrepl-port .hgignore .hg/ .idea/ -*.iml \ No newline at end of file +*.iml diff --git a/src/piped/actions.clj b/src/piped/actions.clj index d1d9913..ec01345 100644 --- a/src/piped/actions.clj +++ b/src/piped/actions.clj @@ -23,4 +23,3 @@ (when (utils/anomaly? response) (log/error "Error when trying to nack batch of messages." (pr-str response)))) (recur)))) - diff --git a/src/piped/consumers.clj b/src/piped/consumers.clj index 35cff3d..79a0fc4 100644 --- a/src/piped/consumers.clj +++ b/src/piped/consumers.clj @@ -1,46 +1,62 @@ (ns piped.consumers "Code relating to reading SQS messages from channels and processing them." - (:require [clojure.core.async :as async] - [clojure.tools.logging :as log] - [piped.utils :as utils] - [piped.sqs :as sqs])) - + (:require + [clojure.core.async :as async] + [clojure.spec.alpha :as s] + [clojure.tools.logging :as log] + [piped.sqs :as sqs] + [piped.utils :as utils])) (defn- make-consumer [client input-chan ack-chan nack-chan message-fn] (async/go-loop [msg nil task nil] - (if (and (nil? msg) (nil? task)) (if-some [msg (async/deadline msg)] :extend + :priority true)] + (condp s/valid? result + nil? + (recur nil nil) - (case (async/alt! - [task] ([action] action) - [(utils/message->deadline msg)] :extend - :priority true) - - nil - (recur nil nil) - - :ack - (do (async/>! ack-chan msg) (recur nil nil)) + :piped/action + (case result + :ack + (do + (async/>! ack-chan msg) + (recur nil nil)) + :nack + (do + (async/>! nack-chan msg) + (recur nil nil))) - :nack - (do (async/>! nack-chan msg) (recur nil nil)) - - :extend - (recur - (let [message-id (utils/message->identifier msg) - queue-url (utils/message->queue-url msg) - old-timeout (utils/message->timeout msg) - new-timeout (* 2 old-timeout) - response (async/ msg (utils/with-deadline (* new-timeout 1000)) (utils/with-timeout new-timeout))) - task))))) + :piped/action-map + (case (:action result) + :ack + (do + (async/>! ack-chan msg) + (recur nil nil)) + :nack + (do + (async/>! nack-chan (->> result + :delay-seconds + (utils/with-timeout msg))) + (recur nil nil))) + :piped/extend + (recur + (let [message-id (utils/message->identifier msg) + queue-url (utils/message->queue-url msg) + old-timeout (utils/message->timeout msg) + new-timeout (* 2 old-timeout) + response (async/ msg (utils/with-deadline (* new-timeout 1000)) (utils/with-timeout new-timeout))) + task)))))) (defn spawn-consumer-async "Spawns a consumer fit for cpu bound or asynchronous tasks. Uses the core.async dispatch thread pool. @@ -56,17 +72,24 @@ " [client input-chan ack-chan nack-chan consumer-fn] (make-consumer client input-chan ack-chan nack-chan - (fn [msg] - (async/go - (try - (loop [result (consumer-fn msg)] - (if (utils/channel? result) - (recur (async/ (or client-opts {}) (not (contains? client-opts :http-client)) (assoc :http-client (force http-client)) @@ -107,7 +107,7 @@ nacker-chan (async/chan) pipe (async/chan) acker-batched (utils/deadline-batching acker-chan 10) - nacker-batched (utils/interval-batching nacker-chan 5000 10) + nacker-batched (utils/combo-batching nacker-chan 5000 10) composed-consumer (if transform-fn (comp consumer-fn transform-fn) consumer-fn)] (letfn [(spawn-producer [] @@ -135,41 +135,39 @@ :consumers (doall (repeatedly consumer-parallelism spawn-consumer)) :ackers (doall (repeatedly acker-parallelism spawn-acker)) :nackers (doall (repeatedly nacker-parallelism spawn-nacker))})))] - - (let [state (atom (delay (launch))) shutdown-thread (Thread. - ^Runnable - (fn [] - (when (realized? (deref state)) - (log/debugf "Processor shutdown for %s initiated." queue-url) - (let [{:keys [pipe - acker-chan - nacker-chan - producers - consumers - ackers - nackers - client]} (force (deref state))] - (log/debugf "Signaling producers and consumers to exit for %s processor." queue-url) - (async/close! pipe) - (run! async/ results - (update :Successful #(into % (:Successful value []))) - (update :Failed #(into % (:Failed value [])))))))))) + (disj channels port) + (-> results + (update :Successful #(into % (:Successful value []))) + (update :Failed #(into % (:Failed value [])))))))))) (defn change-visibility-one [client {:keys [ReceiptHandle] :as message} visibility-timeout] (let [request {:op :ChangeMessageVisibility @@ -25,16 +26,19 @@ :VisibilityTimeout visibility-timeout}}] (api.async/invoke client request))) -(defn change-visibility-batch [client messages visibility-timeout] - (->> (for [[queue-url messages] (group-by utils/message->queue-url messages)] - (let [request {:op :ChangeMessageVisibilityBatch - :request {:QueueUrl queue-url - :Entries (for [{:keys [MessageId ReceiptHandle]} messages] - {:Id MessageId - :ReceiptHandle ReceiptHandle - :VisibilityTimeout visibility-timeout})}}] - (api.async/invoke client request))) - (combine-batch-results))) +(defn change-visibility-batch + ([client messages visibility-timeout] + (->> (for [[queue-url messages] (group-by utils/message->queue-url messages)] + (let [request {:op :ChangeMessageVisibilityBatch + :request {:QueueUrl queue-url + :Entries (for [{:keys [MessageId ReceiptHandle] :as msg} + messages] + {:Id MessageId + :ReceiptHandle ReceiptHandle + :VisibilityTimeout (or (utils/message->timeout msg) + visibility-timeout)})}}] + (api.async/invoke client request))) + (combine-batch-results)))) (defn ack-many [client messages] (->> (for [[queue-url messages] (group-by utils/message->queue-url messages)] @@ -49,4 +53,3 @@ (defn nack-many [client messages] (change-visibility-batch client messages 0)) - diff --git a/src/piped/utils.clj b/src/piped/utils.clj index a341fc0..2b35eae 100644 --- a/src/piped/utils.clj +++ b/src/piped/utils.clj @@ -1,8 +1,10 @@ (ns piped.utils "Utility functions." - (:require [clojure.core.async :as async]) - (:import [clojure.core.async.impl.channels ManyToManyChannel] - [java.util UUID])) + (:require + [clojure.core.async :as async]) + (:import + [clojure.core.async.impl.channels ManyToManyChannel] + [java.util UUID])) (defn message->queue-url [message] (some-> message meta :queue-url)) @@ -54,18 +56,16 @@ ([] (backoff-seq 60000)) ([max] (->> - (lazy-cat - (->> (cons 0 (iterate (partial * 2) 1000)) - (take-while #(< % max))) - (repeat max)) - (map (fn [x] (+ x (rand-int 1000))))))) + (lazy-cat + (->> (cons 0 (iterate (partial * 2) 1000)) + (take-while #(< % max))) + (repeat max)) + (map (fn [x] (+ x (rand-int 1000))))))) (defn deadline-batching "Batches messages from chan and emits the most recently accumulated batch whenever the max batch size is reached or one of the messages in the batch has become 'due' - for action. deadline-fn is a function of a message that returns a channel that - closes when the message is 'due'. deadline-fn may return nil if a message has no - particular urgency." + for action." [chan max] (let [return (async/chan)] (async/go-loop [channels [chan] batch {}] @@ -73,15 +73,18 @@ (when (async/>! return (vals batch)) (recur [chan] {})) (if-some [[value port] (async/alts! channels :priority true)] + ;; Drew from a deadline. (if-not (identical? port chan) (if (seq batch) (when (async/>! return (vals batch)) (recur [chan] {})) (recur [chan] {})) (if (some? value) - (if-some [deadline (message->deadline value)] - (recur (conj channels deadline) (assoc batch (message->identifier value) value)) - (recur channels (assoc batch (message->identifier value) value))) + (let [identifier (message->identifier value) + new-batch (assoc batch identifier value)] + (if-some [deadline (message->deadline value)] + (recur (conj channels deadline) new-batch) + (recur channels new-batch))) (do (when (seq batch) (async/>! return (vals batch))) (async/close! return))))))) @@ -93,8 +96,11 @@ (interval-batching chan msecs nil)) ([chan msecs max] (let [return (async/chan)] - (async/go-loop [deadline (async/timeout msecs) batch {}] - (if-some [result (async/alt! [chan] ([v] v) [deadline] ::timeout :priority true)] + (async/go-loop [deadline (async/timeout msecs) + batch {}] + (if-some [result (async/alt! [chan] ([v] v) + [deadline] ::timeout + :priority true)] (case result ::timeout (if (empty? batch) @@ -103,14 +109,58 @@ (recur (async/timeout msecs) {}))) (let [new-batch (assoc batch (message->identifier result) result)] (if (and max (= max (count new-batch))) - (when (async/>! return (vals batch)) + (when (async/>! return (vals new-batch)) (recur (async/timeout msecs) {})) + ;; Continue accumulating with deadline. (recur deadline new-batch)))) (do (when (not-empty batch) (async/>! return (vals batch))) (async/close! return)))) return))) +(defn combo-batching + ([chan msecs] + (combo-batching chan msecs nil)) + ([chan msecs max] + (let [return (async/chan)] + (async/go-loop [channels [chan (async/timeout msecs)] + batch {}] + (if (= max (count batch)) + (when (async/>! return (vals batch)) + ;; Reset after batch is sent. + (recur [chan (async/timeout msecs)] {})) + (if-some [[value port] (async/alts! + channels + :priority true)] + ;; Drew from a deadline. + (if-not (identical? port chan) + (if (seq batch) + (when (async/>! return (vals batch)) + ;; Reset after batch sent. + (recur [chan (async/timeout msecs)] {})) + ;; No batch but reset the deadlines. + (recur [chan (async/timeout msecs)] {})) + (if (some? value) + (let [identifier (message->identifier value) + timeout (message->timeout value) + deadline (message->deadline value) + new-batch (assoc batch identifier value)] + ;; Message has a non-zero VisibilityTimeout, so + ;; it needs to be :nacked before the deadline or + ;; that non-zero timeout will reset to 0. + (if (and timeout deadline) + ;; Accumulate and track the new deadline. + ;; Set deadline before the other channels to ensure + ;; it receives priority over the generic message chan. + (recur (cons deadline channels) new-batch) + ;; Continue accumulating with existing deadlines. + (recur channels new-batch))) + (do + (when (seq batch) + (async/>! return (vals batch))) + (async/close! return))))))) + return))) + (defmacro defmulti* "Like clojure.core/defmulti, but actually updates the dispatch value when you reload it." [symbol dispatch-fn] @@ -121,4 +171,4 @@ (let [holder# (volatile! dispatch-fun#) var# (defmulti ~symbol (fn [& args#] (apply @holder# args#)))] (alter-meta! var# merge {::holder holder#}) - var#)))) \ No newline at end of file + var#)))) diff --git a/test/piped/core_test.clj b/test/piped/core_test.clj index 2664597..1fb7479 100644 --- a/test/piped/core_test.clj +++ b/test/piped/core_test.clj @@ -1,8 +1,9 @@ (ns piped.core-test - (:require [clojure.test :refer :all] - [piped.core :refer :all] - [clojure.edn :as edn] - [piped.support :as support])) + (:require + [clojure.edn :as edn] + [clojure.test :refer :all] + [piped.core :refer :all] + [piped.support :as support])) (use-fixtures :each (fn [tests] (stop-all-processors!) (tests) (stop-all-processors!))) @@ -14,10 +15,10 @@ received (promise) consumer (fn [message] (deliver received message)) system (start (processor - {:consumer-fn consumer - :queue-url queue-url - :client-opts (support/localstack-client-opts) - :transform-fn transform})) + {:consumer-fn consumer + :queue-url queue-url + :client-opts (support/localstack-client-opts) + :transform-fn transform})) data {:value 1}] (try (support/send-message queue-url data) @@ -25,4 +26,4 @@ (is (not= message :aborted)) (is (= data (get message :Body)))) (finally - (stop system))))) \ No newline at end of file + (stop system))))) diff --git a/test/piped/extensions_test.clj b/test/piped/extensions_test.clj index ff2458a..4bd992f 100644 --- a/test/piped/extensions_test.clj +++ b/test/piped/extensions_test.clj @@ -1,10 +1,12 @@ (ns piped.extensions-test - (:require [clojure.test :refer :all] - [piped.core :as piped] - [piped.support :as support] - [clojure.tools.logging :as log]) - (:import [java.util.concurrent CountDownLatch TimeUnit])) - + (:require + [clojure.edn :as edn] + [clojure.test :refer :all] + [clojure.tools.logging :as log] + [piped.core :as piped] + [piped.support :as support]) + (:import + [java.util.concurrent CountDownLatch TimeUnit])) (deftest visibility-timeouts-are-extended (let [queue-name (support/gen-queue-name) @@ -17,14 +19,48 @@ (.countDown finished) :ack) processor (piped/processor - {:queue-url queue-url - :client-opts (support/localstack-client-opts) - :consumer-fn consumer - :consumer-parallelism 2})] + {:queue-url queue-url + :client-opts (support/localstack-client-opts) + :consumer-fn consumer + :consumer-parallelism 2})] + (support/send-message-batch queue-url messages) + (piped/start processor) + (loop [waiting 25] + (when-not (.await finished waiting TimeUnit/SECONDS) + (is (empty? (support/receive-message-batch queue-url))) + (recur 1))) + (piped/stop processor))) + +(def transform #(update % :Body edn/read-string)) + +(deftest configurable-visibility-timeouts + (let [queue-name (support/gen-queue-name) + queue-url (support/create-queue queue-name) + messages [{:value 1} {:value 2}] + finished (CountDownLatch. (count messages)) + consumer (fn [{:keys [Body + Attributes]}] + (condp = (:value Body) + 1 (do (.countDown finished) + {:action :ack}) + 2 (let [attempts (some-> Attributes + (get "ApproximateReceiveCount" "0") + (Integer/parseInt))] + (if (< attempts 3) + {:action :nack :delay-seconds (rand-int 2)} + (do + (.countDown finished) + {:action :ack}))))) + processor (piped/processor + {:queue-url queue-url + :client-opts (support/localstack-client-opts) + :consumer-fn consumer + :consumer-parallelism 2 + :transform-fn transform})] (support/send-message-batch queue-url messages) (piped/start processor) (loop [waiting 25] (when-not (.await finished waiting TimeUnit/SECONDS) (is (empty? (support/receive-message-batch queue-url))) (recur 1))) - (piped/stop processor))) \ No newline at end of file + (piped/stop processor))) diff --git a/test/piped/specs_test.clj b/test/piped/specs_test.clj new file mode 100644 index 0000000..a3e1cb9 --- /dev/null +++ b/test/piped/specs_test.clj @@ -0,0 +1,36 @@ +(ns piped.specs-test + (:require + [clojure.spec.alpha :as s] + [clojure.test :refer [deftest is testing]])) + +(deftest specs-test + (testing "processor required fields" + (let [spec {:queue-url "http://queue-url" + :consumer-fn (fn [msg] + (constantly msg) + :ack)}] + (is (s/valid? :piped/options-map spec)) + (is (not (s/valid? :piped/options-map (spec dissoc :queue-url)))) + (is (not (s/valid? :piped/options-map (spec dissoc :consumer-fn)))))) + (testing "action maps" + (let [spec {:action :ack + :delay-seconds 5}] + (is (s/valid? :piped/action-map spec)) + (is (s/valid? :piped/action-map + (assoc spec + :action :nack))) + (is (s/valid? :piped/action-map + (dissoc spec + :delay-seconds))) + (is (not (s/valid? :piped/action-map + (assoc spec + :delay-seconds "a")))) + (is (not (s/valid? :piped/action-map + (assoc spec + :delay-seconds -1)))) + (is (not (s/valid? :piped/action-map + (assoc spec + :delay-seconds 1.13)))) + (is (not (s/valid? :piped/action-map + (assoc spec + :action :do-something))))))) diff --git a/test/piped/support.clj b/test/piped/support.clj index 641600e..9e09e8a 100644 --- a/test/piped/support.clj +++ b/test/piped/support.clj @@ -66,4 +66,4 @@ (name (gensym "piped-test-queue"))) (defn dev-null [] - (async/chan (async/dropping-buffer 0))) \ No newline at end of file + (async/chan (async/dropping-buffer 0))) diff --git a/test/piped/sweet_test.clj b/test/piped/sweet_test.clj index d854acd..72070af 100644 --- a/test/piped/sweet_test.clj +++ b/test/piped/sweet_test.clj @@ -33,4 +33,3 @@ (piped/start #'my-processor) (piped/stop #'my-processor)) - diff --git a/test/piped/utils_test.clj b/test/piped/utils_test.clj index 7944648..127d3b4 100644 --- a/test/piped/utils_test.clj +++ b/test/piped/utils_test.clj @@ -24,7 +24,6 @@ (is (= 1 (count (deref received)))) (is (= 3 (count (first (deref received))))))) - (deftest interval-batching-test (let [received (atom []) msg1 {:data 1} @@ -46,3 +45,36 @@ (async/ {:data 3} + (with-deadline 100) + (with-timeout 10000)) + chan (async/chan) + return (combo-batching chan 1001 5)] + (async/go-loop [] + (when-some [item (async/!! chan msg1) + (async/>!! chan msg2) + (is (empty? (deref received))) + (async/!! chan msg3) + (async/close! chan) + (async/ received deref count))) + (is (= 3 (-> received deref first count))) + ;; The last message was pulled out before its deadline. + (async/go + (is (not (nil? (-> received + deref + first + last + message->deadline + async/