diff --git a/scalardb/src/scalardb/transfer.clj b/scalardb/src/scalardb/transfer.clj index bc00d67..ac20329 100644 --- a/scalardb/src/scalardb/transfer.clj +++ b/scalardb/src/scalardb/transfer.clj @@ -126,27 +126,27 @@ (defn- read-record "Read and update the specified record with a transaction" - [test i] + [test id] (let [tx (scalar/start-transaction test) - tx-result (.get tx (prepare-get i)) + tx-result (.get tx (prepare-get id)) ;; Need Storage API to read the transaction metadata - result (.get @(:storage test) (prepare-get i))] + result (.get @(:storage test) (prepare-get id))] ;; Put the same balance to check conflicts with in-flight transactions (->> (calc-new-balance tx-result 0) - (prepare-put i) + (prepare-put id) (.put tx)) (.commit tx) result)) (defn- read-record-with-retry - [test i] + [test id] (scalar/with-retry (fn [test] (scalar/prepare-transaction-service! test) (scalar/prepare-storage-service! test)) test (try - (read-record test i) + (read-record test id) (catch Exception e ;; Read failure or the transaction conflicted (warn (.getMessage e)) diff --git a/scalardb/src/scalardb/transfer_append.clj b/scalardb/src/scalardb/transfer_append.clj index c325600..198b969 100644 --- a/scalardb/src/scalardb/transfer_append.clj +++ b/scalardb/src/scalardb/transfer_append.clj @@ -128,31 +128,32 @@ :start-fail)) (defn- scan-records - "Scan records and append a new record with a transaction. - Return nil if the read fails or a conflict happens." + "Scan records and append a new record with a transaction" [test id] - (try - (let [tx (scalar/start-transaction test) - results (.scan tx (prepare-scan id))] - ;; Put the same balance to check conflicts with in-flight transactions - (->> (prepare-put id - (-> results first calc-new-age) - (-> results first (calc-new-balance 0))) - (.put tx)) - (.commit tx) - results) - (catch Exception e - ;; Read failure or the transaction conflicted - (warn (.getMessage e)) - nil))) + (let [tx (scalar/start-transaction test) + results (.scan tx (prepare-scan id))] + ;; Put the same balance to check conflicts with in-flight transactions + (->> (prepare-put id + (-> results first calc-new-age) + (-> results first (calc-new-balance 0))) + (.put tx)) + (.commit tx) + results)) + +(defn- scan-records-with-retry + [test id] + (scalar/with-retry scalar/prepare-transaction-service! test + (try + (scan-records test id) + (catch Exception e + ;; Scan failure or the transaction conflicted + (warn (.getMessage e)) + nil)))) (defn scan-all-records-with-retry [test n] (scalar/check-transaction-connection! test) - (scalar/with-retry scalar/prepare-transaction-service! test - (let [results (pmap #(scan-records test %) (range n))] - (when (every? #(some? %) results) - results)))) + (doall (map #(scan-records-with-retry test %) (range n)))) (defrecord TransferClient [initialized? n initial-balance max-txs] client/Client