Skip to content

Commit

Permalink
Update konserve version (#571)
Browse files Browse the repository at this point in the history
* Update konserve version

* Increase datahike minor version
  • Loading branch information
jsmassa authored Nov 4, 2022
1 parent 7f37bbe commit 024a18f
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 72 deletions.
2 changes: 1 addition & 1 deletion build.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
(:import (clojure.lang ExceptionInfo)))

(def lib 'io.replikativ/datahike)
(def version (format "0.5.%s" (b/git-count-revs nil)))
(def version (format "0.6.%s" (b/git-count-revs nil)))
(def current-commit (gh/current-commit))
(def class-dir "target/classes")
(def basis (b/create-basis {:project "deps.edn"}))
Expand Down
4 changes: 2 additions & 2 deletions deps.edn
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
{:deps {org.clojure/clojure {:mvn/version "1.11.1"}
org.clojure/clojurescript {:mvn/version "1.11.4"}
io.replikativ/hasch {:mvn/version "0.3.7"}
io.replikativ/hitchhiker-tree {:mvn/version "0.1.11"}
io.replikativ/hitchhiker-tree {:mvn/version "0.2.222"}
io.replikativ/incognito {:mvn/version "0.3.66"}
io.replikativ/konserve {:mvn/version "0.6.0-alpha3"}
io.replikativ/konserve {:mvn/version "0.7.275"}
persistent-sorted-set/persistent-sorted-set {:mvn/version "0.1.4"}
environ/environ {:mvn/version "1.2.0"}
com.taoensso/timbre {:mvn/version "5.2.1"}
Expand Down
90 changes: 46 additions & 44 deletions src/datahike/connector.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -25,34 +25,35 @@
(let [{:keys [db-after] :as tx-report} @(update-fn connection tx-data tx-meta)
{:keys [eavt aevt avet temporal-eavt temporal-aevt temporal-avet schema rschema system-entities ident-ref-map ref-ident-map config max-tx max-eid op-count hash meta]} db-after
store (:store @connection)
backend (kons/->KonserveBackend store)
backend (kons/->KonserveBackend store true)
eavt-flushed (di/-flush eavt backend)
aevt-flushed (di/-flush aevt backend)
avet-flushed (di/-flush avet backend)
keep-history? (:keep-history? config)
temporal-eavt-flushed (when keep-history? (di/-flush temporal-eavt backend))
temporal-aevt-flushed (when keep-history? (di/-flush temporal-aevt backend))
temporal-avet-flushed (when keep-history? (di/-flush temporal-avet backend))]
(<?? S (k/assoc-in store [:db]
(merge
{:schema schema
:rschema rschema
:system-entities system-entities
:ident-ref-map ident-ref-map
:ref-ident-map ref-ident-map
:config config
:meta meta
:hash hash
:max-tx max-tx
:max-eid max-eid
:op-count op-count
:eavt-key eavt-flushed
:aevt-key aevt-flushed
:avet-key avet-flushed}
(when keep-history?
{:temporal-eavt-key temporal-eavt-flushed
:temporal-aevt-key temporal-aevt-flushed
:temporal-avet-key temporal-avet-flushed}))))
(k/assoc-in store [:db]
(merge
{:schema schema
:rschema rschema
:system-entities system-entities
:ident-ref-map ident-ref-map
:ref-ident-map ref-ident-map
:config config
:meta meta
:hash hash
:max-tx max-tx
:max-eid max-eid
:op-count op-count
:eavt-key eavt-flushed
:aevt-key aevt-flushed
:avet-key avet-flushed}
(when keep-history?
{:temporal-eavt-key temporal-eavt-flushed
:temporal-aevt-key temporal-aevt-flushed
:temporal-avet-key temporal-avet-flushed}))
{:sync? true})
(reset! connection (assoc db-after
:eavt eavt-flushed
:aevt aevt-flushed
Expand Down Expand Up @@ -130,7 +131,7 @@
(kc/ensure-cache
raw-store
(atom (cache/lru-cache-factory {} :threshold 1000))))))
stored-db (<?? S (k/get-in store [:db]))]
stored-db (k/get-in store [:db] nil {:sync? true})]
(ds/release-store store-config store)
(not (nil? stored-db)))
(do
Expand All @@ -151,7 +152,7 @@
(kc/ensure-cache
raw-store
(atom (cache/lru-cache-factory {} :threshold 1000))))))
stored-db (<?? S (k/get-in store [:db]))
stored-db (k/get-in store [:db] nil {:sync? true})
_ (when-not stored-db
(ds/release-store store-config store)
(dt/raise "Database does not exist." {:type :db-does-not-exist
Expand Down Expand Up @@ -187,31 +188,32 @@
store (kc/ensure-cache
(ds/empty-store store-config)
(atom (cache/lru-cache-factory {} :threshold 1000)))
stored-db (<?? S (k/get-in store [:db]))
stored-db (k/get-in store [:db] nil {:sync? true})
_ (when stored-db
(dt/raise "Database already exists." {:type :db-already-exists :config store-config}))
{:keys [eavt aevt avet temporal-eavt temporal-aevt temporal-avet schema rschema system-entities ref-ident-map ident-ref-map config max-tx max-eid op-count hash meta]}
(db/empty-db nil config)
backend (kons/->KonserveBackend store)]
(<?? S (k/assoc-in store [:db]
(merge {:schema schema
:max-tx max-tx
:max-eid max-eid
:op-count op-count
:hash hash
:rschema rschema
:system-entities system-entities
:ident-ref-map ident-ref-map
:ref-ident-map ref-ident-map
:config config
:meta meta
:eavt-key (di/-flush eavt backend)
:aevt-key (di/-flush aevt backend)
:avet-key (di/-flush avet backend)}
(when keep-history?
{:temporal-eavt-key (di/-flush temporal-eavt backend)
:temporal-aevt-key (di/-flush temporal-aevt backend)
:temporal-avet-key (di/-flush temporal-avet backend)}))))
backend (kons/->KonserveBackend store true)]
(k/assoc-in store [:db]
(merge {:schema schema
:max-tx max-tx
:max-eid max-eid
:op-count op-count
:hash hash
:rschema rschema
:system-entities system-entities
:ident-ref-map ident-ref-map
:ref-ident-map ref-ident-map
:config config
:meta meta
:eavt-key (di/-flush eavt backend)
:aevt-key (di/-flush aevt backend)
:avet-key (di/-flush avet backend)}
(when keep-history?
{:temporal-eavt-key (di/-flush temporal-eavt backend)
:temporal-aevt-key (di/-flush temporal-aevt backend)
:temporal-avet-key (di/-flush temporal-avet backend)}))
{:sync? true})
(ds/release-store store-config store)
(when initial-tx
(let [conn (-connect config)]
Expand Down
12 changes: 4 additions & 8 deletions src/datahike/index/hitchhiker_tree.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -89,23 +89,19 @@

(defn -insert [tree ^Datom datom index-type op-count]
(let [datom-as-vec (datom->node datom index-type)]
(async/<?? (hmsg/enqueue tree [(assoc (ins/new-InsertOp datom-as-vec op-count)
:tag (h/uuid))]))))
(async/<?? (hmsg/enqueue tree [(ins/new-InsertOp datom-as-vec op-count)]))))

(defn -temporal-insert [tree ^Datom datom index-type op-count]
(let [datom-as-vec (datom->node datom index-type)]
(async/<?? (hmsg/enqueue tree [(assoc (ins/new-temporal-InsertOp datom-as-vec op-count)
:tag (h/uuid))]))))
(async/<?? (hmsg/enqueue tree [(ins/new-temporal-InsertOp datom-as-vec op-count)]))))

(defn -upsert [tree ^Datom datom index-type op-count]
(let [datom-as-vec (datom->node datom index-type)]
(async/<?? (hmsg/enqueue tree [(assoc (ups/new-UpsertOp datom-as-vec op-count (index-type->indices index-type))
:tag (h/uuid))]))))
(async/<?? (hmsg/enqueue tree [(ups/new-UpsertOp datom-as-vec op-count (index-type->indices index-type))]))))

(defn -temporal-upsert [tree ^Datom datom index-type op-count]
(let [datom-as-vec (datom->node datom index-type)]
(async/<?? (hmsg/enqueue tree [(assoc (ups/new-temporal-UpsertOp datom-as-vec op-count (index-type->indices index-type))
:tag (h/uuid))]))))
(async/<?? (hmsg/enqueue tree [(ups/new-temporal-UpsertOp datom-as-vec op-count (index-type->indices index-type))]))))

(defn -remove [tree ^Datom datom index-type op-count]
(async/<?? (hmsg/delete tree (datom->node datom index-type) op-count)))
Expand Down
28 changes: 15 additions & 13 deletions src/datahike/index/hitchhiker_tree/upsert.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
This is equivalent to check whether the elements increase by one and contains 0.
E.g., [0 1 2] => true, [0 2 3] => false, [1 2] => false."
[indices]
(if (.contains indices 0)
(if (.contains ^clojure.lang.PersistentVector indices 0)
(let [m (apply max indices)
s (apply + indices)]
(= s (/ (* m (+ 1 m)) 2)))
Expand All @@ -34,12 +34,12 @@
(diu/equals-on-indices? new (-> candidates first first) indices))
(let [res (->> candidates
(map first)
;; Returns the key which has not been retracted.
;; There will at most be one such key.
;; Because of the ordering in keys, we know that
;; when two successive keys have a positive
;; :t value, then the second key is our answer,
;; the one that has not been retracted."
;; Returns the key which has not been retracted.
;; There will at most be one such key.
;; Because of the ordering in keys, we know that
;; when two successive keys have a positive
;; :t value, then the second key is our answer,
;; the one that has not been retracted."
(reduce (fn [prev-pos? k]
(let [curr-pos? (pos? (nth k 3))]
(if (and curr-pos?
Expand All @@ -56,7 +56,7 @@
(when-let [old (old-key kvs new indices)]
(remove-fn old)))

(defrecord UpsertOp [key op-count indices]
(defrecord UpsertOp [key op-count indices version]
op/IOperation
(-insertion-ts [_] op-count)
(-affects-key [_] key)
Expand All @@ -79,7 +79,7 @@
;; '-' means it is retracted and 'nt' is the current transaction time.
[a b c (- nt)]))

(defrecord temporal-UpsertOp [key op-count indices]
(defrecord temporal-UpsertOp [key op-count indices version]
op/IOperation
(-insertion-ts [_] op-count)
(-affects-key [_] key)
Expand All @@ -105,10 +105,10 @@
(tree/insert tree key nil)))))

(defn new-UpsertOp [key op-count indices]
(UpsertOp. key op-count indices))
(UpsertOp. key op-count indices 0))

(defn new-temporal-UpsertOp [key op-count indices]
(temporal-UpsertOp. key op-count indices))
(temporal-UpsertOp. key op-count indices 0))

(defn add-upsert-handler
"Tells the store how to deserialize upsert related operations"
Expand All @@ -118,9 +118,11 @@
{'datahike.index.hitchhiker_tree.upsert.UpsertOp
;; TODO Remove ts when Wanderung is available.
(fn [{:keys [key value op-count ts indices]}]
(map->UpsertOp {:key key :value value :op-count (or op-count ts) :indices (or indices [0 1])}))
(map->UpsertOp {:key key :value value :op-count (or op-count ts) :indices (or indices [0 1])
:version 0}))

'datahike.index.hitchhiker_tree.upsert.temporal_UpsertOp
(fn [{:keys [key value op-count ts indices]}]
(map->temporal-UpsertOp {:key key :value value :op-count (or op-count ts) :indices (or indices [0 1])}))})
(map->temporal-UpsertOp {:key key :value value :op-count (or op-count ts) :indices (or indices [0 1])
:version 0}))})
store)
7 changes: 3 additions & 4 deletions src/datahike/store.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
[clojure.spec.alpha :as s]
[konserve.filestore :as fs]
[konserve.memory :as mem]
[superv.async :refer [<?? S]]
[environ.core :refer [env]]
[datahike.index.hitchhiker-tree.upsert :as ups]))

Expand Down Expand Up @@ -69,7 +68,7 @@
(defmethod empty-store :mem [{:keys [id]}]
(if-let [store (get @memory id)]
store
(let [store (<?? S (mem/new-mem-store))]
(let [store (mem/new-mem-store (atom {}) {:sync? true})]
(swap! memory assoc id store)
store)))

Expand Down Expand Up @@ -99,13 +98,13 @@
(defmethod empty-store :file [{:keys [path]}]
(ups/add-upsert-handler
(kons/add-hitchhiker-tree-handlers
(<?? S (fs/new-fs-store path)))))
(fs/connect-fs-store path :opts {:sync? true}))))

(defmethod delete-store :file [{:keys [path]}]
(fs/delete-store path))

(defmethod connect-store :file [{:keys [path]}]
(<?? S (fs/new-fs-store path)))
(fs/connect-fs-store path :opts {:sync? true}))

(defmethod scheme->index :file [_]
:datahike.index/hitchhiker-tree)
Expand Down

0 comments on commit 024a18f

Please sign in to comment.