Skip to content

Commit

Permalink
Atomic (#16)
Browse files Browse the repository at this point in the history
- [x] Rename buffered to atomic
- [x] extend docstrings
  • Loading branch information
verberktstan authored Feb 28, 2024
1 parent 270124f commit b2fb115
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 37 deletions.
6 changes: 3 additions & 3 deletions src/swark/cedric.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -282,14 +282,14 @@
(defn make-connection
"Returns a map with ::transact! and ::close! functions."
[db]
(let [conn (swark/with-buffer db)]
(let [conn (swark/atomic db)]
{::transact! (partial swark/put! conn)
::close! #(swark/close! conn)}))

(comment
(let [connection (-> "/tmp/testdb123.csv" Csv. make-connection)]
(def transact! (::transact! connection))
(def close! (::close! connection)))
(def transact! (::transact! connection)) ; Define transact! for this connection
(def close! (::close! connection))) ; Define close! for this connection

;; Upsert items via the transact! function
(transact! upsert-items {:primary-key :user/id} [{:user/name "Arnold"} {:user/name "Naomi"} {:user/name "Theodor"}])
Expand Down
55 changes: 36 additions & 19 deletions src/swark/core.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -223,41 +223,58 @@
;; Async stuff

;; TODO: Support channel transducers and ex-handler as well
(defn with-buffer
(defn atomic
{:added "0.1.41"
:arglist '([x])
:doc "Starts a go-loop and returns a map with ::in and ::out async channels.
Input to ::in chan is expected to be [f & args] or [::closed!]. In the latter
case, the go-loop will stop. In the first case, (apply f x args) will be called
and the result is put on ::out chan."}
[x]
(let [in-chan (a/chan (a/sliding-buffer 99))
out-chan (a/chan (a/dropping-buffer 99))]
:doc "Returns a map representin a connection to something stateful or with
side-effects, so you can treat state or side-effects in a similar manner to
Clojure atoms.
Starts a go-loop in the background and returns a map with ::in and ::out async
channels. Input to ::in chan is expected to be [f & args] or [::closed!]. In
the latter case, the go-loop will stop. In the first case, (apply f x args)
will be called and the result is put on ::out chan.
Valid props are :in-buffer-size (default 99) & :out-buffer-size (default 99).
An atomic thing is similar to an atom, you can `put!` things, just like you
would swap! an atom. Be sure to close the atomic by calling `close!` on it."}
[x & options]
(let [props (merge
{:in-buffer-size 99 :out-buffer-size 99}
(apply hash-map options))
spec {:in-buffer-size pos-int? :out-buffer-size pos-int?}
_ (-> spec (valid-map? props) assert)
in-chan (a/chan (a/sliding-buffer (:in-buffer-size props)))
out-chan (a/chan (a/dropping-buffer (:out-buffer-size props)))]
(a/go-loop [[f & args] (a/<! in-chan)]
(when-not (some-> f #{::closed!}) ; NOTE: Stop the go-loop in this case
(if-let [result (when f (apply f x args))]
(if-some [result (when (fn? f) (apply f x args))]
(a/>! out-chan result)
(a/>! out-chan ::nil))
(a/>! out-chan ::nil)) ; Put explicit ::nil on channel
(recur (a/<! in-chan))))
{::in in-chan
::out out-chan}))

(defn put!
{:added "0.1.41"
:arglist '([buffered & args])
:doc "Put args on the ::in chan and blocks until something is returned via
::out chan. Returns the returned value."}
:arglist '([atomic & args])
:doc "Put args on the ::in chan and block until something is returned via
::out chan. Returns the returned value.
Similar to swap! for atoms, but implemented as a async messaging system, so
it's useable for side-effects as well."}
[{::keys [in out]} & args]
(assert in)
(assert out)
(a/go (a/>! in (or args [::closed!]))) ; NOTE: Close the go-loop when nil args
(a/<!! out))
(a/go (a/>! in (or (->> args (keep identity) seq) [::closed!]))) ; NOTE: Close the go-loop when nil args
(let [result (a/<!! out)]
(when-not (some-> result #{::nil}) ; Simply return nil when ::nil is returned
result)))

(defn close!
{:added "0.1.41"
:arglist '([buffered])
:arglist '([atomic])
:doc "Stops the underlying go-loop and closes all channels. Returns nil."}
[buffered]
(put! buffered nil) ; NOTE: Close the running go-loop
[atomic]
(-> atomic ::in assert)
(a/>!! (::in atomic) [::closed!]) ; NOTE: Close the running go-loop
(let [channels (juxt ::in ::out)]
(run! a/close! (channels buffered))))
(run! a/close! (channels atomic)))
::closed!)
16 changes: 7 additions & 9 deletions test/swark/cedric_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,12 @@
;; Test all implementations in exactly the same way!
(doseq [make-db [#(Mem. (atom nil))
#(Csv. (str "/tmp/testdb-" (swark/unid) ".csv"))]]
(let [db (make-db)
db-conn (swark/with-buffer db)
transact! (partial swark/put! db-conn)
props {:primary-key :person/id}
the-names (some-names 25)
persons (map (partial assoc nil :person/name) the-names)
result (transact! sut/upsert-items props persons)]
;; result
(let [{::sut/keys
[transact! close!]} (sut/make-connection (make-db))
props {:primary-key :person/id}
the-names (some-names 25)
persons (map (partial assoc nil :person/name) the-names)
result (transact! sut/upsert-items props persons)]
(testing "upsert-items"
(testing "returns the upserted items"
(is (-> result count (= 25)))
Expand All @@ -82,4 +80,4 @@
(is (= {::sut/archived 5} archived))))
(testing "returns all the items"
(is (-> (transact! sut/read-items {}) count #{20})))
(swark/close! db-conn))))
(close!))))
13 changes: 7 additions & 6 deletions test/swark/core_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,10 @@
#"All vals in spec should implement IFn" {:id "not IFn"} {:id -1} ; Spec
#"Input should be a map!" {:id nat-int?} false)))

(t/deftest with-buffer-put-close
(let [{::sut/keys [in out] :as m} (sut/with-buffer {:test "map"})]
(t/is (and in out))
(t/is (= {:test "map" :key :value} (sut/put! m assoc :key :value)))
(doto m sut/close!) ; Close it, after this every eval of put! returns nil
(t/is (nil? (sut/put! m assoc :another "entry")))))
(t/deftest atomic
(let [atomic (sut/atomic "Hello" :in-buffer-size 1 :out-buffer-size 1)
transact! (partial sut/put! atomic)]
(t/is (= "Hello, World!" (transact! str ", " "World!")))
(t/is (nil? (transact! :not-a-fn ", " "World!")))
(t/is (= ::sut/closed! (sut/close! atomic))) ; Close it, after this every eval of put! returns nil
(t/is (nil? (transact! str "silence")))))

0 comments on commit b2fb115

Please sign in to comment.