From d98c7034ea53721ee23e4a6b7628020b0a084f66 Mon Sep 17 00:00:00 2001 From: yito88 Date: Sun, 10 Dec 2023 01:41:14 +0100 Subject: [PATCH] fix nodes --- cassandra/src/cassandra/nemesis.clj | 4 ++-- cassandra/src/cassandra/runner.clj | 2 +- cassandra/test/cassandra/runner_test.clj | 4 ++-- scalardb/src/scalardb/db/postgres.clj | 24 ++++++++++++++++++++ scalardb/src/scalardb/db_extend.clj | 15 ++++++------- scalardb/src/scalardb/runner.clj | 28 ++++++++++-------------- 6 files changed, 47 insertions(+), 30 deletions(-) diff --git a/cassandra/src/cassandra/nemesis.clj b/cassandra/src/cassandra/nemesis.clj index 65a2b91..4f16d1a 100644 --- a/cassandra/src/cassandra/nemesis.clj +++ b/cassandra/src/cassandra/nemesis.clj @@ -199,7 +199,7 @@ (defn flush-generator [opts] - (when (contains? (:admin opts) :flush-compact) + (when (contains? (:admin opts) :flush) (->> (gen/mix [(repeat {:type :info, :f :flush}) (repeat {:type :info, :f :compact})]) (gen/stagger default-interval)))) @@ -207,7 +207,7 @@ (defn flush-package "A combined nemesis package for flush and compaction." [opts] - (when (contains? (:admin opts) :flush-compact) + (when (contains? (:admin opts) :flush) {:nemesis (flush-nemesis) :generator (flush-generator opts) :perf #{{:name "flush" diff --git a/cassandra/src/cassandra/runner.clj b/cassandra/src/cassandra/runner.clj index 8d224c5..6f50f2d 100644 --- a/cassandra/src/cassandra/runner.clj +++ b/cassandra/src/cassandra/runner.clj @@ -42,7 +42,7 @@ (def admin {"none" [] "join" [:join] - "flush" [:flush-compact]}) + "flush" [:flush]}) (def test-opt-spec [(cli/repeated-opt nil "--workload NAME" "Test(s) to run" [] workload-keys)]) diff --git a/cassandra/test/cassandra/runner_test.clj b/cassandra/test/cassandra/runner_test.clj index d485e19..5248c2d 100644 --- a/cassandra/test/cassandra/runner_test.clj +++ b/cassandra/test/cassandra/runner_test.clj @@ -6,7 +6,7 @@ (let [opts {:target "cassandra" :workload :batch :nemesis [:crash] - :admin [:flush-compact] + :admin [:flush] :time-limit 60} test (runner/cassandra-test opts)] - (is (= "cassandra-batch-crash-flush-compact" (:name test))))) + (is (= "cassandra-batch-crash-flush" (:name test))))) diff --git a/scalardb/src/scalardb/db/postgres.clj b/scalardb/src/scalardb/db/postgres.clj index cc1f731..3823478 100644 --- a/scalardb/src/scalardb/db/postgres.clj +++ b/scalardb/src/scalardb/db/postgres.clj @@ -8,6 +8,8 @@ [jepsen.os.debian :as debian])) (def ^:private ^:const DEFAULT_VERSION "15") +(def ^:private ^:const TIMEOUT_SEC 600) +(def ^:private ^:const INTERVAL_SEC 10) (defn- install! "Installs PostgreSQL." @@ -69,6 +71,28 @@ (c/su (meh (c/exec :rm :-r (get-main-dir version)))) (c/su (meh (c/exec :rm (get-log-path version))))) +(defn live-node? + [test] + (let [node (-> test :nodes first)] + (try + (c/on node (c/sudo "postgres" (c/exec :pg_isready))) + true + (catch Exception _ + (info node "is down") + false)))) + +(defn wait-for-recovery + "Wait for the node bootstrapping." + ([test] + (wait-for-recovery TIMEOUT_SEC INTERVAL_SEC test)) + ([timeout-sec interval-sec test] + (when-not (live-node? test) + (Thread/sleep (* interval-sec 1000)) + (if (>= timeout-sec interval-sec) + (wait-for-recovery (- timeout-sec interval-sec) interval-sec test) + (throw (ex-info "Timed out waiting for the postgres node" + {:cause "The node couldn't start"})))))) + (defn db "Setup PostgreSQL." [] diff --git a/scalardb/src/scalardb/db_extend.clj b/scalardb/src/scalardb/db_extend.clj index 0eb64f2..56a4ed0 100644 --- a/scalardb/src/scalardb/db_extend.clj +++ b/scalardb/src/scalardb/db_extend.clj @@ -1,7 +1,8 @@ (ns scalardb.db-extend (:require [cassandra.core :as cassandra] [clojure.string :as string] - [jepsen.db :as db]) + [jepsen.db :as db] + [scalardb.db.postgres :as postgres]) (:import (com.scalar.db.storage.cassandra CassandraAdmin CassandraAdmin$ReplicationStrategy CassandraAdmin$CompactionStrategy) @@ -47,15 +48,13 @@ (defrecord ExtPostgres [] DbExtension - ;; TODO: check the liveness - (live-nodes [_ test] (:nodes test)) - (wait-for-recovery [_ _]) + (live-nodes [_ test] (postgres/live-node? test)) + (wait-for-recovery [_ test] (postgres/wait-for-recovery test)) (create-table-opts [_ _] {}) (create-properties - [this test] - (let [node (first (live-nodes this test))] - (when (nil? node) - (throw (ex-info "No living node" {:test test}))) + [_ test] + (let [node (-> test :nodes first)] + ;; We have only one node in this test (doto (Properties.) (.setProperty "scalar.db.storage" "jdbc") (.setProperty "scalar.db.contact_points" diff --git a/scalardb/src/scalardb/runner.clj b/scalardb/src/scalardb/runner.clj index 5168eb4..5be70c4 100644 --- a/scalardb/src/scalardb/runner.clj +++ b/scalardb/src/scalardb/runner.clj @@ -34,7 +34,13 @@ "Returns [extended-db constructed-nemesis num-max-nodes]." [db-key faults admin] (case db-key - :cassandra (let [db (extend-db (cassandra/db) :cassandra)] + :cassandra (let [db (extend-db (cassandra/db) :cassandra) + ;; replace :kill nemesis with :crash for Cassandra + faults (mapv #(if (= % :kill) :crash %) faults)] + (when-not (every? #(some? (get cr/nemeses (name %))) faults) + (throw + (ex-info + (str "Invalid nemesis for Cassandra: " faults) {}))) [db (cn/nemesis-package {:db db @@ -47,6 +53,8 @@ :minority-third]}}) Integer/MAX_VALUE]) :postgres (let [db (extend-db (postgres/db) :postgres)] + (when (seq admin) + (warn "The admin operations are ignored:" admin)) [db (jn/nemesis-package {:db db @@ -87,8 +95,7 @@ "packet" [:packet] "clock" [:clock] "crash" [:kill] - "pause" [:pause] - "mix" [:kill :partition :clock]}) + "pause" [:pause]}) (def test-opt-spec [(cli/repeated-opt nil "--db NAME" "DB(s) on which the test is run" @@ -124,18 +131,6 @@ (into (map name admin)) (->> (remove nil?) (string/join "-")))) -(defn- validate-options - "Check if the given faults are valid for the tested DB." - [db nemesis admin] - (when (= db :cassandra) - (when-not (every? #(some? (% cr/nemeses)) nemesis) - (throw (ex-info (str "Invalid nemesis for Cassandra: " nemesis) {}))) - (when-not (every? #(some? (% cr/admin)) admin) - (throw (ex-info (str "Invalid admin for Cassandra: " admin) {})))) - (when (not= db :cassandra) - (when (seq admin) - (warn "The admin operations are ignored:" admin)))) - (def ^:private scalardb-opts {:storage (atom nil) :transaction (atom nil) @@ -151,7 +146,7 @@ consistency-model (->> base-opts :consistency-model (mapv keyword)) workload-opts (merge base-opts scalardb-opts - {:nodes (take max-nodes (:nodes base-opts)) + {:nodes (vec (take max-nodes (:nodes base-opts))) :consistency-model consistency-model}) workload ((workload-key workloads) workload-opts)] (merge tests/noop-test @@ -186,7 +181,6 @@ workload-key (:workload options) faults (:nemesis options) admin (:admin options)] - (validate-options db-key faults admin) (let [test (-> options (scalardb-test db-key workload-key