Skip to content

Commit

Permalink
move to deps, upgrade dependencies, configure github actions (#10)
Browse files Browse the repository at this point in the history
* move to deps, upgrade dependencies, configure github actions
  • Loading branch information
RutledgePaulV authored Sep 30, 2023
1 parent ffea082 commit 1e4aa7d
Show file tree
Hide file tree
Showing 12 changed files with 163 additions and 138 deletions.
33 changes: 33 additions & 0 deletions .github/workflows/check.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
name: check

on: [ push ]

jobs:
clojure:
strategy:
matrix:
os: [ ubuntu-latest ]
runs-on: ${{ matrix.os }}
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Prepare java
uses: actions/setup-java@v3
with:
distribution: 'corretto'
java-version: '19'
- name: Install clojure tools
uses: DeLaGuardo/[email protected]
with:
cli: latest
- name: Cache clojure dependencies
uses: actions/cache@v3
with:
path: |
~/.m2/repository
~/.gitlibs
~/.deps.clj
key: cljdeps-${{ hashFiles('deps.edn') }}
restore-keys: cljdeps-
- name: Run tests
run: clojure -X:test:run-tests
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ pom.xml.asc
.hg/
.idea/
*.iml
.cpcache
7 changes: 0 additions & 7 deletions .travis.yml

This file was deleted.

26 changes: 26 additions & 0 deletions builds/build.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
(ns build
(:require [clojure.tools.build.api :as b]))

(def lib 'org.clojars.rutledgepaulv/piped)
(def version "0.1.8")
(def class-dir "target/classes")
(def basis (b/create-basis {:project "deps.edn"}))
(def jar-file (format "target/piped.jar" (name lib) version))

(defn get-version [_]
(print version))

(defn clean [_]
(b/delete {:path "target"}))

(defn jar [_]
(b/write-pom {:class-dir class-dir
:lib lib
:version version
:basis basis
:src-dirs ["src"]
:scm {:tag (str "v" version)
:connection (str "scm:git:[email protected]:rutledgepaulv/" (name lib) ".git")
:url (str "https://github.com/rutledgepaulv/" (name lib))}})
(b/copy-dir {:src-dirs ["src"] :target-dir class-dir})
(b/jar {:class-dir class-dir :jar-file jar-file}))
25 changes: 25 additions & 0 deletions deps.edn
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{:paths ["src" "resources"]

:deps {org.clojure/clojure {:mvn/version "1.11.1"}
com.cognitect.aws/api {:mvn/version "0.8.686"}
com.cognitect.aws/endpoints {:mvn/version "1.1.12.504"}
com.cognitect.aws/sqs {:mvn/version "847.2.1398.0"}
org.clojure/core.async {:mvn/version "1.6.681"}
org.clojure/tools.logging {:mvn/version "1.2.4"}}

:aliases
{:build {:extra-deps {io.github.clojure/tools.build
{:git/url "https://github.com/clojure/tools.build.git"
:sha "32d497f4f1ad07cb1dfa0855ada9e9cf17abff48"}}
:extra-paths ["builds"]
:ns-default build}

:test {:extra-paths ["test"]
:extra-deps {org.testcontainers/testcontainers {:mvn/version "1.19.0"}
org.slf4j/slf4j-simple {:mvn/version "2.0.9"}}}

:run-tests {:extra-deps {io.github.cognitect-labs/test-runner
{:git/url "https://github.com/cognitect-labs/test-runner.git"
:git/sha "7284cda41fb9edc0f3bc6b6185cfb7138fc8a023"}}
:main-opts ["-m" "cognitect.test-runner"]
:exec-fn cognitect.test-runner.api/test}}}
46 changes: 0 additions & 46 deletions project.clj

This file was deleted.

20 changes: 20 additions & 0 deletions release.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/usr/bin/env bash

clj -X:build clean
clj -X:build jar
version=$(clj -X:build get-version)

export CLOJARS_USERNAME="op://Personal/clojars.org/username"
export CLOJARS_PASSWORD="op://Personal/clojars.org/token"

op run -- mvn deploy:deploy-file \
-DgroupId="org.clojars.rutledgepaulv" \
-DartifactId="piped" \
-Dversion="$version" \
-Dpackaging="jar" \
-Dfile="target/piped.jar" \
-DrepositoryId="clojars" \
-Durl="https://repo.clojars.org"

git tag "v$version"
git push origin "refs/tags/v$version"
65 changes: 30 additions & 35 deletions src/piped/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
[cognitect.aws.http :as http]
[cognitect.http-client :as impl]
[piped.specs :as specs]
[aws-api-credential-providers.core :as cp]
[clojure.tools.logging :as log]))

(defprotocol PipedProcessor
Expand Down Expand Up @@ -90,15 +89,11 @@
nacker-parallelism (or nacker-parallelism producer-parallelism)
max-http-ops (+ producer-parallelism acker-parallelism nacker-parallelism)
http-client (delay (http-client
{:pending-ops-limit max-http-ops
:max-connections-per-destination max-http-ops}))
credentials-provider (delay (cp/default-credentials-provider
(or (:http-client client-opts) (force http-client))))
{:pending-ops-limit max-http-ops
:max-connections-per-destination max-http-ops}))
client (cond-> (or client-opts {})
(not (contains? client-opts :http-client))
(assoc :http-client (force http-client))
(not (contains? client-opts :credentials-provider))
(assoc :credentials-provider (force credentials-provider))
:always
(assoc :api :sqs)
:always
Expand Down Expand Up @@ -140,34 +135,34 @@

shutdown-thread
(Thread.
^Runnable
(fn []
(when (realized? (deref state))
(log/debugf "Processor shutdown for %s initiated." queue-url)
(let [{:keys [pipe
acker-chan
nacker-chan
producers
consumers
ackers
nackers
client]} (force (deref state))]
(log/debugf "Signaling producers and consumers to exit for %s processor." queue-url)
(async/close! pipe)
(run! async/<!! producers)
(log/debugf "Producers have exited for %s processor." queue-url)
(run! async/<!! consumers)
(log/debugf "Consumers have exited for %s processor." queue-url)
(log/debugf "Signaling ackers to exit for %s processor." queue-url)
(async/close! acker-chan)
(run! async/<!! ackers)
(log/debugf "Ackers have exited for %s processor." queue-url)
(log/debugf "Signaling nackers to exit for %s processor." queue-url)
(async/close! nacker-chan)
(run! async/<!! nackers)
(log/debugf "Nackers have exited for %s processor." queue-url)
(aws/stop client)
(log/debugf "Processor shutdown for %s finished." queue-url)))))
^Runnable
(fn []
(when (realized? (deref state))
(log/debugf "Processor shutdown for %s initiated." queue-url)
(let [{:keys [pipe
acker-chan
nacker-chan
producers
consumers
ackers
nackers
client]} (force (deref state))]
(log/debugf "Signaling producers and consumers to exit for %s processor." queue-url)
(async/close! pipe)
(run! async/<!! producers)
(log/debugf "Producers have exited for %s processor." queue-url)
(run! async/<!! consumers)
(log/debugf "Consumers have exited for %s processor." queue-url)
(log/debugf "Signaling ackers to exit for %s processor." queue-url)
(async/close! acker-chan)
(run! async/<!! ackers)
(log/debugf "Ackers have exited for %s processor." queue-url)
(log/debugf "Signaling nackers to exit for %s processor." queue-url)
(async/close! nacker-chan)
(run! async/<!! nackers)
(log/debugf "Nackers have exited for %s processor." queue-url)
(aws/stop client)
(log/debugf "Processor shutdown for %s finished." queue-url)))))

system
(reify PipedProcessor
Expand Down
8 changes: 6 additions & 2 deletions src/piped/specs.clj
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@
(s/def :piped/message map?)
(s/def :piped/action #{:ack :nack})
(s/def :piped/extend #{:extend})
(s/def :piped/action-map (s/map-of #{:action :delay-seconds} #(or (s/valid? :piped/action %)
(nat-int? %))))
(s/def :piped/delay-seconds nat-int?)
(s/def :piped/action-map
(s/keys
:req-un [:piped/action]
:opt-un [:piped/delay-seconds]))

(s/def :piped/consumer-fn ifn?)
(s/def :piped/transform-fn ifn?)
(s/def :piped/system any?)
Expand Down
37 changes: 11 additions & 26 deletions test/piped/specs_test.clj
Original file line number Diff line number Diff line change
@@ -1,36 +1,21 @@
(ns piped.specs-test
(:require
[clojure.spec.alpha :as s]
[clojure.test :refer [deftest is testing]]))
[clojure.spec.alpha :as s]
[clojure.test :refer :all]))

(deftest specs-test
(testing "processor required fields"
(let [spec {:queue-url "http://queue-url"
:consumer-fn (fn [msg]
(constantly msg)
:ack)}]
(let [spec {:queue-url "http://queue-url"
:consumer-fn (fn [msg] (constantly msg) :ack)}]
(is (s/valid? :piped/options-map spec))
(is (not (s/valid? :piped/options-map (spec dissoc :queue-url))))
(is (not (s/valid? :piped/options-map (spec dissoc :consumer-fn))))))
(testing "action maps"
(let [spec {:action :ack
:delay-seconds 5}]
(let [spec {:action :ack :delay-seconds 5}]
(is (s/valid? :piped/action-map spec))
(is (s/valid? :piped/action-map
(assoc spec
:action :nack)))
(is (s/valid? :piped/action-map
(dissoc spec
:delay-seconds)))
(is (not (s/valid? :piped/action-map
(assoc spec
:delay-seconds "a"))))
(is (not (s/valid? :piped/action-map
(assoc spec
:delay-seconds -1))))
(is (not (s/valid? :piped/action-map
(assoc spec
:delay-seconds 1.13))))
(is (not (s/valid? :piped/action-map
(assoc spec
:action :do-something)))))))
(is (s/valid? :piped/action-map (assoc spec :action :nack)))
(is (s/valid? :piped/action-map (dissoc spec :delay-seconds)))
(is (not (s/valid? :piped/action-map (assoc spec :delay-seconds "a"))))
(is (not (s/valid? :piped/action-map (assoc spec :delay-seconds -1))))
(is (not (s/valid? :piped/action-map (assoc spec :delay-seconds 1.13))))
(is (not (s/valid? :piped/action-map (assoc spec :action :do-something)))))))
8 changes: 4 additions & 4 deletions test/piped/support.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@

(defonce localstack
(delay
(let [container (GenericContainer. "localstack/localstack:0.10.9")]
(let [container (GenericContainer. "localstack/localstack:2.3.1")]
(doto container
(.setExposedPorts [(int 4576)])
(.setEnv ["SERVICES=sqs" (str "HOSTNAME_EXTERNAL=" (.getContainerIpAddress container))])
(.setExposedPorts [(int 4566)])
(.setEnv ["SERVICES=sqs" (str "LOCALSTACK_HOST=" (.getContainerIpAddress container))])
(.withStartupTimeout (Duration/ofMinutes 1))
(.waitingFor (Wait/forLogMessage "^Ready\\.\\s*$" 1))
(.start))
Expand All @@ -28,7 +28,7 @@
:endpoint-override
{:protocol :http
:hostname (.getContainerIpAddress @localstack)
:port (.getMappedPort @localstack 4576)}})
:port (.getMappedPort @localstack 4566)}})

(def client
(delay (aws/client (localstack-client-opts))))
Expand Down
25 changes: 7 additions & 18 deletions test/piped/utils_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -51,30 +51,19 @@
msg1 {:data 1}
msg2 {:data 2}
msg3 (-> {:data 3}
(with-deadline 100)
(with-deadline 300)
(with-timeout 10000))
chan (async/chan)
return (combo-batching chan 1001 5)]
(async/go-loop []
(when-some [item (async/<! return)]
(swap! received conj item)
(recur)))

(async/>!! chan msg1)
(async/>!! chan msg2)
(is (empty? (deref received)))
(async/<!! (async/timeout 100))
(nil? (async/poll! return))
(async/>!! chan msg3)
(async/close! chan)
(async/<!! (async/timeout 1000))
;; Two groups of messages
(is (= 1 (-> received deref count)))
(is (= 3 (-> received deref first count)))
;; The last message was pulled out before its deadline.
(async/go
(is (not (nil? (-> received
deref
first
last
message->deadline
async/<!)))))))
(let [start (System/currentTimeMillis)
batch (async/<!! return)
stop (System/currentTimeMillis)]
(is (= 3 (count batch)))
(is (< (- stop start) 600)))))

0 comments on commit 1e4aa7d

Please sign in to comment.