Skip to content

Commit

Permalink
Request transfer transactions in parallel (#125)
Browse files Browse the repository at this point in the history
  • Loading branch information
yito88 authored Apr 22, 2024
1 parent a92c8b4 commit 06f9b5e
Show file tree
Hide file tree
Showing 18 changed files with 352 additions and 335 deletions.
12 changes: 3 additions & 9 deletions scalardb/src/scalardb/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -192,16 +192,10 @@
(when-not @(:transaction test)
(prepare-transaction-service! test)))

(defn try-reconnection-for-transaction!
[test]
(when (= (swap! (:failures test) inc) NUM_FAILURES_FOR_RECONNECTION)
(prepare-transaction-service! test)
(reset! (:failures test) 0)))

(defn try-reconnection-for-2pc!
[test]
(defn try-reconnection!
[test prepare-fn]
(when (= (swap! (:failures test) inc) NUM_FAILURES_FOR_RECONNECTION)
(prepare-2pc-service! test)
(prepare-fn test)
(reset! (:failures test) 0)))

(defn start-transaction
Expand Down
2 changes: 1 addition & 1 deletion scalardb/src/scalardb/elle_append.clj
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@
(swap! (:unknown-tx test) conj (.getId tx))
(assoc op :type :info :error {:unknown-tx-status (.getId tx)}))
(catch Exception e
(scalar/try-reconnection-for-transaction! test)
(scalar/try-reconnection! test scalar/prepare-transaction-service!)
(assoc op :type :fail :error {:crud-error (.getMessage e)})))))

(close! [_ _])
Expand Down
2 changes: 1 addition & 1 deletion scalardb/src/scalardb/elle_append_2pc.clj
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
(assoc op :type :info :error {:unknown-tx-status (.getId tx1)}))
(catch Exception e
(scalar/rollback-txs [tx1 tx2])
(scalar/try-reconnection-for-2pc! test)
(scalar/try-reconnection! test scalar/prepare-2pc-service!)
(assoc op :type :fail :error {:crud-error (.getMessage e)})))))

(close! [_ _])
Expand Down
2 changes: 1 addition & 1 deletion scalardb/src/scalardb/elle_write_read.clj
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@
(swap! (:unknown-tx test) conj (.getId tx))
(assoc op :type :info :error {:unknown-tx-status (.getId tx)}))
(catch Exception e
(scalar/try-reconnection-for-transaction! test)
(scalar/try-reconnection! test scalar/prepare-transaction-service!)
(assoc op :type :fail :error {:crud-error (.getMessage e)})))))

(close! [_ _])
Expand Down
2 changes: 1 addition & 1 deletion scalardb/src/scalardb/elle_write_read_2pc.clj
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
(assoc op :type :info :error {:unknown-tx-status (.getId tx1)}))
(catch Exception e
(scalar/rollback-txs [tx1 tx2])
(scalar/try-reconnection-for-2pc! test)
(scalar/try-reconnection! test scalar/prepare-2pc-service!)
(assoc op :type :fail :error {:crud-error (.getMessage e)})))))

(close! [_ _])
Expand Down
128 changes: 81 additions & 47 deletions scalardb/src/scalardb/transfer.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
(ns scalardb.transfer
(:require [clojure.core.reducers :as r]
[clojure.tools.logging :refer [warn]]
[jepsen
[client :as client]
[checker :as checker]
Expand All @@ -23,7 +24,7 @@

(def ^:const INITIAL_BALANCE 10000)
(def ^:const NUM_ACCOUNTS 10)
(def ^:private ^:const TOTAL_BALANCE (* NUM_ACCOUNTS INITIAL_BALANCE))
(def ^:const MAX_NUM_TXS 8)

(def ^:const SCHEMA {(keyword (str KEYSPACE \. TABLE))
{:transaction true
Expand Down Expand Up @@ -86,7 +87,7 @@
(-> r get-balance (+ amount)))

(defn- tx-transfer
[tx {:keys [from to amount]}]
[tx from to amount]
(let [fromResult (.get tx (prepare-get from))
toResult (.get tx (prepare-get to))]
(->> (calc-new-balance fromResult (- amount))
Expand All @@ -97,6 +98,34 @@
(.put tx))
(.commit tx)))

(defn- try-tx-transfer
[test {:keys [from to amount]}]
(if-let [tx (scalar/start-transaction test)]
(try
(tx-transfer tx from to amount)
:commit
(catch UnknownTransactionStatusException _
(swap! (:unknown-tx test) conj (.getId tx))
(warn "Unknown transaction: " (.getId tx))
:unknown-tx-status)
(catch Exception e
(warn (.getMessage e))
:fail))
:start-fail))

(defn exec-transfers
"Execute transfers in parallel. Give the transfer function."
[test op transfer-fn]
(let [results (pmap #(transfer-fn test %) (:value op))]
(if (some #{:commit} results)
;; return :ok when at least 1 transaction is committed
(assoc op :type :ok :value {:results results})
;; :info type could be better in some cases
;; However, our checker doesn't care about the type for now
(do
(scalar/try-reconnection! test scalar/prepare-transaction-service!)
(assoc op :type :fail :error {:results results})))))

(defn- read-record
"Read a record with a transaction. If read fails, this function returns nil."
[tx storage i]
Expand All @@ -110,15 +139,19 @@
[test n]
(scalar/check-transaction-connection! test)
(scalar/check-storage-connection! test)
(scalar/with-retry (fn [test] (scalar/prepare-transaction-service! test) (scalar/prepare-storage-service! test)) test
(scalar/with-retry
(fn [test]
(scalar/prepare-transaction-service! test)
(scalar/prepare-storage-service! test))
test
(let [tx (scalar/start-transaction test)
results (map #(read-record tx @(:storage test) %) (range n))]
(if (some nil? results) nil results))))

(defrecord TransferClient [initialized? n initial-balance]
(defrecord TransferClient [initialized? n initial-balance max-txs]
client/Client
(open! [_ _ _]
(TransferClient. initialized? n initial-balance))
(TransferClient. initialized? n initial-balance max-txs))

(setup! [_ test]
(locking initialized?
Expand All @@ -129,22 +162,10 @@

(invoke! [_ test op]
(case (:f op)
:transfer (if-let [tx (scalar/start-transaction test)]
(try
(tx-transfer tx (:value op))
(assoc op :type :ok)
(catch UnknownTransactionStatusException _
(swap! (:unknown-tx test) conj (.getId tx))
(assoc op :type :info :error {:unknown-tx-status (.getId tx)}))
(catch Exception e
(scalar/try-reconnection-for-transaction! test)
(assoc op :type :fail :error (.getMessage e))))
(do
(scalar/try-reconnection-for-transaction! test)
(assoc op :type :fail :error "Skipped due to no connection")))
:transfer (exec-transfers test op try-tx-transfer)
:get-all (do
(wait-for-recovery (:db test) test)
(if-let [results (read-all-with-retry test (:num op))]
(if-let [results (read-all-with-retry test n)]
(assoc op :type :ok :value {:balance (get-balances results)
:version (get-versions results)})
(assoc op :type :fail :error "Failed to get balances")))
Expand All @@ -159,25 +180,30 @@
(teardown! [_ test]
(scalar/close-all! test)))

(defn- transfer
(defn- generate-acc-pair
[n]
(loop []
(let [from (rand-int n)
to (rand-int n)]
(if-not (= from to) [from to] (recur)))))

(defn transfer
[test _]
(let [n (-> test :client :n)]
(let [num-accs (-> test :client :n)
num-txs (-> test :client :max-txs rand-int inc)]
{:type :invoke
:f :transfer
:value {:from (rand-int n)
:to (rand-int n)
:amount (+ 1 (rand-int 1000))}}))

(def diff-transfer
(gen/filter (fn [op] (not= (-> op :value :from)
(-> op :value :to)))
transfer))
:value (repeatedly num-txs
(fn []
(let [[from to] (generate-acc-pair num-accs)]
{:from from
:to to
:amount (+ 1 (rand-int 1000))})))}))

(defn get-all
[test _]
[_ _]
{:type :invoke
:f :get-all
:num (-> test :client :n)})
:f :get-all})

(defn check-tx
[_ _]
Expand All @@ -188,17 +214,20 @@
[]
(reify checker/Checker
(check [_ test history _]
(let [read-result (->> history
(let [num-accs (-> test :client :n)
initial-balance (-> test :client :initial-balance)
total-balance (* num-accs initial-balance)
read-result (->> history
(r/filter #(= :get-all (:f %)))
(r/filter identity)
(into [])
last
:value)
actual-balance (->> (:balance read-result)
(reduce +))
bad-balance (when-not (= actual-balance TOTAL_BALANCE)
bad-balance (when-not (= actual-balance total-balance)
{:type :wrong-balance
:expected TOTAL_BALANCE
:expected total-balance
:actual actual-balance})
actual-version (->> (:version read-result)
(reduce +))
Expand All @@ -209,16 +238,18 @@
last
((fn [x]
(if (= (:type x) :ok) (:value x) 0))))
total-ok (->> history
(r/filter op/ok?)
(r/filter #(= :transfer (:f %)))
(r/filter identity)
(into [])
count
(+ checked-committed))
expected-version (-> total-ok
(* 2) ; update 2 records per a transfer
(+ (-> test :client :n))) ; initial insertions
total-commits (->> history
(r/filter op/ok?)
(r/filter #(= :transfer (:f %)))
(r/reduce (fn [cnt op]
(->> op :value :results
(filter #{:commit})
count
(+ cnt)))
checked-committed))
expected-version (-> total-commits
(* 2) ; update 2 records per transfer
(+ num-accs)) ; initial insertions
bad-version (when-not (= actual-version expected-version)
{:type :wrong-version
:expected expected-version
Expand All @@ -231,8 +262,11 @@

(defn workload
[_]
{:client (->TransferClient (atom false) NUM_ACCOUNTS INITIAL_BALANCE)
:generator [diff-transfer]
{:client (->TransferClient (atom false)
NUM_ACCOUNTS
INITIAL_BALANCE
MAX_NUM_TXS)
:generator [transfer]
:final-generator (gen/phases
(gen/once get-all)
(gen/once check-tx))
Expand Down
45 changes: 25 additions & 20 deletions scalardb/src/scalardb/transfer_2pc.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
(ns scalardb.transfer-2pc
(:require [jepsen
(:require [clojure.tools.logging :refer [warn]]
[jepsen
[client :as client]
[generator :as gen]]
[scalardb.core :as scalar]
Expand All @@ -8,7 +9,7 @@
(:import (com.scalar.db.exception.transaction UnknownTransactionStatusException)))

(defn- tx-transfer
[tx1 tx2 {:keys [from to amount]}]
[tx1 tx2 from to amount]
(try
(let [fromResult (.get tx1 (transfer/prepare-get from))
toResult (.get tx2 (transfer/prepare-get to))]
Expand All @@ -25,10 +26,25 @@
(scalar/rollback-txs [tx1 tx2])
(throw e))))

(defrecord TransferClient [initialized? n initial-balance]
(defn- try-tx-transfer
[test {:keys [from to amount]}]
(let [tx1 (scalar/start-2pc test)
tx2 (scalar/join-2pc test (.getId tx1))]
(try
(tx-transfer tx1 tx2 from to amount)
:commit
(catch UnknownTransactionStatusException _
(swap! (:unknown-tx test) conj (.getId tx1))
(warn "Unknown transaction: " (.getId tx1))
:unknown-tx-status)
(catch Exception e
(warn (.getMessage e))
:fail))))

(defrecord TransferClient [initialized? n initial-balance max-txs]
client/Client
(open! [_ _ _]
(TransferClient. initialized? n initial-balance))
(TransferClient. initialized? n initial-balance max-txs))

(setup! [_ test]
(locking initialized?
Expand All @@ -40,22 +56,10 @@

(invoke! [_ test op]
(case (:f op)
:transfer (let [tx1 (scalar/start-2pc test)
tx2 (scalar/join-2pc test (.getId tx1))]
(try
(tx-transfer tx1 tx2 (:value op))
(assoc op :type :ok)
(catch UnknownTransactionStatusException _
(swap! (:unknown-tx test) conj (.getId tx1))
(assoc op
:type :info
:error {:unknown-tx-status (.getId tx1)}))
(catch Exception e
(scalar/try-reconnection-for-2pc! test)
(assoc op :type :fail :error (.getMessage e)))))
:transfer (transfer/exec-transfers test op try-tx-transfer)
:get-all (do
(wait-for-recovery (:db test) test)
(if-let [results (transfer/read-all-with-retry test (:num op))]
(if-let [results (transfer/read-all-with-retry test n)]
(assoc op :type :ok :value {:balance
(transfer/get-balances results)
:version
Expand All @@ -76,8 +80,9 @@
[_]
{:client (->TransferClient (atom false)
transfer/NUM_ACCOUNTS
transfer/INITIAL_BALANCE)
:generator [transfer/diff-transfer]
transfer/INITIAL_BALANCE
transfer/MAX_NUM_TXS)
:generator [transfer/transfer]
:final-generator (gen/phases
(gen/once transfer/get-all)
(gen/once transfer/check-tx))
Expand Down
Loading

0 comments on commit 06f9b5e

Please sign in to comment.