diff --git a/scalardb/src/scalardb/transfer.clj b/scalardb/src/scalardb/transfer.clj index 0fde6db..03b9819 100644 --- a/scalardb/src/scalardb/transfer.clj +++ b/scalardb/src/scalardb/transfer.clj @@ -130,8 +130,14 @@ "Read a record with a transaction. If read fails, this function returns nil." [tx storage i] (try - (.get tx (prepare-get i)) - (.get storage (prepare-get i)) + ;; Need Storage API to read the transaction metadata + (let [tx-result (.get tx (prepare-get i)) + result (.get storage (prepare-get i))] + ;; Put the same balance to check conflicts with in-flight transactions + (->> (calc-new-balance tx-result 0) + (prepare-put i) + (.put tx)) + result) (catch CrudException _ nil) (catch ExecutionException _ nil))) @@ -145,8 +151,14 @@ (scalar/prepare-storage-service! test)) test (let [tx (scalar/start-transaction test) - results (map #(read-record tx @(:storage test) %) (range n))] - (if (some nil? results) nil results)))) + results (doall (map #(read-record tx @(:storage test) %) (range n)))] + (try + (.commit tx) + (if (some nil? results) nil results) + (catch Exception e + ;; The transaction conflicted + (warn (.getMessage e)) + nil))))) (defrecord TransferClient [initialized? n initial-balance max-txs] client/Client diff --git a/scalardb/src/scalardb/transfer_append.clj b/scalardb/src/scalardb/transfer_append.clj index 6b22ae0..4de723d 100644 --- a/scalardb/src/scalardb/transfer_append.clj +++ b/scalardb/src/scalardb/transfer_append.clj @@ -131,7 +131,13 @@ (defn- scan-records [tx id] (try - (.scan tx (prepare-scan id)) + (let [results (.scan tx (prepare-scan id))] + ;; Put the same balance to check conflicts with in-flight transactions + (->> (prepare-put id + (-> results first calc-new-age) + (-> results first (calc-new-balance 0))) + (.put tx)) + results) (catch CrudException _ nil))) (defn scan-all-records-with-retry @@ -139,8 +145,14 @@ (scalar/check-transaction-connection! test) (scalar/with-retry scalar/prepare-transaction-service! test (let [tx (scalar/start-transaction test) - results (map #(scan-records tx %) (range n))] - (if (some nil? results) nil results)))) + results (doall (map #(scan-records tx %) (range n)))] + (try + (.commit tx) + (if (some nil? results) nil results) + (catch Exception e + ;; The transaction conflicted + (warn (.getMessage e)) + nil))))) (defrecord TransferClient [initialized? n initial-balance max-txs] client/Client