diff --git a/cassandra/src/cassandra/core.clj b/cassandra/src/cassandra/core.clj index fbff3a3..baf6dda 100644 --- a/cassandra/src/cassandra/core.clj +++ b/cassandra/src/cassandra/core.clj @@ -288,6 +288,12 @@ (clause/with {:compaction {:class compaction-strategy}})))) +(defn open-cassandra + [test] + (let [cluster (alia/cluster {:contact-points (:nodes test)}) + session (alia/connect cluster)] + [cluster session])) + (defn close-cassandra [cluster session] (some-> session alia/shutdown (.get 10 TimeUnit/SECONDS)) diff --git a/scalardb/src/scalardb/core.clj b/scalardb/src/scalardb/core.clj index b711893..dd19343 100644 --- a/scalardb/src/scalardb/core.clj +++ b/scalardb/src/scalardb/core.clj @@ -1,5 +1,7 @@ (ns scalardb.core - (:require [cheshire.core :as cheshire] + (:require [cassandra.core :as cassandra] + [cheshire.core :as cheshire] + [clojure.string :as string] [clojure.tools.logging :refer [info warn]] [jepsen.checker :as checker] [jepsen.independent :as independent] @@ -23,28 +25,84 @@ [r] (Thread/sleep (reduce * 1000 (repeat r 2)))) +(defn- get-cassandra-schema + "Only the current test schemata are covered + because this is just a workaround for the schema loader issue." + [schema] + (assert (= (count schema) 1) "The schema should have only 1 entry") + (let [keyspace-table (-> schema keys first name) + schema (-> schema vals first) + [keyspace table] (string/split keyspace-table #"\.") + partition-key (mapv keyword (:partition-key schema)) + clustering-key (mapv keyword (:clustering-key schema)) + columns (assoc (reduce + (fn [r [k t]] + (let [val-type (-> t string/lower-case keyword) + result (assoc r k val-type)] + (if (or (.contains partition-key k) + (.contains clustering-key k)) + result + (assoc result + (->> k name (str "before_") keyword) + val-type)))) + {} + (:columns schema)) + :tx_id :text + :tx_version :int + :tx_state :int + :tx_prepared_at :bigint + :tx_committed_at :bigint + :before_tx_id :text + :before_tx_version :int + :before_tx_state :int + :before_tx_prepared_at :bigint + :before_tx_committed_at :bigint + :primary-key (into partition-key clustering-key))] + {:keyspace keyspace + :table table + :schema columns})) + +(defn- setup-cassandra-tables + [test schemata] + (let [[cluster session] (cassandra/open-cassandra test) + schemata (map get-cassandra-schema schemata)] + (doseq [schema schemata] + (cassandra/create-my-keyspace session test schema) + (cassandra/create-my-table session schema)) + (cassandra/create-my-keyspace session test {:keyspace "coordinator"}) + (cassandra/create-my-table session {:keyspace "coordinator" + :table "state" + :schema {:tx_id :text + :tx_state :int + :tx_created_at :bigint + :primary-key [:tx_id]}}) + (cassandra/close-cassandra cluster session))) + (defn setup-transaction-tables [test schemata] (let [properties (ext/create-properties (:db test) test) options (ext/create-table-opts (:db test) test)] - (doseq [schema (map cheshire/generate-string schemata)] - (loop [retries RETRIES] - (when (zero? retries) - (throw (ex-info "Failed to set up tables" {:schema schema}))) - (when (< retries RETRIES) - (exponential-backoff (- RETRIES retries)) - (try - (SchemaLoader/unload properties schema true) - (catch Exception e (warn (.getMessage e)))) - (exponential-backoff (- RETRIES retries))) - (let [result (try - (SchemaLoader/load properties schema options true) - :success - (catch Exception e - (warn (.getMessage e)) - :fail))] - (when (= result :fail) - (recur (dec retries)))))))) + (if (= (.getProperty properties "scalar.db.username") "cassandra") + ;; Workaround the issue of the schema loader for Cassandra + (setup-cassandra-tables test schemata) + (doseq [schema (map cheshire/generate-string schemata)] + (loop [retries RETRIES] + (when (zero? retries) + (throw (ex-info "Failed to set up tables" {:schema schema}))) + (when (< retries RETRIES) + (exponential-backoff (- RETRIES retries)) + (try + (SchemaLoader/unload properties schema true) + (catch Exception e (warn (.getMessage e)))) + (exponential-backoff (- RETRIES retries))) + (let [result (try + (SchemaLoader/load properties schema options true) + :success + (catch Exception e + (warn (.getMessage e)) + :fail))] + (when (= result :fail) + (recur (dec retries))))))))) (defn- close-storage! [test] diff --git a/scalardl/test/scalardl/cas_test.clj b/scalardl/test/scalardl/cas_test.clj index f9075f6..2f47bf5 100644 --- a/scalardl/test/scalardl/cas_test.clj +++ b/scalardl/test/scalardl/cas_test.clj @@ -8,8 +8,7 @@ (:import (com.scalar.dl.client.exception ClientException) (com.scalar.dl.client.service ClientService) (com.scalar.dl.ledger.model ContractExecutionResult) - (com.scalar.dl.ledger.service StatusCode) - (javax.json Json))) + (com.scalar.dl.ledger.service StatusCode))) (def ^:dynamic contract-count (atom 0)) (def ^:dynamic execute-count (atom 0)) @@ -22,9 +21,8 @@ nil) (executeContract [& _] (swap! execute-count inc) - (ContractExecutionResult. (-> (Json/createObjectBuilder) - (.add "value" 3) - .build) + (ContractExecutionResult. "{\"value\": 3}" + nil nil nil)))) diff --git a/scalardl/test/scalardl/transfer_test.clj b/scalardl/test/scalardl/transfer_test.clj index d0e4f31..bf695c2 100644 --- a/scalardl/test/scalardl/transfer_test.clj +++ b/scalardl/test/scalardl/transfer_test.clj @@ -9,8 +9,7 @@ (:import (com.scalar.dl.client.exception ClientException) (com.scalar.dl.client.service ClientService) (com.scalar.dl.ledger.model ContractExecutionResult) - (com.scalar.dl.ledger.service StatusCode) - (javax.json Json))) + (com.scalar.dl.ledger.service StatusCode))) (def ^:dynamic contract-count (atom 0)) (def ^:dynamic execute-count (atom 0)) @@ -23,10 +22,8 @@ nil) (executeContract [& _] (swap! execute-count inc) - (ContractExecutionResult. (-> (Json/createObjectBuilder) - (.add "balance" 1000) - (.add "age" 111) - .build) + (ContractExecutionResult. "{\"balance\": 1000, \"age\": 111}" + nil nil nil))))