diff --git a/cassandra/src/cassandra/core.clj b/cassandra/src/cassandra/core.clj index baf6dda..fbff3a3 100644 --- a/cassandra/src/cassandra/core.clj +++ b/cassandra/src/cassandra/core.clj @@ -288,12 +288,6 @@ (clause/with {:compaction {:class compaction-strategy}})))) -(defn open-cassandra - [test] - (let [cluster (alia/cluster {:contact-points (:nodes test)}) - session (alia/connect cluster)] - [cluster session])) - (defn close-cassandra [cluster session] (some-> session alia/shutdown (.get 10 TimeUnit/SECONDS)) diff --git a/scalardb/src/scalardb/core.clj b/scalardb/src/scalardb/core.clj index dd19343..b711893 100644 --- a/scalardb/src/scalardb/core.clj +++ b/scalardb/src/scalardb/core.clj @@ -1,7 +1,5 @@ (ns scalardb.core - (:require [cassandra.core :as cassandra] - [cheshire.core :as cheshire] - [clojure.string :as string] + (:require [cheshire.core :as cheshire] [clojure.tools.logging :refer [info warn]] [jepsen.checker :as checker] [jepsen.independent :as independent] @@ -25,84 +23,28 @@ [r] (Thread/sleep (reduce * 1000 (repeat r 2)))) -(defn- get-cassandra-schema - "Only the current test schemata are covered - because this is just a workaround for the schema loader issue." - [schema] - (assert (= (count schema) 1) "The schema should have only 1 entry") - (let [keyspace-table (-> schema keys first name) - schema (-> schema vals first) - [keyspace table] (string/split keyspace-table #"\.") - partition-key (mapv keyword (:partition-key schema)) - clustering-key (mapv keyword (:clustering-key schema)) - columns (assoc (reduce - (fn [r [k t]] - (let [val-type (-> t string/lower-case keyword) - result (assoc r k val-type)] - (if (or (.contains partition-key k) - (.contains clustering-key k)) - result - (assoc result - (->> k name (str "before_") keyword) - val-type)))) - {} - (:columns schema)) - :tx_id :text - :tx_version :int - :tx_state :int - :tx_prepared_at :bigint - :tx_committed_at :bigint - :before_tx_id :text - :before_tx_version :int - :before_tx_state :int - :before_tx_prepared_at :bigint - :before_tx_committed_at :bigint - :primary-key (into partition-key clustering-key))] - {:keyspace keyspace - :table table - :schema columns})) - -(defn- setup-cassandra-tables - [test schemata] - (let [[cluster session] (cassandra/open-cassandra test) - schemata (map get-cassandra-schema schemata)] - (doseq [schema schemata] - (cassandra/create-my-keyspace session test schema) - (cassandra/create-my-table session schema)) - (cassandra/create-my-keyspace session test {:keyspace "coordinator"}) - (cassandra/create-my-table session {:keyspace "coordinator" - :table "state" - :schema {:tx_id :text - :tx_state :int - :tx_created_at :bigint - :primary-key [:tx_id]}}) - (cassandra/close-cassandra cluster session))) - (defn setup-transaction-tables [test schemata] (let [properties (ext/create-properties (:db test) test) options (ext/create-table-opts (:db test) test)] - (if (= (.getProperty properties "scalar.db.username") "cassandra") - ;; Workaround the issue of the schema loader for Cassandra - (setup-cassandra-tables test schemata) - (doseq [schema (map cheshire/generate-string schemata)] - (loop [retries RETRIES] - (when (zero? retries) - (throw (ex-info "Failed to set up tables" {:schema schema}))) - (when (< retries RETRIES) - (exponential-backoff (- RETRIES retries)) - (try - (SchemaLoader/unload properties schema true) - (catch Exception e (warn (.getMessage e)))) - (exponential-backoff (- RETRIES retries))) - (let [result (try - (SchemaLoader/load properties schema options true) - :success - (catch Exception e - (warn (.getMessage e)) - :fail))] - (when (= result :fail) - (recur (dec retries))))))))) + (doseq [schema (map cheshire/generate-string schemata)] + (loop [retries RETRIES] + (when (zero? retries) + (throw (ex-info "Failed to set up tables" {:schema schema}))) + (when (< retries RETRIES) + (exponential-backoff (- RETRIES retries)) + (try + (SchemaLoader/unload properties schema true) + (catch Exception e (warn (.getMessage e)))) + (exponential-backoff (- RETRIES retries))) + (let [result (try + (SchemaLoader/load properties schema options true) + :success + (catch Exception e + (warn (.getMessage e)) + :fail))] + (when (= result :fail) + (recur (dec retries)))))))) (defn- close-storage! [test] diff --git a/scalardl/test/scalardl/cas_test.clj b/scalardl/test/scalardl/cas_test.clj index 2f47bf5..f9075f6 100644 --- a/scalardl/test/scalardl/cas_test.clj +++ b/scalardl/test/scalardl/cas_test.clj @@ -8,7 +8,8 @@ (:import (com.scalar.dl.client.exception ClientException) (com.scalar.dl.client.service ClientService) (com.scalar.dl.ledger.model ContractExecutionResult) - (com.scalar.dl.ledger.service StatusCode))) + (com.scalar.dl.ledger.service StatusCode) + (javax.json Json))) (def ^:dynamic contract-count (atom 0)) (def ^:dynamic execute-count (atom 0)) @@ -21,8 +22,9 @@ nil) (executeContract [& _] (swap! execute-count inc) - (ContractExecutionResult. "{\"value\": 3}" - nil + (ContractExecutionResult. (-> (Json/createObjectBuilder) + (.add "value" 3) + .build) nil nil)))) diff --git a/scalardl/test/scalardl/transfer_test.clj b/scalardl/test/scalardl/transfer_test.clj index bf695c2..d0e4f31 100644 --- a/scalardl/test/scalardl/transfer_test.clj +++ b/scalardl/test/scalardl/transfer_test.clj @@ -9,7 +9,8 @@ (:import (com.scalar.dl.client.exception ClientException) (com.scalar.dl.client.service ClientService) (com.scalar.dl.ledger.model ContractExecutionResult) - (com.scalar.dl.ledger.service StatusCode))) + (com.scalar.dl.ledger.service StatusCode) + (javax.json Json))) (def ^:dynamic contract-count (atom 0)) (def ^:dynamic execute-count (atom 0)) @@ -22,8 +23,10 @@ nil) (executeContract [& _] (swap! execute-count inc) - (ContractExecutionResult. "{\"balance\": 1000, \"age\": 111}" - nil + (ContractExecutionResult. (-> (Json/createObjectBuilder) + (.add "balance" 1000) + (.add "age" 111) + .build) nil nil))))