Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workaround Cassandra table setup issue #122

Merged
merged 3 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cassandra/src/cassandra/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,12 @@
(clause/with {:compaction
{:class compaction-strategy}}))))

(defn open-cassandra
[test]
(let [cluster (alia/cluster {:contact-points (:nodes test)})
session (alia/connect cluster)]
[cluster session]))

(defn close-cassandra
[cluster session]
(some-> session alia/shutdown (.get 10 TimeUnit/SECONDS))
Expand Down
96 changes: 77 additions & 19 deletions scalardb/src/scalardb/core.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
(ns scalardb.core
(:require [cheshire.core :as cheshire]
(:require [cassandra.core :as cassandra]
[cheshire.core :as cheshire]
[clojure.string :as string]
[clojure.tools.logging :refer [info warn]]
[jepsen.checker :as checker]
[jepsen.independent :as independent]
Expand All @@ -23,28 +25,84 @@
[r]
(Thread/sleep (reduce * 1000 (repeat r 2))))

(defn- get-cassandra-schema
"Only the current test schemata are covered
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"Only the current test schemata are covered
"Only the current test schemas are covered

because this is just a workaround for the schema loader issue."
[schema]
(assert (= (count schema) 1) "The schema should have only 1 entry")
(let [keyspace-table (-> schema keys first name)
schema (-> schema vals first)
[keyspace table] (string/split keyspace-table #"\.")
partition-key (mapv keyword (:partition-key schema))
clustering-key (mapv keyword (:clustering-key schema))
columns (assoc (reduce
(fn [r [k t]]
(let [val-type (-> t string/lower-case keyword)
result (assoc r k val-type)]
(if (or (.contains partition-key k)
(.contains clustering-key k))
result
(assoc result
(->> k name (str "before_") keyword)
val-type))))
{}
(:columns schema))
:tx_id :text
:tx_version :int
:tx_state :int
:tx_prepared_at :bigint
:tx_committed_at :bigint
:before_tx_id :text
:before_tx_version :int
:before_tx_state :int
:before_tx_prepared_at :bigint
:before_tx_committed_at :bigint
:primary-key (into partition-key clustering-key))]
{:keyspace keyspace
:table table
:schema columns}))

(defn- setup-cassandra-tables
[test schemata]
(let [[cluster session] (cassandra/open-cassandra test)
schemata (map get-cassandra-schema schemata)]
(doseq [schema schemata]
(cassandra/create-my-keyspace session test schema)
(cassandra/create-my-table session schema))
(cassandra/create-my-keyspace session test {:keyspace "coordinator"})
(cassandra/create-my-table session {:keyspace "coordinator"
:table "state"
:schema {:tx_id :text
:tx_state :int
:tx_created_at :bigint
:primary-key [:tx_id]}})
(cassandra/close-cassandra cluster session)))

(defn setup-transaction-tables
[test schemata]
(let [properties (ext/create-properties (:db test) test)
options (ext/create-table-opts (:db test) test)]
(doseq [schema (map cheshire/generate-string schemata)]
(loop [retries RETRIES]
(when (zero? retries)
(throw (ex-info "Failed to set up tables" {:schema schema})))
(when (< retries RETRIES)
(exponential-backoff (- RETRIES retries))
(try
(SchemaLoader/unload properties schema true)
(catch Exception e (warn (.getMessage e))))
(exponential-backoff (- RETRIES retries)))
(let [result (try
(SchemaLoader/load properties schema options true)
:success
(catch Exception e
(warn (.getMessage e))
:fail))]
(when (= result :fail)
(recur (dec retries))))))))
(if (= (.getProperty properties "scalar.db.username") "cassandra")
;; Workaround the issue of the schema loader for Cassandra
(setup-cassandra-tables test schemata)
(doseq [schema (map cheshire/generate-string schemata)]
(loop [retries RETRIES]
(when (zero? retries)
(throw (ex-info "Failed to set up tables" {:schema schema})))
(when (< retries RETRIES)
(exponential-backoff (- RETRIES retries))
(try
(SchemaLoader/unload properties schema true)
(catch Exception e (warn (.getMessage e))))
(exponential-backoff (- RETRIES retries)))
(let [result (try
(SchemaLoader/load properties schema options true)
:success
(catch Exception e
(warn (.getMessage e))
:fail))]
(when (= result :fail)
(recur (dec retries)))))))))

(defn- close-storage!
[test]
Expand Down
8 changes: 3 additions & 5 deletions scalardl/test/scalardl/cas_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
(:import (com.scalar.dl.client.exception ClientException)
(com.scalar.dl.client.service ClientService)
(com.scalar.dl.ledger.model ContractExecutionResult)
(com.scalar.dl.ledger.service StatusCode)
(javax.json Json)))
(com.scalar.dl.ledger.service StatusCode)))

(def ^:dynamic contract-count (atom 0))
(def ^:dynamic execute-count (atom 0))
Expand All @@ -22,9 +21,8 @@
nil)
(executeContract [& _]
(swap! execute-count inc)
(ContractExecutionResult. (-> (Json/createObjectBuilder)
(.add "value" 3)
.build)
(ContractExecutionResult. "{\"value\": 3}"
nil
nil
nil))))

Expand Down
9 changes: 3 additions & 6 deletions scalardl/test/scalardl/transfer_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
(:import (com.scalar.dl.client.exception ClientException)
(com.scalar.dl.client.service ClientService)
(com.scalar.dl.ledger.model ContractExecutionResult)
(com.scalar.dl.ledger.service StatusCode)
(javax.json Json)))
(com.scalar.dl.ledger.service StatusCode)))

(def ^:dynamic contract-count (atom 0))
(def ^:dynamic execute-count (atom 0))
Expand All @@ -23,10 +22,8 @@
nil)
(executeContract [& _]
(swap! execute-count inc)
(ContractExecutionResult. (-> (Json/createObjectBuilder)
(.add "balance" 1000)
(.add "age" 111)
.build)
(ContractExecutionResult. "{\"balance\": 1000, \"age\": 111}"
nil
nil
nil))))

Expand Down
Loading