diff --git a/src/swark/cedric.cljc b/src/swark/cedric.cljc index 3940391..3d14b0f 100644 --- a/src/swark/cedric.cljc +++ b/src/swark/cedric.cljc @@ -282,14 +282,14 @@ (defn make-connection "Returns a map with ::transact! and ::close! functions." [db] - (let [conn (swark/with-buffer db)] + (let [conn (swark/atomic db)] {::transact! (partial swark/put! conn) ::close! #(swark/close! conn)})) (comment (let [connection (-> "/tmp/testdb123.csv" Csv. make-connection)] - (def transact! (::transact! connection)) - (def close! (::close! connection))) + (def transact! (::transact! connection)) ; Define transact! for this connection + (def close! (::close! connection))) ; Define close! for this connection ;; Upsert items via the transact! function (transact! upsert-items {:primary-key :user/id} [{:user/name "Arnold"} {:user/name "Naomi"} {:user/name "Theodor"}]) diff --git a/src/swark/core.cljc b/src/swark/core.cljc index 4e50fc6..7ebcb1d 100644 --- a/src/swark/core.cljc +++ b/src/swark/core.cljc @@ -223,41 +223,58 @@ ;; Async stuff ;; TODO: Support channel transducers and ex-handler as well -(defn with-buffer +(defn atomic {:added "0.1.41" :arglist '([x]) - :doc "Starts a go-loop and returns a map with ::in and ::out async channels. - Input to ::in chan is expected to be [f & args] or [::closed!]. In the latter - case, the go-loop will stop. In the first case, (apply f x args) will be called - and the result is put on ::out chan."} - [x] - (let [in-chan (a/chan (a/sliding-buffer 99)) - out-chan (a/chan (a/dropping-buffer 99))] + :doc "Returns a map representin a connection to something stateful or with + side-effects, so you can treat state or side-effects in a similar manner to + Clojure atoms. + Starts a go-loop in the background and returns a map with ::in and ::out async + channels. Input to ::in chan is expected to be [f & args] or [::closed!]. In + the latter case, the go-loop will stop. In the first case, (apply f x args) + will be called and the result is put on ::out chan. + Valid props are :in-buffer-size (default 99) & :out-buffer-size (default 99). + An atomic thing is similar to an atom, you can `put!` things, just like you + would swap! an atom. Be sure to close the atomic by calling `close!` on it."} + [x & options] + (let [props (merge + {:in-buffer-size 99 :out-buffer-size 99} + (apply hash-map options)) + spec {:in-buffer-size pos-int? :out-buffer-size pos-int?} + _ (-> spec (valid-map? props) assert) + in-chan (a/chan (a/sliding-buffer (:in-buffer-size props))) + out-chan (a/chan (a/dropping-buffer (:out-buffer-size props)))] (a/go-loop [[f & args] (a/ f #{::closed!}) ; NOTE: Stop the go-loop in this case - (if-let [result (when f (apply f x args))] + (if-some [result (when (fn? f) (apply f x args))] (a/>! out-chan result) - (a/>! out-chan ::nil)) + (a/>! out-chan ::nil)) ; Put explicit ::nil on channel (recur (a/! in (or args [::closed!]))) ; NOTE: Close the go-loop when nil args - (a/! in (or (->> args (keep identity) seq) [::closed!]))) ; NOTE: Close the go-loop when nil args + (let [result (a/ result #{::nil}) ; Simply return nil when ::nil is returned + result))) (defn close! {:added "0.1.41" - :arglist '([buffered]) + :arglist '([atomic]) :doc "Stops the underlying go-loop and closes all channels. Returns nil."} - [buffered] - (put! buffered nil) ; NOTE: Close the running go-loop + [atomic] + (-> atomic ::in assert) + (a/>!! (::in atomic) [::closed!]) ; NOTE: Close the running go-loop (let [channels (juxt ::in ::out)] - (run! a/close! (channels buffered)))) + (run! a/close! (channels atomic))) + ::closed!) diff --git a/test/swark/cedric_test.clj b/test/swark/cedric_test.clj index 8c449c6..3aefc76 100644 --- a/test/swark/cedric_test.clj +++ b/test/swark/cedric_test.clj @@ -53,14 +53,12 @@ ;; Test all implementations in exactly the same way! (doseq [make-db [#(Mem. (atom nil)) #(Csv. (str "/tmp/testdb-" (swark/unid) ".csv"))]] - (let [db (make-db) - db-conn (swark/with-buffer db) - transact! (partial swark/put! db-conn) - props {:primary-key :person/id} - the-names (some-names 25) - persons (map (partial assoc nil :person/name) the-names) - result (transact! sut/upsert-items props persons)] - ;; result + (let [{::sut/keys + [transact! close!]} (sut/make-connection (make-db)) + props {:primary-key :person/id} + the-names (some-names 25) + persons (map (partial assoc nil :person/name) the-names) + result (transact! sut/upsert-items props persons)] (testing "upsert-items" (testing "returns the upserted items" (is (-> result count (= 25))) @@ -82,4 +80,4 @@ (is (= {::sut/archived 5} archived)))) (testing "returns all the items" (is (-> (transact! sut/read-items {}) count #{20}))) - (swark/close! db-conn)))) + (close!)))) diff --git a/test/swark/core_test.clj b/test/swark/core_test.clj index 4dd6a11..16595e5 100644 --- a/test/swark/core_test.clj +++ b/test/swark/core_test.clj @@ -107,9 +107,10 @@ #"All vals in spec should implement IFn" {:id "not IFn"} {:id -1} ; Spec #"Input should be a map!" {:id nat-int?} false))) -(t/deftest with-buffer-put-close - (let [{::sut/keys [in out] :as m} (sut/with-buffer {:test "map"})] - (t/is (and in out)) - (t/is (= {:test "map" :key :value} (sut/put! m assoc :key :value))) - (doto m sut/close!) ; Close it, after this every eval of put! returns nil - (t/is (nil? (sut/put! m assoc :another "entry"))))) +(t/deftest atomic + (let [atomic (sut/atomic "Hello" :in-buffer-size 1 :out-buffer-size 1) + transact! (partial sut/put! atomic)] + (t/is (= "Hello, World!" (transact! str ", " "World!"))) + (t/is (nil? (transact! :not-a-fn ", " "World!"))) + (t/is (= ::sut/closed! (sut/close! atomic))) ; Close it, after this every eval of put! returns nil + (t/is (nil? (transact! str "silence")))))