From 3d0a41c763226783448552188d54df56eaf08a0a Mon Sep 17 00:00:00 2001 From: Tao He Date: Wed, 20 Dec 2023 12:35:24 +0800 Subject: [PATCH] Fixes the 'probing stopped service' error in vineyardd, and move implementations to .cc (#1686) Fixes #1685 Signed-off-by: Tao He --- src/server/services/meta_service.cc | 735 +++++++++++++++++++++++++++- src/server/services/meta_service.h | 708 +-------------------------- 2 files changed, 753 insertions(+), 690 deletions(-) diff --git a/src/server/services/meta_service.cc b/src/server/services/meta_service.cc index cd1f2ac5e0..2fb34edd6c 100644 --- a/src/server/services/meta_service.cc +++ b/src/server/services/meta_service.cc @@ -18,8 +18,13 @@ limitations under the License. #include #include -#include "common/util/logging.h" // IWYU pragma: keep +#include "boost/algorithm/string/predicate.hpp" +#include "boost/algorithm/string/split.hpp" +#include "boost/algorithm/string/trim.hpp" +#include "common/util/env.h" +#include "common/util/functions.h" +#include "common/util/logging.h" // IWYU pragma: keep #include "server/services/local_meta_service.h" #if defined(BUILD_VINEYARDD_ETCD) #include "server/services/etcd_meta_service.h" @@ -28,6 +33,7 @@ limitations under the License. #include "server/services/redis_meta_service.h" #endif // BUILD_VINEYARDD_REDIS #include "server/util/meta_tree.h" +#include "server/util/metrics.h" namespace vineyard { @@ -62,7 +68,298 @@ std::shared_ptr IMetaService::Get( IMetaService::~IMetaService() { this->Stop(); } -void IMetaService::Stop() { LOG(INFO) << "meta service is stopping ..."; } +void IMetaService::Stop() { + if (this->stopped_.exchange(true)) { + return; + } + LOG(INFO) << "meta service is stopping ..."; +} + +Status IMetaService::Start() { + LOG(INFO) << "meta service is starting, waiting the metadata backend " + "service becoming ready ..."; + RETURN_ON_ERROR(this->preStart()); + auto current = std::chrono::system_clock::now(); + auto timeout = + std::chrono::seconds(server_ptr_->GetSpec()["metastore_spec"].value( + "meta_timeout", 60 /* 1 minutes */)); + Status s; + while (std::chrono::system_clock::now() - current < timeout) { + if (this->stopped_.load()) { + return Status::AlreadyStopped("etcd metadata service"); + } + s = this->probe(); + if (s.ok()) { + break; + } + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + RETURN_ON_ERROR(s); + auto self(shared_from_this()); + requestValues( + "", [self](const Status& status, const json& meta, unsigned rev) { + if (self->stopped_.load()) { + return Status::AlreadyStopped("etcd metadata service"); + } + if (status.ok()) { + // start the watcher. + self->startDaemonWatch("", self->rev_, + boost::bind(&IMetaService::daemonWatchHandler, + self, _1, _2, _3, _4)); + + // register self info. + self->registerToEtcd(); + } else { + Status s = status; + s << "Failed to get initial value"; + // Abort: since the probe has succeeded but the etcd + // doesn't work, we have no idea about what happened. 
+ s.Abort(); + } + return status; + }); + return Status::OK(); +} + +void IMetaService::RequestToBulkUpdate( + callback_t&, ObjectID&, Signature&, + InstanceID&> + callback_after_ready, + callback_t + callback_after_finish) { + auto self(shared_from_this()); + server_ptr_->GetMetaContext().post( + [self, callback_after_ready, callback_after_finish]() { + if (self->stopped_.load()) { + VINEYARD_SUPPRESS(callback_after_finish( + Status::AlreadyStopped("etcd metadata service"), + UnspecifiedInstanceID(), InvalidObjectID(), InvalidSignature())); + return; + } + std::vector ops; + ObjectID object_id; + Signature signature; + InstanceID computed_instance_id; + auto status = + callback_after_ready(Status::OK(), self->meta_, ops, object_id, + signature, computed_instance_id); + if (status.ok()) { +#ifndef NDEBUG + // debugging + self->printDepsGraph(); +#endif + self->metaUpdate(ops, false); + } else { + VLOG(100) << "Error: failed to generated ops to update metadata: " + << status.ToString(); + } + VINEYARD_SUPPRESS(callback_after_finish(status, object_id, signature, + computed_instance_id)); + }); +} + +// When requesting direct update, we already worked inside the meta context. +void IMetaService::RequestToDirectUpdate(std::vector const& ops, + const bool from_remote) { + this->metaUpdate(ops, from_remote); +} + +void IMetaService::RequestToPersist( + callback_t&> callback_after_ready, + callback_t<> callback_after_finish) { + // NB: when persist local meta to etcd, we needs the meta_sync_lock_ to + // avoid contention between other vineyard instances. + auto self(shared_from_this()); + this->requestLock( + meta_sync_lock_, [self, callback_after_ready, callback_after_finish]( + const Status& status, std::shared_ptr lock) { + if (self->stopped_.load()) { + return Status::AlreadyStopped("etcd metadata service"); + } + if (status.ok()) { + self->requestValues( + "", [self, callback_after_ready, callback_after_finish, lock]( + const Status& status, const json& meta, unsigned rev) { + if (self->stopped_.load()) { + return Status::AlreadyStopped("etcd metadata service"); + } + std::vector ops; + auto s = callback_after_ready(status, meta, ops); + if (s.ok()) { + if (ops.empty()) { + unsigned rev_after_unlock = 0; + VINEYARD_DISCARD(lock->Release(rev_after_unlock)); + return callback_after_finish(Status::OK()); + } + // apply changes locally before committing to etcd + self->metaUpdate(ops, false); + // commit to etcd + self->commitUpdates(ops, [self, callback_after_finish, lock]( + const Status& status, + unsigned rev) { + if (self->stopped_.load()) { + return Status::AlreadyStopped("etcd metadata service"); + } + // update rev_ to the revision after unlock. + unsigned rev_after_unlock = 0; + VINEYARD_DISCARD(lock->Release(rev_after_unlock)); + return callback_after_finish(status); + }); + return Status::OK(); + } else { + unsigned rev_after_unlock = 0; + VINEYARD_DISCARD(lock->Release(rev_after_unlock)); + return callback_after_finish(s); // propagate the error + } + }); + return Status::OK(); + } else { + VLOG(100) << "Error: failed to request metadata lock: " + << status.ToString(); + return callback_after_finish(status); // propagate the error + } + }); +} + +void IMetaService::RequestToGetData(const bool sync_remote, + callback_t callback) { + if (sync_remote) { + requestValues( + "", [callback](const Status& status, const json& meta, unsigned rev) { + return callback(status, meta); + }); + } else { + // post the task to asio queue as well for well-defined processing order. 
+ // + // Note that we need to pass `meta_` as reference, see also: + // + // https://www.boost.org/doc/libs/1_73_0/libs/bind/doc/html/bind.html + server_ptr_->GetMetaContext().post( + boost::bind(callback, Status::OK(), std::ref(meta_))); + } +} + +void IMetaService::RequestToDelete( + const std::vector& object_ids, const bool force, const bool deep, + callback_t const&, std::vector&, + bool&> + callback_after_ready, + callback_t const&> callback_after_finish) { + auto self(shared_from_this()); + server_ptr_->GetMetaContext().post([self, object_ids, force, deep, + callback_after_ready, + callback_after_finish]() { + if (self->stopped_.load()) { + VINEYARD_DISCARD(callback_after_finish( + Status::AlreadyStopped("etcd metadata service"), {})); + } + + // generated ops. + std::vector ops; + + bool sync_remote = false; + std::vector processed_delete_set; + self->findDeleteSet(object_ids, processed_delete_set, force, deep); + +#ifndef NDEBUG + if (VLOG_IS_ON(10)) { + for (auto const& item : processed_delete_set) { + VLOG(10) << "deleting object: " << ObjectIDToString(item); + } + } +#endif + + auto s = callback_after_ready(Status::OK(), self->meta_, + processed_delete_set, ops, sync_remote); + if (!s.ok()) { + VINEYARD_DISCARD(callback_after_finish(s, {})); + return; + } + + // apply changes locally (before committing to etcd) + self->metaUpdate(ops, false); + + if (!sync_remote) { + VINEYARD_DISCARD(callback_after_finish(s, processed_delete_set)); + return; + } + + // apply remote updates + // + // NB: when persist local meta to etcd, we needs the meta_sync_lock_ to + // avoid contention between other vineyard instances. + self->requestLock( + self->meta_sync_lock_, + [self, ops /* by copy */, callback_after_ready, processed_delete_set, + callback_after_finish](const Status& status, + std::shared_ptr lock) { + if (self->stopped_.load()) { + return Status::AlreadyStopped("etcd metadata service"); + } + if (status.ok()) { + // commit to etcd + self->commitUpdates( + ops, [self, processed_delete_set, callback_after_finish, lock]( + const Status& status, unsigned rev) { + if (self->stopped_.load()) { + return Status::AlreadyStopped("etcd metadata service"); + } + // update rev_ to the revision after unlock. + unsigned rev_after_unlock = 0; + VINEYARD_DISCARD(lock->Release(rev_after_unlock)); + return callback_after_finish(status, processed_delete_set); + }); + return Status::OK(); + } else { + VLOG(100) << "Error: failed to request metadata lock: " + << status.ToString(); + return callback_after_finish(status, {}); // propagate the error. 
+ } + }); + }); +} + +void IMetaService::RequestToShallowCopy( + callback_t&, bool&> callback_after_ready, + callback_t<> callback_after_finish) { + auto self(shared_from_this()); + requestValues("", [self, callback_after_ready, callback_after_finish]( + const Status& status, const json& meta, unsigned rev) { + if (self->stopped_.load()) { + return Status::AlreadyStopped("etcd metadata service"); + } + if (status.ok()) { + std::vector ops; + bool transient = true; + auto status = callback_after_ready(Status::OK(), meta, ops, transient); + if (status.ok()) { + if (transient) { + self->metaUpdate(ops, false); + return callback_after_finish(Status::OK()); + } else { + self->RequestToPersist( + [self, ops](const Status& status, const json& meta, + std::vector& persist_ops) { + if (self->stopped_.load()) { + return Status::AlreadyStopped("etcd metadata service"); + } + persist_ops.insert(persist_ops.end(), ops.begin(), ops.end()); + return Status::OK(); + }, + callback_after_finish); + return Status::OK(); + } + } else { + VLOG(100) << "Error: failed to generated ops to update metadata: " + << status.ToString(); + return callback_after_finish(status); + } + } else { + VLOG(100) << "Error: request values failed: " << status.ToString(); + return callback_after_finish(status); + } + }); +} /** Note [Deleting objects and blobs] * @@ -178,6 +475,229 @@ void IMetaService::CloneRef(ObjectID const target, ObjectID const mirror) { } } +void IMetaService::registerToEtcd() { + auto self(shared_from_this()); + RequestToPersist( + [self](const Status& status, const json& tree, std::vector& ops) { + if (self->stopped_.load()) { + return Status::AlreadyStopped("etcd metadata service"); + } + if (status.ok()) { + std::string hostname = get_hostname(), nodename = get_nodename(); + + int64_t timestamp = GetTimestamp(); + + self->instances_list_.clear(); + uint64_t self_host_id = + static_cast(gethostid()) | detail::cycleclock::now(); + if (tree.contains("instances") && !tree["instances"].is_null()) { + for (auto& instance : tree["instances"].items()) { + auto id = + static_cast(std::stoul(instance.key().substr(1))); + self->instances_list_.emplace(id); + } + } + InstanceID rank = 0; + if (tree.contains("next_instance_id") && + !tree["next_instance_id"].is_null()) { + rank = tree["next_instance_id"].get(); + } + + self->server_ptr_->set_instance_id(rank); + self->server_ptr_->set_hostname(hostname); + self->server_ptr_->set_nodename(nodename); + + // store an entry in the meta tree + self->meta_["my_instance_id"] = rank; + self->meta_["my_hostname"] = hostname; + self->meta_["my_nodename"] = nodename; + + self->instances_list_.emplace(rank); + std::string key = "/instances/" + self->server_ptr_->instance_name(); + ops.emplace_back(op_t::Put(key + "/hostid", self_host_id)); + ops.emplace_back(op_t::Put(key + "/hostname", hostname)); + ops.emplace_back(op_t::Put(key + "/nodename", nodename)); + ops.emplace_back(op_t::Put(key + "/rpc_endpoint", + self->server_ptr_->RPCEndpoint())); + ops.emplace_back( + op_t::Put(key + "/ipc_socket", self->server_ptr_->IPCSocket())); + ops.emplace_back(op_t::Put(key + "/timestamp", timestamp)); + ops.emplace_back(op_t::Put("/next_instance_id", rank + 1)); + LOG(INFO) << "Decide to set rank as " << rank; + return status; + } else { + LOG(ERROR) << status.ToString(); + return status; + } + }, + [self](const Status& status) { + if (self->stopped_.load()) { + return Status::AlreadyStopped("etcd metadata service"); + } + if (status.ok()) { + // start heartbeat + 
VINEYARD_DISCARD(startHeartbeat(self, Status::OK())); + // mark meta service as ready + self->Ready(); + } else { + self->server_ptr_->set_instance_id(UINT64_MAX); + LOG(ERROR) << "compute instance_id error."; + } + return status; + }); +} + +void IMetaService::checkInstanceStatus( + std::shared_ptr const& self, + callback_t<> callback_after_finish) { + self->RequestToPersist( + [self](const Status& status, const json& tree, std::vector& ops) { + if (self->stopped_.load()) { + return Status::AlreadyStopped("etcd metadata service"); + } + if (status.ok()) { + ops.emplace_back(op_t::Put( + "/instances/" + self->server_ptr_->instance_name() + "/timestamp", + GetTimestamp())); + return status; + } else { + LOG(ERROR) << status.ToString(); + return status; + } + }, + [self, callback_after_finish](const Status& status) { + if (self->stopped_.load()) { + return Status::AlreadyStopped("etcd metadata service"); + } + if (!status.ok()) { + LOG(ERROR) << "Failed to refresh self: " << status.ToString(); + return callback_after_finish(status); + } + auto the_next = + self->instances_list_.upper_bound(self->server_ptr_->instance_id()); + if (the_next == self->instances_list_.end()) { + the_next = self->instances_list_.begin(); + } + InstanceID target_inst = *the_next; + if (target_inst == self->server_ptr_->instance_id()) { + return callback_after_finish(status); + } + VLOG(10) << "Instance size " << self->instances_list_.size() + << ", target instance is " << target_inst; + auto target = + self->meta_["instances"]["i" + std::to_string(target_inst)]; + // The subtree might be empty, when the etcd been resumed with another + // data directory but the same endpoint. that leads to a crash here + // but we just let it crash to help us diagnosis the error. + if (!target.is_null() /* && !target.empty() */) { + int64_t ts = target["timestamp"].get(); + if (ts == self->target_latest_time_) { + ++self->timeout_count_; + } else { + self->timeout_count_ = 0; + } + self->target_latest_time_ = ts; + if (self->timeout_count_ >= MAX_TIMEOUT_COUNT) { + LOG(ERROR) << "Instance " << target_inst << " timeout"; + self->timeout_count_ = 0; + self->target_latest_time_ = 0; + self->RequestToPersist( + [self, target_inst](const Status& status, const json& tree, + std::vector& ops) { + if (self->stopped_.load()) { + return Status::AlreadyStopped("etcd metadata service"); + } + if (status.ok()) { + std::string key = + "/instances/i" + std::to_string(target_inst); + ops.emplace_back(op_t::Del(key + "/hostid")); + ops.emplace_back(op_t::Del(key + "/timestamp")); + ops.emplace_back(op_t::Del(key + "/hostname")); + ops.emplace_back(op_t::Del(key + "/nodename")); + ops.emplace_back(op_t::Del(key + "/rpc_endpoint")); + ops.emplace_back(op_t::Del(key + "/ipc_socket")); + } else { + LOG(ERROR) << status.ToString(); + } + return status; + }, + [self, callback_after_finish](const Status& status) { + if (self->stopped_.load()) { + return Status::AlreadyStopped("etcd metadata service"); + } + return callback_after_finish(status); + }); + VINEYARD_SUPPRESS( + self->server_ptr_->DeleteAllAt(self->meta_, target_inst)); + return status; + } else { + return callback_after_finish(status); + } + } else { + return callback_after_finish(status); + } + }); +} + +Status IMetaService::startHeartbeat(std::shared_ptr const& self, + Status const&) { + self->heartbeat_timer_.reset( + new asio::steady_timer(self->server_ptr_->GetMetaContext(), + std::chrono::seconds(HEARTBEAT_TIME))); + self->heartbeat_timer_->async_wait( + [self](const 
boost::system::error_code& error) { + if (self->stopped_.load()) { + return; + } + if (error) { + LOG(ERROR) << "heartbeat timer error: " << error << ", " + << error.message(); + } + if (!error || error != boost::system::errc::operation_canceled) { + // run check, and start the next round in the finish callback. + checkInstanceStatus( + self, boost::bind(&IMetaService::startHeartbeat, self, _1)); + } + }); + return Status::OK(); +} + +void IMetaService::requestValues(const std::string& prefix, + callback_t callback) { + // We still need to run a `etcdctl get` for the first time. With a + // long-running and no compact Etcd, watching from revision 0 may + // lead to a super huge amount of events, which is unacceptable. + auto self(shared_from_this()); + if (rev_ == 0) { + requestAll(prefix, rev_, + [self, callback](const Status& status, + const std::vector& ops, unsigned rev) { + if (self->stopped_.load()) { + return Status::AlreadyStopped("etcd metadata service"); + } + if (status.ok()) { + self->metaUpdate(ops, true); + self->rev_ = rev; + } + return callback(status, self->meta_, self->rev_); + }); + } else { + requestUpdates( + prefix, rev_, + [self, callback](const Status& status, const std::vector& ops, + unsigned rev) { + if (self->stopped_.load()) { + return Status::AlreadyStopped("etcd metadata service"); + } + if (status.ok()) { + self->metaUpdate(ops, true); + self->rev_ = rev; + } + return callback(status, self->meta_, self->rev_); + }); + } +} + bool IMetaService::deleteable(ObjectID const object_id) { if (object_id == InvalidObjectID()) { return true; @@ -455,4 +975,215 @@ void IMetaService::delVal(ObjectID const& target, std::set& blobs) { } } +template +void IMetaService::metaUpdate(const RangeT& ops, bool const from_remote) { + std::set blobs_to_delete; + + std::vector add_sigs, drop_sigs; + std::vector add_objects, drop_objects; + std::vector add_others, drop_others; + + // group-by all changes + for (const op_t& op : ops) { + if (op.kv.rev != 0 && op.kv.rev <= rev_) { +#ifndef NDEBUG + if (from_remote && op.kv.rev <= rev_) { + LOG(WARNING) << "skip updates: " << op.ToString(); + } +#endif + // revision resolution: means this revision has already been updated + // the revision value 0 means local update ops. 
+ continue; + } + if (boost::algorithm::trim_copy(op.kv.key).empty()) { + // skip unprintable keys + continue; + } + if (boost::algorithm::starts_with(op.kv.key, meta_sync_lock_)) { + // skip the update of etcd lock + continue; + } + + // update instance status + if (boost::algorithm::starts_with(op.kv.key, "/instances/")) { + instanceUpdate(op, from_remote); + } + +#ifndef NDEBUG + if (from_remote) { + VLOG(11) << "update op in meta tree: " << op.ToString(); + } +#endif + + if (boost::algorithm::starts_with(op.kv.key, "/signatures/")) { + if (op.op == op_t::op_type_t::kPut) { + add_sigs.emplace_back(op); + } else if (op.op == op_t::op_type_t::kDel) { + drop_sigs.emplace_back(op); + } else { + LOG(ERROR) << "warn: unknown op type for signatures: " << op.op; + } + } else if (boost::algorithm::starts_with(op.kv.key, "/data/")) { + if (op.op == op_t::op_type_t::kPut) { + add_objects.emplace_back(op); + } else if (op.op == op_t::op_type_t::kDel) { + drop_objects.emplace_back(op); + } else { + LOG(ERROR) << "warn: unknown op type for objects: " << op.op; + } + } else { + if (op.op == op_t::op_type_t::kPut) { + add_others.emplace_back(op); + } else if (op.op == op_t::op_type_t::kDel) { + drop_others.emplace_back(op); + } else { + LOG(ERROR) << "warn: unknown op type for others: " << op.op; + } + } + } + + // apply adding signature mappings first. + for (const op_t& op : add_sigs) { + putVal(op.kv, from_remote); + } + + // apply adding others + for (const op_t& op : add_others) { + putVal(op.kv, from_remote); + } + + // apply adding objects + for (const op_t& op : add_objects) { + putVal(op.kv, from_remote); + } + + // apply drop objects + { + // 1. collect all ids + std::set initial_delete_set; + std::vector vs; + for (const op_t& op : drop_objects) { + vs.clear(); + boost::algorithm::split(vs, op.kv.key, + [](const char c) { return c == '/'; }); + if (vs[0].empty()) { + vs.erase(vs.begin()); + } + // `__name` is our injected properties, and will be erased during + // `DropName`. + if (vs.size() >= 3 && vs[2] == "__name") { + // move the key to `drop_others` to drop + drop_others.emplace_back(op); + } else { + initial_delete_set.emplace(ObjectIDFromString(vs[1])); + } + } + std::vector object_ids{initial_delete_set.begin(), + initial_delete_set.end()}; + + // 2. traverse to find the delete set + std::vector processed_delete_set; + findDeleteSet(object_ids, processed_delete_set, false, false); + +#ifndef NDEBUG + if (VLOG_IS_ON(10)) { + for (auto const& item : processed_delete_set) { + VLOG(10) << "deleting object (in meta update): " + << ObjectIDToString(item); + } + } +#endif + + // 3. 
execute delete for every object + for (auto const target : processed_delete_set) { + delVal(target, blobs_to_delete); + } + } + + // apply drop others + for (const op_t& op : drop_others) { + delVal(op.kv); + } + + // apply drop signatures + for (const op_t& op : drop_sigs) { + delVal(op.kv); + } + +#ifndef NDEBUG + // debugging + printDepsGraph(); + for (auto const& id : blobs_to_delete) { + LOG(INFO) << "blob to delete: " << ObjectIDToString(id); + } +#endif + + VINEYARD_SUPPRESS(server_ptr_->DeleteBlobBatch(blobs_to_delete)); + VINEYARD_SUPPRESS(server_ptr_->ProcessDeferred(meta_)); +} + +void IMetaService::instanceUpdate(const op_t& op, const bool from_remote) { + std::vector key_segments; + boost::split(key_segments, op.kv.key, boost::is_any_of("/")); + if (key_segments[0].empty()) { + key_segments.erase(key_segments.begin()); + } + if (key_segments[2] == "hostid") { + uint64_t instance_id = std::stoul(key_segments[1].substr(1)); + if (op.op == op_t::op_type_t::kPut) { + if (from_remote) { + LOG(INFO) << "Instance join: " << instance_id; + } + instances_list_.emplace(instance_id); + } else if (op.op == op_t::op_type_t::kDel) { + if (from_remote) { + LOG(INFO) << "Instance exit: " << instance_id; + } + instances_list_.erase(instance_id); + } else { + if (from_remote) { + LOG(ERROR) << "Unknown op type: " << op.ToString(); + } + } + LOG_SUMMARY("instances_total", "", instances_list_.size()); + } +} + +Status IMetaService::daemonWatchHandler( + std::shared_ptr self, const Status& status, + const std::vector& ops, unsigned rev, + callback_t callback_after_update) { + // `this` must be non-stopped in this handler, as the {Etcd}WatchHandler + // keeps a reference of `this` (std::shared_ptr). + if (self->stopped_.load()) { + return Status::AlreadyStopped("etcd metadata service"); + } + // Guarantee: all kvs inside a txn reaches the client at the same time, + // which is guaranteed by the implementation of etcd. + // + // That means, every time this handler is called, we just need to response + // for one type of change. 
+ if (!status.ok()) { + LOG(ERROR) << "Error in daemon watching: " << status.ToString(); + return callback_after_update(status, rev); + } + if (ops.empty()) { + return callback_after_update(Status::OK(), rev); + } + // process events grouped by revision + size_t idx = 0; + std::vector op_batch; + while (idx < ops.size()) { + unsigned head_index = ops[idx].kv.rev; + while (idx < ops.size() && ops[idx].kv.rev == head_index) { + op_batch.emplace_back(ops[idx]); + idx += 1; + } + self->metaUpdate(op_batch, true); + op_batch.clear(); + self->rev_ = head_index; + } + return callback_after_update(Status::OK(), rev); +} + } // namespace vineyard diff --git a/src/server/services/meta_service.h b/src/server/services/meta_service.h index d15d9fc25e..310ac735aa 100644 --- a/src/server/services/meta_service.h +++ b/src/server/services/meta_service.h @@ -92,294 +92,40 @@ class IMetaService : public std::enable_shared_from_this { static std::shared_ptr Get( std::shared_ptr vs_ptr); - inline Status Start() { - LOG(INFO) << "meta service is starting, waiting the metadata backend " - "service becoming ready ..."; - RETURN_ON_ERROR(this->preStart()); - auto current = std::chrono::system_clock::now(); - auto timeout = - std::chrono::seconds(server_ptr_->GetSpec()["metastore_spec"].value( - "meta_timeout", 60 /* 1 minutes */)); - Status s; - while (std::chrono::system_clock::now() - current < timeout) { - s = this->probe(); - if (s.ok()) { - break; - } - std::this_thread::sleep_for(std::chrono::seconds(1)); - } - RETURN_ON_ERROR(s); - auto self(shared_from_this()); - requestValues("", [self](const Status& status, const json& meta, - unsigned rev) { - if (self->stopped_.load()) { - return Status::AlreadyStopped("etcd metadata service"); - } - if (status.ok()) { - // start the watcher. - self->startDaemonWatch("", self->rev_, - boost::bind(&IMetaService::daemonWatchHandler, - self, _1, _2, _3, _4)); - - // register self info. - self->registerToEtcd(); - } else { - Status s = status; - s << "Failed to get initial value"; - // Abort: since the probe has succeeded but the etcd - // doesn't work, we have no idea about what happened. - s.Abort(); - } - return status; - }); - return Status::OK(); - } + Status Start(); virtual void Stop(); public: - inline void RequestToBulkUpdate( + void RequestToBulkUpdate( callback_t&, ObjectID&, Signature&, InstanceID&> callback_after_ready, callback_t - callback_after_finish) { - auto self(shared_from_this()); - server_ptr_->GetMetaContext().post([self, callback_after_ready, - callback_after_finish]() { - if (self->stopped_.load()) { - VINEYARD_SUPPRESS(callback_after_finish( - Status::AlreadyStopped("etcd metadata service"), - UnspecifiedInstanceID(), InvalidObjectID(), InvalidSignature())); - return; - } - std::vector ops; - ObjectID object_id; - Signature signature; - InstanceID computed_instance_id; - auto status = - callback_after_ready(Status::OK(), self->meta_, ops, object_id, - signature, computed_instance_id); - if (status.ok()) { -#ifndef NDEBUG - // debugging - self->printDepsGraph(); -#endif - self->metaUpdate(ops, false); - } else { - VLOG(100) << "Error: failed to generated ops to update metadata: " - << status.ToString(); - } - VINEYARD_SUPPRESS(callback_after_finish(status, object_id, signature, - computed_instance_id)); - }); - } + callback_after_finish); // When requesting direct update, we already worked inside the meta context. 
- inline void RequestToDirectUpdate(std::vector const& ops, - const bool from_remote = false) { - this->metaUpdate(ops, from_remote); - } + void RequestToDirectUpdate(std::vector const& ops, + const bool from_remote = false); - inline void RequestToPersist( + void RequestToPersist( callback_t&> callback_after_ready, - callback_t<> callback_after_finish) { - // NB: when persist local meta to etcd, we needs the meta_sync_lock_ to - // avoid contention between other vineyard instances. - auto self(shared_from_this()); - this->requestLock( - meta_sync_lock_, - [self, callback_after_ready, callback_after_finish]( - const Status& status, std::shared_ptr lock) { - if (self->stopped_.load()) { - return Status::AlreadyStopped("etcd metadata service"); - } - if (status.ok()) { - self->requestValues( - "", [self, callback_after_ready, callback_after_finish, lock]( - const Status& status, const json& meta, unsigned rev) { - if (self->stopped_.load()) { - return Status::AlreadyStopped("etcd metadata service"); - } - std::vector ops; - auto s = callback_after_ready(status, meta, ops); - if (s.ok()) { - if (ops.empty()) { - unsigned rev_after_unlock = 0; - VINEYARD_DISCARD(lock->Release(rev_after_unlock)); - return callback_after_finish(Status::OK()); - } - // apply changes locally before committing to etcd - self->metaUpdate(ops, false); - // commit to etcd - self->commitUpdates(ops, [self, callback_after_finish, - lock](const Status& status, - unsigned rev) { - if (self->stopped_.load()) { - return Status::AlreadyStopped("etcd metadata service"); - } - // update rev_ to the revision after unlock. - unsigned rev_after_unlock = 0; - VINEYARD_DISCARD(lock->Release(rev_after_unlock)); - return callback_after_finish(status); - }); - return Status::OK(); - } else { - unsigned rev_after_unlock = 0; - VINEYARD_DISCARD(lock->Release(rev_after_unlock)); - return callback_after_finish(s); // propagate the error - } - }); - return Status::OK(); - } else { - VLOG(100) << "Error: failed to request metadata lock: " - << status.ToString(); - return callback_after_finish(status); // propagate the error - } - }); - } + callback_t<> callback_after_finish); - inline void RequestToGetData(const bool sync_remote, - callback_t callback) { - if (sync_remote) { - requestValues( - "", [callback](const Status& status, const json& meta, unsigned rev) { - return callback(status, meta); - }); - } else { - // post the task to asio queue as well for well-defined processing order. - // - // Note that we need to pass `meta_` as reference, see also: - // - // https://www.boost.org/doc/libs/1_73_0/libs/bind/doc/html/bind.html - server_ptr_->GetMetaContext().post( - boost::bind(callback, Status::OK(), std::ref(meta_))); - } - } + void RequestToGetData(const bool sync_remote, + callback_t callback); - inline void RequestToDelete( + void RequestToDelete( const std::vector& object_ids, const bool force, const bool deep, callback_t const&, std::vector&, bool&> callback_after_ready, - callback_t const&> callback_after_finish) { - auto self(shared_from_this()); - server_ptr_->GetMetaContext().post([self, object_ids, force, deep, - callback_after_ready, - callback_after_finish]() { - if (self->stopped_.load()) { - VINEYARD_DISCARD(callback_after_finish( - Status::AlreadyStopped("etcd metadata service"), {})); - } - - // generated ops. 
- std::vector ops; - - bool sync_remote = false; - std::vector processed_delete_set; - self->findDeleteSet(object_ids, processed_delete_set, force, deep); - -#ifndef NDEBUG - if (VLOG_IS_ON(10)) { - for (auto const& item : processed_delete_set) { - VLOG(10) << "deleting object: " << ObjectIDToString(item); - } - } -#endif - - auto s = callback_after_ready(Status::OK(), self->meta_, - processed_delete_set, ops, sync_remote); - if (!s.ok()) { - VINEYARD_DISCARD(callback_after_finish(s, {})); - return; - } - - // apply changes locally (before committing to etcd) - self->metaUpdate(ops, false); - - if (!sync_remote) { - VINEYARD_DISCARD(callback_after_finish(s, processed_delete_set)); - return; - } - - // apply remote updates - // - // NB: when persist local meta to etcd, we needs the meta_sync_lock_ to - // avoid contention between other vineyard instances. - self->requestLock( - self->meta_sync_lock_, - [self, ops /* by copy */, callback_after_ready, processed_delete_set, - callback_after_finish](const Status& status, - std::shared_ptr lock) { - if (self->stopped_.load()) { - return Status::AlreadyStopped("etcd metadata service"); - } - if (status.ok()) { - // commit to etcd - self->commitUpdates( - ops, [self, processed_delete_set, callback_after_finish, - lock](const Status& status, unsigned rev) { - if (self->stopped_.load()) { - return Status::AlreadyStopped("etcd metadata service"); - } - // update rev_ to the revision after unlock. - unsigned rev_after_unlock = 0; - VINEYARD_DISCARD(lock->Release(rev_after_unlock)); - return callback_after_finish(status, processed_delete_set); - }); - return Status::OK(); - } else { - VLOG(100) << "Error: failed to request metadata lock: " - << status.ToString(); - return callback_after_finish(status, {}); // propagate the error. 
- } - }); - }); - } + callback_t const&> callback_after_finish); - inline void RequestToShallowCopy( + void RequestToShallowCopy( callback_t&, bool&> callback_after_ready, - callback_t<> callback_after_finish) { - auto self(shared_from_this()); - requestValues("", [self, callback_after_ready, callback_after_finish]( - const Status& status, const json& meta, - unsigned rev) { - if (self->stopped_.load()) { - return Status::AlreadyStopped("etcd metadata service"); - } - if (status.ok()) { - std::vector ops; - bool transient = true; - auto status = callback_after_ready(Status::OK(), meta, ops, transient); - if (status.ok()) { - if (transient) { - self->metaUpdate(ops, false); - return callback_after_finish(Status::OK()); - } else { - self->RequestToPersist( - [self, ops](const Status& status, const json& meta, - std::vector& persist_ops) { - if (self->stopped_.load()) { - return Status::AlreadyStopped("etcd metadata service"); - } - persist_ops.insert(persist_ops.end(), ops.begin(), ops.end()); - return Status::OK(); - }, - callback_after_finish); - return Status::OK(); - } - } else { - VLOG(100) << "Error: failed to generated ops to update metadata: " - << status.ToString(); - return callback_after_finish(status); - } - } else { - VLOG(100) << "Error: request values failed: " << status.ToString(); - return callback_after_finish(status); - } - }); - } + callback_t<> callback_after_finish); void IncRef(std::string const& instance_name, std::string const& key, std::string const& value, const bool from_remote); @@ -389,78 +135,7 @@ class IMetaService : public std::enable_shared_from_this { bool stopped() const { return this->stopped_.load(); } private: - inline void registerToEtcd() { - auto self(shared_from_this()); - RequestToPersist( - [self](const Status& status, const json& tree, std::vector& ops) { - if (self->stopped_.load()) { - return Status::AlreadyStopped("etcd metadata service"); - } - if (status.ok()) { - std::string hostname = get_hostname(), nodename = get_nodename(); - - int64_t timestamp = GetTimestamp(); - - self->instances_list_.clear(); - uint64_t self_host_id = - static_cast(gethostid()) | detail::cycleclock::now(); - if (tree.contains("instances") && !tree["instances"].is_null()) { - for (auto& instance : tree["instances"].items()) { - auto id = static_cast( - std::stoul(instance.key().substr(1))); - self->instances_list_.emplace(id); - } - } - InstanceID rank = 0; - if (tree.contains("next_instance_id") && - !tree["next_instance_id"].is_null()) { - rank = tree["next_instance_id"].get(); - } - - self->server_ptr_->set_instance_id(rank); - self->server_ptr_->set_hostname(hostname); - self->server_ptr_->set_nodename(nodename); - - // store an entry in the meta tree - self->meta_["my_instance_id"] = rank; - self->meta_["my_hostname"] = hostname; - self->meta_["my_nodename"] = nodename; - - self->instances_list_.emplace(rank); - std::string key = - "/instances/" + self->server_ptr_->instance_name(); - ops.emplace_back(op_t::Put(key + "/hostid", self_host_id)); - ops.emplace_back(op_t::Put(key + "/hostname", hostname)); - ops.emplace_back(op_t::Put(key + "/nodename", nodename)); - ops.emplace_back(op_t::Put(key + "/rpc_endpoint", - self->server_ptr_->RPCEndpoint())); - ops.emplace_back( - op_t::Put(key + "/ipc_socket", self->server_ptr_->IPCSocket())); - ops.emplace_back(op_t::Put(key + "/timestamp", timestamp)); - ops.emplace_back(op_t::Put("/next_instance_id", rank + 1)); - LOG(INFO) << "Decide to set rank as " << rank; - return status; - } else { - LOG(ERROR) << 
status.ToString(); - return status; - } - }, - [self](const Status& status) { - if (self->stopped_.load()) { - return Status::AlreadyStopped("etcd metadata service"); - } - if (status.ok()) { - // start heartbeat - VINEYARD_DISCARD(startHeartbeat(self, Status::OK())); - // mark meta service as ready - self->Ready(); - } else { - self->server_ptr_->set_instance_id(UINT64_MAX); - LOG(ERROR) << "compute instance_id error."; - } - return status; - }); - } + void registerToEtcd(); /** * Watch rules: @@ -470,119 +145,10 @@ class IMetaService : public std::enable_shared_from_this { * - if there's only one instance, it does nothing. */ static void checkInstanceStatus(std::shared_ptr const& self, - callback_t<> callback_after_finish) { - self->RequestToPersist( - [self](const Status& status, const json& tree, std::vector& ops) { - if (self->stopped_.load()) { - return Status::AlreadyStopped("etcd metadata service"); - } - if (status.ok()) { - ops.emplace_back(op_t::Put("/instances/" + - self->server_ptr_->instance_name() + - "/timestamp", - GetTimestamp())); - return status; - } else { - LOG(ERROR) << status.ToString(); - return status; - } - }, - [self, callback_after_finish](const Status& status) { - if (self->stopped_.load()) { - return Status::AlreadyStopped("etcd metadata service"); - } - if (!status.ok()) { - LOG(ERROR) << "Failed to refresh self: " << status.ToString(); - return callback_after_finish(status); - } - auto the_next = self->instances_list_.upper_bound( - self->server_ptr_->instance_id()); - if (the_next == self->instances_list_.end()) { - the_next = self->instances_list_.begin(); - } - InstanceID target_inst = *the_next; - if (target_inst == self->server_ptr_->instance_id()) { - return callback_after_finish(status); - } - VLOG(10) << "Instance size " << self->instances_list_.size() - << ", target instance is " << target_inst; - auto target = - self->meta_["instances"]["i" + std::to_string(target_inst)]; - // The subtree might be empty, when the etcd been resumed with another - // data directory but the same endpoint. that leads to a crash here - // but we just let it crash to help us diagnosis the error. 
- if (!target.is_null() /* && !target.empty() */) { - int64_t ts = target["timestamp"].get(); - if (ts == self->target_latest_time_) { - ++self->timeout_count_; - } else { - self->timeout_count_ = 0; - } - self->target_latest_time_ = ts; - if (self->timeout_count_ >= MAX_TIMEOUT_COUNT) { - LOG(ERROR) << "Instance " << target_inst << " timeout"; - self->timeout_count_ = 0; - self->target_latest_time_ = 0; - self->RequestToPersist( - [self, target_inst](const Status& status, const json& tree, - std::vector& ops) { - if (self->stopped_.load()) { - return Status::AlreadyStopped("etcd metadata service"); - } - if (status.ok()) { - std::string key = - "/instances/i" + std::to_string(target_inst); - ops.emplace_back(op_t::Del(key + "/hostid")); - ops.emplace_back(op_t::Del(key + "/timestamp")); - ops.emplace_back(op_t::Del(key + "/hostname")); - ops.emplace_back(op_t::Del(key + "/nodename")); - ops.emplace_back(op_t::Del(key + "/rpc_endpoint")); - ops.emplace_back(op_t::Del(key + "/ipc_socket")); - } else { - LOG(ERROR) << status.ToString(); - } - return status; - }, - [self, callback_after_finish](const Status& status) { - if (self->stopped_.load()) { - return Status::AlreadyStopped("etcd metadata service"); - } - return callback_after_finish(status); - }); - VINEYARD_SUPPRESS( - self->server_ptr_->DeleteAllAt(self->meta_, target_inst)); - return status; - } else { - return callback_after_finish(status); - } - } else { - return callback_after_finish(status); - } - }); - } + callback_t<> callback_after_finish); static Status startHeartbeat(std::shared_ptr const& self, - Status const&) { - self->heartbeat_timer_.reset( - new asio::steady_timer(self->server_ptr_->GetMetaContext(), - std::chrono::seconds(HEARTBEAT_TIME))); - self->heartbeat_timer_->async_wait( - [self](const boost::system::error_code& error) { - if (self->stopped_.load()) { - return; - } - if (error) { - LOG(ERROR) << "heartbeat timer error: " << error << ", " - << error.message(); - } - if (!error || error != boost::system::errc::operation_canceled) { - // run check, and start the next round in the finish callback. - checkInstanceStatus( - self, boost::bind(&IMetaService::startHeartbeat, self, _1)); - } - }); - return Status::OK(); - } + Status const&); protected: // invoke when everything is ready (after Start() and ready for invoking) @@ -594,40 +160,7 @@ class IMetaService : public std::enable_shared_from_this { callback_t callback_after_updated) = 0; void requestValues(const std::string& prefix, - callback_t callback) { - // We still need to run a `etcdctl get` for the first time. With a - // long-running and no compact Etcd, watching from revision 0 may - // lead to a super huge amount of events, which is unacceptable. 
- auto self(shared_from_this()); - if (rev_ == 0) { - requestAll(prefix, rev_, - [self, callback](const Status& status, - const std::vector& ops, unsigned rev) { - if (self->stopped_.load()) { - return Status::AlreadyStopped("etcd metadata service"); - } - if (status.ok()) { - self->metaUpdate(ops, true); - self->rev_ = rev; - } - return callback(status, self->meta_, self->rev_); - }); - } else { - requestUpdates( - prefix, rev_, - [self, callback](const Status& status, const std::vector& ops, - unsigned rev) { - if (self->stopped_.load()) { - return Status::AlreadyStopped("etcd metadata service"); - } - if (status.ok()) { - self->metaUpdate(ops, true); - self->rev_ = rev; - } - return callback(status, self->meta_, self->rev_); - }); - } - } + callback_t callback); virtual void requestLock( std::string lock_name, @@ -685,215 +218,14 @@ class IMetaService : public std::enable_shared_from_this { void delVal(ObjectID const& target, std::set& blobs); template - void metaUpdate(const RangeT& ops, bool const from_remote) { - std::set blobs_to_delete; - - std::vector add_sigs, drop_sigs; - std::vector add_objects, drop_objects; - std::vector add_others, drop_others; - - // group-by all changes - for (const op_t& op : ops) { - if (op.kv.rev != 0 && op.kv.rev <= rev_) { -#ifndef NDEBUG - if (from_remote && op.kv.rev <= rev_) { - LOG(WARNING) << "skip updates: " << op.ToString(); - } -#endif - // revision resolution: means this revision has already been updated - // the revision value 0 means local update ops. - continue; - } - if (boost::algorithm::trim_copy(op.kv.key).empty()) { - // skip unprintable keys - continue; - } - if (boost::algorithm::starts_with(op.kv.key, meta_sync_lock_)) { - // skip the update of etcd lock - continue; - } - - // update instance status - if (boost::algorithm::starts_with(op.kv.key, "/instances/")) { - instanceUpdate(op, from_remote); - } - -#ifndef NDEBUG - if (from_remote) { - VLOG(11) << "update op in meta tree: " << op.ToString(); - } -#endif - - if (boost::algorithm::starts_with(op.kv.key, "/signatures/")) { - if (op.op == op_t::op_type_t::kPut) { - add_sigs.emplace_back(op); - } else if (op.op == op_t::op_type_t::kDel) { - drop_sigs.emplace_back(op); - } else { - LOG(ERROR) << "warn: unknown op type for signatures: " << op.op; - } - } else if (boost::algorithm::starts_with(op.kv.key, "/data/")) { - if (op.op == op_t::op_type_t::kPut) { - add_objects.emplace_back(op); - } else if (op.op == op_t::op_type_t::kDel) { - drop_objects.emplace_back(op); - } else { - LOG(ERROR) << "warn: unknown op type for objects: " << op.op; - } - } else { - if (op.op == op_t::op_type_t::kPut) { - add_others.emplace_back(op); - } else if (op.op == op_t::op_type_t::kDel) { - drop_others.emplace_back(op); - } else { - LOG(ERROR) << "warn: unknown op type for others: " << op.op; - } - } - } - - // apply adding signature mappings first. - for (const op_t& op : add_sigs) { - putVal(op.kv, from_remote); - } - - // apply adding others - for (const op_t& op : add_others) { - putVal(op.kv, from_remote); - } - - // apply adding objects - for (const op_t& op : add_objects) { - putVal(op.kv, from_remote); - } - - // apply drop objects - { - // 1. collect all ids - std::set initial_delete_set; - std::vector vs; - for (const op_t& op : drop_objects) { - vs.clear(); - boost::algorithm::split(vs, op.kv.key, - [](const char c) { return c == '/'; }); - if (vs[0].empty()) { - vs.erase(vs.begin()); - } - // `__name` is our injected properties, and will be erased during - // `DropName`. 
- if (vs.size() >= 3 && vs[2] == "__name") { - // move the key to `drop_others` to drop - drop_others.emplace_back(op); - } else { - initial_delete_set.emplace(ObjectIDFromString(vs[1])); - } - } - std::vector object_ids{initial_delete_set.begin(), - initial_delete_set.end()}; - - // 2. traverse to find the delete set - std::vector processed_delete_set; - findDeleteSet(object_ids, processed_delete_set, false, false); - -#ifndef NDEBUG - if (VLOG_IS_ON(10)) { - for (auto const& item : processed_delete_set) { - VLOG(10) << "deleting object (in meta update): " - << ObjectIDToString(item); - } - } -#endif - - // 3. execute delete for every object - for (auto const target : processed_delete_set) { - delVal(target, blobs_to_delete); - } - } - - // apply drop others - for (const op_t& op : drop_others) { - delVal(op.kv); - } - - // apply drop signatures - for (const op_t& op : drop_sigs) { - delVal(op.kv); - } - -#ifndef NDEBUG - // debugging - printDepsGraph(); - for (auto const& id : blobs_to_delete) { - LOG(INFO) << "blob to delete: " << ObjectIDToString(id); - } -#endif - - VINEYARD_SUPPRESS(server_ptr_->DeleteBlobBatch(blobs_to_delete)); - VINEYARD_SUPPRESS(server_ptr_->ProcessDeferred(meta_)); - } + void metaUpdate(const RangeT& ops, bool const from_remote); - void instanceUpdate(const op_t& op, const bool from_remote = true) { - std::vector key_segments; - boost::split(key_segments, op.kv.key, boost::is_any_of("/")); - if (key_segments[0].empty()) { - key_segments.erase(key_segments.begin()); - } - if (key_segments[2] == "hostid") { - uint64_t instance_id = std::stoul(key_segments[1].substr(1)); - if (op.op == op_t::op_type_t::kPut) { - if (from_remote) { - LOG(INFO) << "Instance join: " << instance_id; - } - instances_list_.emplace(instance_id); - } else if (op.op == op_t::op_type_t::kDel) { - if (from_remote) { - LOG(INFO) << "Instance exit: " << instance_id; - } - instances_list_.erase(instance_id); - } else { - if (from_remote) { - LOG(ERROR) << "Unknown op type: " << op.ToString(); - } - } - LOG_SUMMARY("instances_total", "", instances_list_.size()); - } - } + void instanceUpdate(const op_t& op, const bool from_remote = true); static Status daemonWatchHandler(std::shared_ptr self, const Status& status, const std::vector& ops, unsigned rev, - callback_t callback_after_update) { - // `this` must be non-stopped in this handler, as the {Etcd}WatchHandler - // keeps a reference of `this` (std::shared_ptr). - if (self->stopped_.load()) { - return Status::AlreadyStopped("etcd metadata service"); - } - // Guarantee: all kvs inside a txn reaches the client at the same time, - // which is guaranteed by the implementation of etcd. - // - // That means, every time this handler is called, we just need to response - // for one type of change. - if (!status.ok()) { - LOG(ERROR) << "Error in daemon watching: " << status.ToString(); - return callback_after_update(status, rev); - } - if (ops.empty()) { - return callback_after_update(Status::OK(), rev); - } - // process events grouped by revision - size_t idx = 0; - std::vector op_batch; - while (idx < ops.size()) { - unsigned head_index = ops[idx].kv.rev; - while (idx < ops.size() && ops[idx].kv.rev == head_index) { - op_batch.emplace_back(ops[idx]); - idx += 1; - } - self->metaUpdate(op_batch, true); - op_batch.clear(); - self->rev_ = head_index; - } - return callback_after_update(Status::OK(), rev); - } + callback_t callback_after_update); std::unique_ptr heartbeat_timer_; std::set instances_list_;
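
The common thread in the patch above is a stop-guard: `Stop()` flips an atomic `stopped_` flag exactly once (via `exchange`), and every asynchronous callback captures a `shared_ptr` to the service and bails out with an "already stopped" status before touching the backend, which is what prevents the "probing stopped service" error. The following is a minimal, self-contained sketch of that pattern, not the vineyard implementation; the class and function names (`ToyMetaService`, the probe loop, etc.) are hypothetical and only illustrate the idea.

```cpp
// Sketch of the stop-guard pattern: an atomic `stopped_` flag makes Stop()
// idempotent, and the background callback checks the flag before each probe
// so a stopped service is never probed again. Names here are illustrative.
#include <atomic>
#include <chrono>
#include <iostream>
#include <memory>
#include <thread>

class ToyMetaService : public std::enable_shared_from_this<ToyMetaService> {
 public:
  // Start a background "probe" loop that keeps polling until stopped.
  void Start() {
    auto self = shared_from_this();  // keep the service alive in the callback
    worker_ = std::thread([self]() {
      while (true) {
        if (self->stopped_.load()) {
          // Guard: the service has been stopped, do not probe any further.
          std::cout << "probe skipped: service already stopped\n";
          return;
        }
        std::cout << "probing metadata backend ...\n";
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
      }
    });
  }

  // Idempotent stop: only the first caller performs the shutdown work.
  void Stop() {
    if (stopped_.exchange(true)) {
      return;  // already stopped by a previous call
    }
    std::cout << "meta service is stopping ...\n";
    if (worker_.joinable()) {
      worker_.join();
    }
  }

  ~ToyMetaService() { Stop(); }

 private:
  std::atomic<bool> stopped_{false};
  std::thread worker_;
};

int main() {
  auto service = std::make_shared<ToyMetaService>();
  service->Start();
  std::this_thread::sleep_for(std::chrono::milliseconds(350));
  service->Stop();  // safe to call multiple times
  service->Stop();
  return 0;
}
```

In the real patch the same idea is applied with asio timers and etcd watch handlers instead of a raw thread, and the early return carries `Status::AlreadyStopped("etcd metadata service")` back through the callback chain rather than just exiting the loop.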