Skip to content

Commit

Permalink
Implement getInfo in FsdbSubscriber, use in pub sub mgr
Browse files Browse the repository at this point in the history
Summary: I want this logic shared between FsdbPubSubManager and FsdbSubManager. It makes sense for FsdbSubscriber to expose this info. I had to introduce an untemplated SubscriberBase with a virtual method to get this working however

Differential Revision:
D58571194

Privacy Context Container: L1125642

fbshipit-source-id: 03a41e437bdcf60fa38ca979df56c75e31cb5a03
  • Loading branch information
Peyman Gardideh authored and facebook-github-bot committed Aug 15, 2024
1 parent a7e9fa2 commit 8375188
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 55 deletions.
5 changes: 2 additions & 3 deletions fboss/agent/DsfSubscriber.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@ class DsfSubscriber : public StateObserver {
return cachedState_;
}

const std::vector<fsdb::FsdbPubSubManager::SubscriptionInfo>
getSubscriptionInfo() const {
std::vector<fsdb::FsdbPubSubManager::SubscriptionInfo> infos;
const std::vector<fsdb::SubscriptionInfo> getSubscriptionInfo() const {
std::vector<fsdb::SubscriptionInfo> infos;
auto subscriptionsLocked = subscriptions_.rlock();
infos.reserve(subscriptionsLocked->size());
for (const auto& [_, subscription] : *subscriptionsLocked) {
Expand Down
3 changes: 1 addition & 2 deletions fboss/agent/DsfSubscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,7 @@ fsdb::FsdbStreamClient::State DsfSubscription::getStreamState() const {
getAllSubscribePaths(localNodeName_, localIp_), remoteIp_.str());
}

const fsdb::FsdbPubSubManager::SubscriptionInfo
DsfSubscription::getSubscriptionInfo() const {
const fsdb::SubscriptionInfo DsfSubscription::getSubscriptionInfo() const {
// Since we own our own pub sub mgr, there should always be exactly one
// subscription
return fsdbPubSubMgr_->getSubscriptionInfo()[0];
Expand Down
2 changes: 1 addition & 1 deletion fboss/agent/DsfSubscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class DsfSubscription {
return session_.toThrift();
}

const fsdb::FsdbPubSubManager::SubscriptionInfo getSubscriptionInfo() const;
const fsdb::SubscriptionInfo getSubscriptionInfo() const;
// Used for tests only
const std::shared_ptr<SwitchState> cachedState() const {
return cachedState_;
Expand Down
41 changes: 6 additions & 35 deletions fboss/fsdb/client/FsdbPubSubManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,6 @@ std::string toSubscriptionStr(
":/",
PathHelpers::toString(paths));
}

std::tuple<std::string, std::string, std::string, std::vector<std::string>>
parseSubscriptionStr(const std::string& subStr) {
std::vector<std::string> elements;
std::vector<std::string> paths;
folly::split(":/", subStr, elements);
// Server, Delta/Path, State/Stat, Paths
CHECK_EQ(elements.size(), 4);
const auto& pathStr = elements[elements.size() - 1];
if (!pathStr.empty()) {
folly::split('_', pathStr, paths);
}
return std::make_tuple(elements[0], elements[1], elements[2], paths);
}

} // namespace
namespace facebook::fboss::fsdb {

Expand Down Expand Up @@ -566,30 +551,16 @@ void FsdbPubSubManager::addSubscriptionImpl(
itr->second->setServerOptions(std::move(serverOptions));
}

const std::vector<FsdbPubSubManager::SubscriptionInfo>
FsdbPubSubManager::getSubscriptionInfo() const {
const std::vector<SubscriptionInfo> FsdbPubSubManager::getSubscriptionInfo()
const {
std::vector<SubscriptionInfo> subscriptionInfo;
auto statePath2SubscriberR = statePath2Subscriber_.rlock();
for (const auto& [subStr, streamClient] : *statePath2SubscriberR) {
const auto& [server, delta, stats, paths] = parseSubscriptionStr(subStr);
subscriptionInfo.push_back(
{server,
delta == kDelta,
stats == kStats,
paths,
streamClient->getState(),
streamClient->getDisconnectReason()});
for (const auto& [_, subscriber] : *statePath2SubscriberR) {
subscriptionInfo.push_back(subscriber->getInfo());
}
auto statPath2SubscriberR = statPath2Subscriber_.rlock();
for (const auto& [subStr, streamClient] : *statPath2SubscriberR) {
const auto& [server, delta, stats, paths] = parseSubscriptionStr(subStr);
subscriptionInfo.push_back(
{server,
delta == kDelta,
stats == kStats,
paths,
streamClient->getState(),
streamClient->getDisconnectReason()});
for (const auto& [_, subscriber] : *statPath2SubscriberR) {
subscriptionInfo.push_back(subscriber->getInfo());
}
return subscriptionInfo;
}
Expand Down
13 changes: 2 additions & 11 deletions fboss/fsdb/client/FsdbPubSubManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,6 @@ class FsdbPubSubManager {
using Path = std::vector<std::string>;
using MultiPath = std::vector<Path>;

struct SubscriptionInfo {
std::string server;
bool isDelta;
bool isStats;
std::vector<std::string> paths;
FsdbStreamClient::State state;
FsdbErrorCode disconnectReason;
};

/* Publisher create APIs */
void createStateDeltaPublisher(
const Path& publishPath,
Expand Down Expand Up @@ -301,10 +292,10 @@ class FsdbPubSubManager {
std::unique_ptr<FsdbPatchPublisher> statPatchPublisher_;
// Subscribers
folly::Synchronized<
std::unordered_map<std::string, std::unique_ptr<FsdbStreamClient>>>
std::unordered_map<std::string, std::unique_ptr<FsdbSubscriberBase>>>
statePath2Subscriber_;
folly::Synchronized<
std::unordered_map<std::string, std::unique_ptr<FsdbStreamClient>>>
std::unordered_map<std::string, std::unique_ptr<FsdbSubscriberBase>>>
statPath2Subscriber_;

// per class placeholder for test code injection
Expand Down
7 changes: 7 additions & 0 deletions fboss/fsdb/client/FsdbSubManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,13 @@ class FsdbSubManager {
return opts_.clientId_;
}

std::optional<SubscriptionInfo> getInfo() {
if (subscriber_) {
return subscriber_->getInfo();
}
return std::nullopt;
}

private:
void parseChunkAndInvokeCallback(SubscriberChunk chunk, DataCallback cb) {
std::vector<SubscriptionKey> changedKeys;
Expand Down
1 change: 0 additions & 1 deletion fboss/fsdb/client/FsdbSubscriber.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright 2004-present Facebook. All Rights Reserved.

#include "fboss/fsdb/client/FsdbDeltaSubscriber.h"
#include "fboss/fsdb/common/PathHelpers.h"

namespace facebook::fboss::fsdb {

Expand Down
32 changes: 30 additions & 2 deletions fboss/fsdb/client/FsdbSubscriber.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#pragma once

#include "fboss/fsdb/client/FsdbStreamClient.h"
#include "fboss/fsdb/common/PathHelpers.h"
#include "fboss/fsdb/if/gen-cpp2/fsdb_common_types.h"
#include "fboss/fsdb/if/gen-cpp2/fsdb_oper_types.h"

#include <folly/Format.h>
Expand Down Expand Up @@ -77,8 +79,24 @@ struct SubscriptionOptions {
uint32_t grHoldTimeSec_{0};
};

struct SubscriptionInfo {
std::string server;
bool isDelta;
bool isStats;
std::vector<std::string> paths;
FsdbStreamClient::State state;
FsdbErrorCode disconnectReason;
};

class FsdbSubscriberBase : public FsdbStreamClient {
public:
using FsdbStreamClient::FsdbStreamClient;

virtual SubscriptionInfo getInfo() const = 0;
};

template <typename SubUnit, typename Paths>
class FsdbSubscriber : public FsdbStreamClient {
class FsdbSubscriber : public FsdbSubscriberBase {
std::string typeStr() const;
std::string pathsStr(const Paths& path) const;

Expand Down Expand Up @@ -115,7 +133,7 @@ class FsdbSubscriber : public FsdbStreamClient {
std::optional<SubscriptionStateChangeCb> stateChangeCb = std::nullopt,
std::optional<FsdbStreamStateChangeCb> connectionStateChangeCb =
std::nullopt)
: FsdbStreamClient(
: FsdbSubscriberBase(
options.clientId_,
streamEvb,
connRetryEvb,
Expand Down Expand Up @@ -149,6 +167,16 @@ class FsdbSubscriber : public FsdbStreamClient {
cancelStaleStateTimeout();
}

SubscriptionInfo getInfo() const override {
return SubscriptionInfo{
getServer(),
!std::is_same_v<SubUnit, OperState>,
this->isStats(),
PathHelpers::toStringList(subscribePaths_),
getState(),
getDisconnectReason()};
}

protected:
auto createRequest() const {
if constexpr (std::is_same_v<Paths, std::vector<std::string>>) {
Expand Down
8 changes: 8 additions & 0 deletions fboss/lib/CommonThriftUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,14 @@ class ReconnectingThriftClient {
void setServerOptions(
ServerOptions&& options,
bool allowReset = false /* allow reset for use in tests*/);

std::string getServer() const {
if (auto serverOptions = serverOptions_.rlock();
serverOptions->has_value()) {
return (*serverOptions)->dstAddr.getAddressStr();
}
return "";
}
void cancel();
void timeoutExpired() noexcept;
virtual void resetClient() = 0;
Expand Down

0 comments on commit 8375188

Please sign in to comment.