diff --git a/Cargo.lock b/Cargo.lock index d3a602c27..fdbaa580a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3911,6 +3911,7 @@ dependencies = [ "strum", "strum_macros", "thiserror", + "utils", "workspace-hack", ] diff --git a/crates/curp/src/server/storage/wal/pipeline.rs b/crates/curp/src/server/storage/wal/pipeline.rs index 534cf0daa..b5af7b57f 100644 --- a/crates/curp/src/server/storage/wal/pipeline.rs +++ b/crates/curp/src/server/storage/wal/pipeline.rs @@ -162,9 +162,8 @@ impl std::fmt::Debug for FilePipeline { #[cfg(test)] mod tests { - use crate::server::storage::wal::util::get_file_paths_with_ext; - use super::*; + use crate::server::storage::wal::util::get_file_paths_with_ext; #[tokio::test] async fn file_pipeline_is_ok() { diff --git a/crates/curp/src/server/storage/wal/segment.rs b/crates/curp/src/server/storage/wal/segment.rs index 40b0f976a..525ef3aa5 100644 --- a/crates/curp/src/server/storage/wal/segment.rs +++ b/crates/curp/src/server/storage/wal/segment.rs @@ -12,14 +12,13 @@ use tokio::{ use tokio_stream::StreamExt; use tokio_util::codec::Framed; -use crate::log_entry::LogEntry; - use super::{ codec::{DataFrame, WAL}, error::{CorruptType, WALError}, util::{get_checksum, parse_u64, validate_data, LockedFile}, WAL_FILE_EXT, WAL_MAGIC, WAL_VERSION, }; +use crate::log_entry::LogEntry; /// The size of wal file header in bytes const WAL_HEADER_SIZE: usize = 56; @@ -415,9 +414,8 @@ mod tests { use curp_test_utils::test_cmd::TestCommand; - use crate::log_entry::EntryData; - use super::*; + use crate::log_entry::EntryData; #[tokio::test] async fn segment_state_transition_is_correct() { diff --git a/crates/curp/src/tracker.rs b/crates/curp/src/tracker.rs index 7759c41cc..09ecc18ec 100644 --- a/crates/curp/src/tracker.rs +++ b/crates/curp/src/tracker.rs @@ -1,7 +1,9 @@ #![allow(unused)] // TODO remove when used -use std::collections::VecDeque; -use std::ops::{AddAssign, Sub}; +use std::{ + collections::VecDeque, + ops::{AddAssign, Sub}, +}; use clippy_utilities::NumericCast; diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index 46b1ebab0..140745946 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -201,6 +201,26 @@ pub mod tracing; use ::tracing::debug; pub use parser::*; +/// display all elements for the given vector +#[macro_export] +macro_rules! write_vec { + ($f:expr, $name:expr, $vector:expr) => { + write!($f, "{}: [ ", { $name })?; + let last_idx = if $vector.is_empty() { + 0 + } else { + $vector.len() - 1 + }; + for (idx, element) in $vector.iter().enumerate() { + write!($f, "{}", element)?; + if idx != last_idx { + write!($f, ",")?; + } + } + write!($f, "]")?; + }; +} + /// Get current timestamp in seconds #[must_use] #[inline] diff --git a/crates/xline/src/server/auth_server.rs b/crates/xline/src/server/auth_server.rs index 1a43249e5..e00b35e8b 100644 --- a/crates/xline/src/server/auth_server.rs +++ b/crates/xline/src/server/auth_server.rs @@ -142,8 +142,8 @@ where &self, mut request: tonic::Request, ) -> Result, tonic::Status> { - debug!("Receive AuthUserAddRequest {:?}", request); let user_add_req = request.get_mut(); + debug!("Receive AuthUserAddRequest {}", user_add_req); user_add_req.validation()?; let hashed_password = Self::hash_password(user_add_req.password.as_bytes()); user_add_req.hashed_password = hashed_password; @@ -244,7 +244,10 @@ where &self, request: tonic::Request, ) -> Result, tonic::Status> { - debug!("Receive AuthRoleGrantPermissionRequest {:?}", request); + debug!( + "Receive AuthRoleGrantPermissionRequest {}", + request.get_ref() + ); request.get_ref().validation()?; self.handle_req(request, false).await } @@ -253,7 +256,10 @@ where &self, request: tonic::Request, ) -> Result, tonic::Status> { - debug!("Receive AuthRoleRevokePermissionRequest {:?}", request); + debug!( + "Receive AuthRoleRevokePermissionRequest {}", + request.get_ref() + ); self.handle_req(request, false).await } } diff --git a/crates/xline/src/server/kv_server.rs b/crates/xline/src/server/kv_server.rs index 725e5d2d2..b9d51060a 100644 --- a/crates/xline/src/server/kv_server.rs +++ b/crates/xline/src/server/kv_server.rs @@ -205,7 +205,7 @@ where ) -> Result, tonic::Status> { let range_req = request.get_ref(); range_req.validation()?; - debug!("Receive grpc request: {:?}", range_req); + debug!("Receive grpc request: {}", range_req); range_req.check_revision( self.kv_storage.compacted_revision(), self.kv_storage.revision(), @@ -244,7 +244,7 @@ where ) -> Result, tonic::Status> { let put_req: &PutRequest = request.get_ref(); put_req.validation()?; - debug!("Receive grpc request: {:?}", put_req); + debug!("Receive grpc request: {}", put_req); let auth_info = self.auth_storage.try_get_auth_info_from_request(&request)?; let is_fast_path = true; let (cmd_res, sync_res) = self @@ -253,7 +253,7 @@ where let mut res = Self::parse_response_op(cmd_res.into_inner().into()); if let Some(sync_res) = sync_res { let revision = sync_res.revision(); - debug!("Get revision {:?} for PutRequest", revision); + debug!("Get revision {} for PutRequest", revision); Self::update_header_revision(&mut res, revision); } if let Response::ResponsePut(response) = res { @@ -273,7 +273,7 @@ where ) -> Result, tonic::Status> { let delete_range_req = request.get_ref(); delete_range_req.validation()?; - debug!("Receive grpc request: {:?}", delete_range_req); + debug!("Receive grpc request: {}", delete_range_req); let auth_info = self.auth_storage.try_get_auth_info_from_request(&request)?; let is_fast_path = true; let (cmd_res, sync_res) = self @@ -282,7 +282,7 @@ where let mut res = Self::parse_response_op(cmd_res.into_inner().into()); if let Some(sync_res) = sync_res { let revision = sync_res.revision(); - debug!("Get revision {:?} for DeleteRangeRequest", revision); + debug!("Get revision {} for DeleteRangeRequest", revision); Self::update_header_revision(&mut res, revision); } if let Response::ResponseDeleteRange(response) = res { @@ -303,7 +303,7 @@ where ) -> Result, tonic::Status> { let txn_req = request.get_ref(); txn_req.validation()?; - debug!("Receive grpc request: {:?}", txn_req); + debug!("Receive grpc request: {}", txn_req); txn_req.check_revision( self.kv_storage.compacted_revision(), self.kv_storage.revision(), @@ -326,7 +326,7 @@ where let mut res = Self::parse_response_op(cmd_res.into_inner().into()); if let Some(sync_res) = sync_res { let revision = sync_res.revision(); - debug!("Get revision {:?} for TxnRequest", revision); + debug!("Get revision {} for TxnRequest", revision); Self::update_header_revision(&mut res, revision); } res diff --git a/crates/xline/src/server/mod.rs b/crates/xline/src/server/mod.rs index cb52839fd..f6c88947c 100644 --- a/crates/xline/src/server/mod.rs +++ b/crates/xline/src/server/mod.rs @@ -21,6 +21,5 @@ mod watch_server; /// Xline server mod xline_server; -pub(crate) use self::auth_server::get_token; -pub(crate) use self::maintenance::MAINTENANCE_SNAPSHOT_CHUNK_SIZE; pub use self::xline_server::XlineServer; +pub(crate) use self::{auth_server::get_token, maintenance::MAINTENANCE_SNAPSHOT_CHUNK_SIZE}; diff --git a/crates/xline/src/storage/kvwatcher.rs b/crates/xline/src/storage/kvwatcher.rs index 174ee0a9e..dc008019f 100644 --- a/crates/xline/src/storage/kvwatcher.rs +++ b/crates/xline/src/storage/kvwatcher.rs @@ -8,18 +8,17 @@ use std::{ time::Duration, }; -use clippy_utilities::OverflowArithmetic; use itertools::Itertools; -use log::warn; use parking_lot::RwLock; use tokio::{ sync::mpsc::{self, error::TrySendError}, time::sleep, }; -use tracing::debug; +use tracing::{debug, warn}; use utils::{ parking_lot_lock::RwLockMap, task_manager::{tasks::TaskName, Listener, TaskManager}, + write_vec, }; use xlineapi::command::KeyRange; @@ -62,6 +61,9 @@ struct Watcher { event_tx: mpsc::Sender, /// Compacted flag compacted: bool, + /// TODO: remove it when https://github.com/xline-kv/Xline/issues/491 has been closed + /// Store the revision that has been notified + notified_set: HashSet, } impl PartialEq for Watcher { @@ -97,6 +99,7 @@ impl Watcher { stop_notify, event_tx, compacted, + notified_set: HashSet::new(), } } @@ -114,10 +117,10 @@ impl Watcher { fn filter_events(&self, mut events: Vec) -> Vec { events.retain(|event| { self.filters.iter().all(|filter| filter != &event.r#type) - && (event - .kv - .as_ref() - .map_or(false, |kv| kv.mod_revision >= self.start_rev)) + && (event.kv.as_ref().map_or(false, |kv| { + kv.mod_revision >= self.start_rev + && !self.notified_set.contains(&kv.mod_revision) + })) }); events } @@ -136,26 +139,26 @@ impl Watcher { revision, compacted: self.compacted, }; - if !self.compacted { - if revision < self.start_rev || 0 == events_len { - return Ok(()); - } - debug!(watch_id, revision, events_len, "try to send watch response"); + if !self.compacted + && (revision < self.start_rev + || self.notified_set.contains(&revision) + || 0 == events_len) + { + return Ok(()); }; match self.event_tx.try_send(watch_event) { Ok(_) => { - debug!(watch_id, revision, "response sent successfully"); - self.start_rev = revision.overflow_add(1); + let _ignore = self.notified_set.insert(revision); Ok(()) } Err(TrySendError::Closed(_)) => { - debug!(watch_id, revision, "watcher is closed"); + warn!(watch_id, revision, "watcher is closed"); self.stop_notify.notify(1); Ok(()) } Err(TrySendError::Full(watch_event)) => { - debug!( + warn!( watch_id, revision, "events channel is full, will try to send later" ); @@ -540,7 +543,6 @@ where } /// Watch Event -#[derive(Debug)] pub(crate) struct WatchEvent { /// Watch ID id: WatchId, @@ -552,6 +554,18 @@ pub(crate) struct WatchEvent { compacted: bool, } +impl std::fmt::Debug for WatchEvent { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "WatchEvent {{ id: {}, revision: {}, compacted: {}, ", + self.id, self.revision, self.compacted, + )?; + write_vec!(f, "events", self.events); + write!(f, " }}") + } +} + impl WatchEvent { /// Get revision pub(crate) fn revision(&self) -> i64 { @@ -590,7 +604,7 @@ mod test { use std::{collections::BTreeMap, time::Duration}; - use clippy_utilities::Cast; + use clippy_utilities::{Cast, OverflowArithmetic}; use test_macros::abort_on_panic; use tokio::time::{sleep, timeout}; use utils::config::EngineConfig; diff --git a/crates/xline/src/storage/mod.rs b/crates/xline/src/storage/mod.rs index f4f00bc85..e04d2fe82 100644 --- a/crates/xline/src/storage/mod.rs +++ b/crates/xline/src/storage/mod.rs @@ -19,8 +19,7 @@ pub(crate) mod revision; /// Persistent storage abstraction pub(crate) mod storage_api; +pub use self::revision::Revision; pub(crate) use self::{ alarm_store::AlarmStore, auth_store::AuthStore, kv_store::KvStore, lease_store::LeaseStore, }; - -pub use self::revision::Revision; diff --git a/crates/xline/src/utils/args.rs b/crates/xline/src/utils/args.rs index e6effc50b..69b6b79c8 100644 --- a/crates/xline/src/utils/args.rs +++ b/crates/xline/src/utils/args.rs @@ -38,13 +38,13 @@ pub struct ServerArgs { #[clap(long)] name: String, /// Node peer listen urls - #[clap(long, num_args = 1.., value_delimiter = ',')] + #[clap(long, required = true, num_args = 1.., value_delimiter = ',')] peer_listen_urls: Vec, /// Node peer advertise urls #[clap(long, num_args = 1.., value_delimiter = ',')] peer_advertise_urls: Vec, /// Node client listen urls - #[clap(long, num_args = 1.., value_delimiter = ',')] + #[clap(long, required = true, num_args = 1.., value_delimiter = ',')] client_listen_urls: Vec, /// Node client advertise urls #[clap(long, num_args = 1.., value_delimiter = ',')] diff --git a/crates/xlineapi/Cargo.toml b/crates/xlineapi/Cargo.toml index 8bdf9dbad..608bd02cb 100644 --- a/crates/xlineapi/Cargo.toml +++ b/crates/xlineapi/Cargo.toml @@ -19,6 +19,7 @@ prost = "0.12.3" serde = { version = "1.0.137", features = ["derive"] } thiserror = "1.0.37" tonic = { version = "0.4.1", package = "madsim-tonic" } +utils = { path = "../utils", features = ["parking_lot"] } workspace-hack = { version = "0.1", path = "../../workspace-hack" } [build-dependencies] diff --git a/crates/xlineapi/src/lib.rs b/crates/xlineapi/src/lib.rs index 35b9e2893..b2364e7cc 100644 --- a/crates/xlineapi/src/lib.rs +++ b/crates/xlineapi/src/lib.rs @@ -200,6 +200,7 @@ mod errorpb { use std::fmt::Display; use command::KeyRange; +use utils::write_vec; pub use self::{ authpb::{permission::Type, Permission, Role, User, UserAddOptions}, @@ -721,6 +722,309 @@ impl Display for AlarmMember { write!(f, "memberID:{} alarm:{} ", self.member_id, alarm_str) } } + +/// Since the tokio-rs/prost will automatically derive Debug trait for all the structures it generates, we have to +/// override the Display trait for these requests and responses. +/// FYI: https://github.com/tokio-rs/prost/issues/334 +impl Display for PutRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "PutRequest {{ key: {:?}, value: {:?}, lease: {:?}, prev_kv: {:?}, ignore_value: {:?}, ignore_lease: {:?} }}", + String::from_utf8_lossy(&self.key), + String::from_utf8_lossy(&self.value), + self.lease, self.prev_kv, + self.ignore_value, + self.ignore_lease) + } +} + +impl Display for PutResponse { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if let Some(ref prev_kv) = self.prev_kv { + write!( + f, + "PutResponse {{ header: {:?}, prev_kv: {} }}", + self.header, prev_kv + ) + } else { + write!( + f, + "PutResponse {{ header: {:?}, prev_kv: None }}", + self.header + ) + } + } +} + +impl Display for RangeRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "RangeRequest {{ key: {:?}, range_end: {:?}, limit: {:?}, revision: {:?}, sort_order: {:?}, ", + String::from_utf8_lossy(&self.key), + String::from_utf8_lossy(&self.range_end), + self.limit, + self.revision, + self.sort_order, + )?; + write!( + f, + "sort_target: {:?}, serializable: {:?}, keys_only: {:?}, count_only: {:?}, min_mod_revision: {:?}, ", + self.sort_target, + self.serializable, + self.keys_only, + self.count_only, + self.min_mod_revision + )?; + write!( + f, + "max_mod_revision: {:?}, min_create_revision: {:?}, max_create_revision: {:?}, key_only: {:?}, count_only: {:?}, ", + self.max_mod_revision, + self.min_create_revision, + self.max_create_revision, + self.keys_only, + self.count_only, + )?; + write!( + f, + "min_mod_revision: {:?}, max_mod_revision: {:?}, min_create_revision: {:?}, max_create_revision: {:?} }}", + self.min_mod_revision, + self.max_mod_revision, + self.min_create_revision, + self.max_create_revision + ) + } +} + +impl Display for RangeResponse { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "RangeResponse {{ header: {:?},", self.header)?; + write_vec!(f, "kvs", self.kvs); + write!(f, ", more: {:?}, count: {:?} }}", self.more, self.count) + } +} + +impl Display for DeleteRangeRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "DeleteRangeRequest {{ key: {:?}, range_end: {:?}, prev_kv: {:?} }}", + String::from_utf8_lossy(&self.key), + String::from_utf8_lossy(&self.range_end), + self.prev_kv + ) + } +} + +impl Display for DeleteRangeResponse { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "DeleteRangeResponse {{ header: {:?}, deleted: {:?}, ", + self.header, self.deleted + )?; + write_vec!(f, "prev_kvs", self.prev_kvs); + write!(f, "}}") + } +} + +impl Display for RequestOp { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "RequestOp {{")?; + if let Some(ref request) = self.request { + match request { + Request::RequestRange(req) => write!(f, "{}", req)?, + Request::RequestDeleteRange(req) => write!(f, "{}", req)?, + Request::RequestPut(req) => write!(f, "{}", req)?, + Request::RequestTxn(req) => write!(f, "{}", req)?, + } + } + write!(f, "}}") + } +} + +impl Display for ResponseOp { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "ResponseOp {{")?; + if let Some(ref request) = self.response { + match request { + Response::ResponseRange(req) => write!(f, "{}", req)?, + Response::ResponseDeleteRange(req) => write!(f, "{}", req)?, + Response::ResponsePut(req) => write!(f, "{}", req)?, + Response::ResponseTxn(req) => write!(f, "{}", req)?, + } + } + write!(f, "}}") + } +} + +impl Display for Permission { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let perm = match self.perm_type { + 0 => "Read", + 1 => "Write", + 2 => "Readwrite", + _ => "Unknown", + }; + write!( + f, + "Permission {{ permType: {:?}, key: {:?}, range_end: {:?} }}", + perm, + String::from_utf8_lossy(&self.key), + String::from_utf8_lossy(&self.range_end) + ) + } +} + +impl Display for Compare { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let result = match self.result { + 0 => "Equal", + 1 => "Greater", + 2 => "Less", + 3 => "NotEqual", + _ => "Unknown Result", + }; + let target = match self.result { + 0 => "Version", + 1 => "Create", + 2 => "Mod", + 3 => "Value", + 4 => "Lease", + _ => "Unknown Target", + }; + write!( + f, + "Compare {{ result: {:?}, target: {:?}, key: {}, range_end: {}, target_union: {:?} }}", + result, + target, + String::from_utf8_lossy(&self.key), + String::from_utf8_lossy(&self.range_end), + self.target_union + ) + } +} + +impl Display for AuthUserAddRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if let Some(ref option) = self.options { + write!( + f, + "AuthUserAddRequest {{ name: {}, no_password: {} }}", + self.name, option.no_password + ) + } else { + write!(f, "AuthUserAddRequest {{ name: {} }}", self.name) + } + } +} + +impl Display for AuthRoleGrantPermissionRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if let Some(ref perm) = self.perm { + write!( + f, + "AuthRoleGrantPermissionRequest {{ role: {}, perm: {} }}", + self.name, perm + ) + } else { + write!( + f, + "AuthRoleGrantPermissionRequest {{ role: {}, perm: None }}", + self.name + ) + } + } +} + +impl Display for AuthRoleRevokePermissionRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "AuthRoleRevokePermissionRequest {{ role: {}, key: {}, range_end: {} }}", + self.role, + String::from_utf8_lossy(&self.key), + String::from_utf8_lossy(&self.key), + ) + } +} + +impl Display for TxnRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "TxnRequest {{ ")?; + write_vec!(f, "compare", self.compare); + write_vec!(f, ", success", self.success); + write_vec!(f, ", failure", self.failure); + write!(f, "}}") + } +} + +impl Display for TxnResponse { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "TxnResponse {{ header: {:?}, succeeded: {} ", + self.header, self.succeeded + )?; + write_vec!(f, "responses", self.responses); + write!(f, "}}") + } +} + +impl Display for KeyValue { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "KeyValue {{ key: {:?}, create_revision: {}, mod_revision: {}, version: {}, value: {:?}, lease: {} }}", + String::from_utf8_lossy(&self.key), + self.create_revision, + self.mod_revision, + self.version, + String::from_utf8_lossy(&self.value), + self.lease, + ) + } +} + +impl Display for Event { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Event {{ type: {:?}, kv: ", self.r#type())?; + if let Some(kv) = &self.kv { + write!(f, "{}", kv)?; + } + write!(f, ", prev_kv: ")?; + if let Some(prev_kv) = &self.prev_kv { + write!(f, "{}", prev_kv)?; + } + write!(f, " }}") + } +} + +impl Display for AuthRoleGetResponse { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "AuthRoleGetResponse {{ header: {:?}", self.header)?; + write_vec!(f, "perm", self.perm); + write!(f, " }}") + } +} + +impl Display for AuthRoleListResponse { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "AuthRoleListResponse {{ header: {:?}", self.header)?; + write_vec!(f, "roles", self.roles); + write!(f, " }}") + } +} + +impl Display for AlarmResponse { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "AlarmResponse {{ header: {:?}, ", self.header)?; + write_vec!(f, "alarms", self.alarms); + write!(f, " }}") + } +} + #[cfg(test)] mod test { use super::*; diff --git a/doc/quick-start/README.md b/doc/quick-start/README.md index 150cdd77b..6ddf13de9 100644 --- a/doc/quick-start/README.md +++ b/doc/quick-start/README.md @@ -88,7 +88,7 @@ docker build . -t ghcr.io/xline-kv/xline -f doc/quick-start/Dockerfile ### Start Xline servers ```bash -cp ./xline-test-utils/{private,public}.pem ./scripts +cp ./fixtures/{private,public}.pem ./scripts ./scripts/quick_start.sh ``` diff --git a/scripts/validation_test.sh b/scripts/validation_test.sh index 0ef27a43a..8ce94e664 100755 --- a/scripts/validation_test.sh +++ b/scripts/validation_test.sh @@ -2,10 +2,10 @@ DIR="$(dirname $0)" QUICK_START="${DIR}/quick_start.sh" ETCDCTL="docker exec -i client etcdctl --endpoints=http://172.20.0.3:2379,http://172.20.0.4:2379" -LOCK_CLIENT="docker exec -i client /mnt/validation_lock_client --endpoints=http://172.20.0.3:2379" +LOCK_CLIENT="docker exec -i client /mnt/validation_lock_client --endpoints=http://172.20.0.3:2379,http://172.20.0.4:2379,http://172.20.0.5:2379" -LOG_PATH=/mnt/logs bash ${QUICK_START} +LOG_PATH=${DIR}/logs LOG_LEVEL=debug bash ${QUICK_START} source $DIR/log.sh stop() { @@ -18,16 +18,6 @@ trap stop TERM res="" -function parse_result() { - local tmp_res="" - while read -r line; do - log::debug $line - tmp_res="${tmp_res}${line}\n" - done - res="${tmp_res}" -} - - function check() { local pattern=$1 if [[ $(echo -e $res) =~ $pattern ]]; then @@ -37,10 +27,20 @@ function check() { fi } +function parse_result() { + local tmp_res="" + while read -r line; do + log::info $line + tmp_res="${tmp_res}${line}\n" + done + res="${tmp_res}" +} + function run() { command=$@ log::info "running: $command" - parse_result <<< "$(eval $command 2>&1)" + local run_res="$(eval $command 2>&1)" + parse_result <<< $run_res } # validate compact requests