Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix rsync #13

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion slash/include/base_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,40 @@

namespace slash {

class BaseConf;


class BaseConf {
public:
struct Rep {
std::string path;
enum ConfType {
kConf = 0,
kComment = 1,
};

struct ConfItem {
ConfType type; // 0 means conf, 1 means comment
std::string name;
std::string value;
ConfItem(ConfType t, const std::string &v) :
type(t),
name(""),
value(v)
{}
ConfItem(ConfType t, const std::string &n, const std::string &v) :
type(t),
name(n),
value(v)
{}
};

explicit Rep(const std::string &p)
: path(p) {
}
std::vector<ConfItem> item;
};

explicit BaseConf(const std::string &path);
virtual ~BaseConf();

Expand All @@ -39,12 +71,15 @@ class BaseConf {
bool SetConfBool(const std::string &name, const bool value);
bool SetConfStrVec(const std::string &name, const std::vector<std::string> &value);

bool CheckConfExist(const std::string &name) const;

void DumpConf() const;
bool WriteBack();
void WriteSampleConf() const;

void PushConfItem(const Rep::ConfItem& item);

private:
struct Rep;
Rep* rep_;

/*
Expand Down
60 changes: 60 additions & 0 deletions slash/include/lock_mgr.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (c) 2017-present The blackwidow Authors. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef SRC_LOCK_MGR_H_
#define SRC_LOCK_MGR_H_

#include <string>
#include <memory>

#include "slash/include/mutex.h"

namespace slash {

namespace lock {
struct LockMap;
struct LockMapStripe;

class LockMgr {
public:
LockMgr(size_t default_num_stripes, int64_t max_num_locks,
std::shared_ptr<MutexFactory> factory);

~LockMgr();

// Attempt to lock key. If OK status is returned, the caller is responsible
// for calling UnLock() on this key.
Status TryLock(const std::string& key);

// Unlock a key locked by TryLock().
void UnLock(const std::string& key);

private:
// Default number of lock map stripes
const size_t default_num_stripes_;

// Limit on number of keys locked per column family
const int64_t max_num_locks_;

// Used to allocate mutexes/condvars to use when locking keys
std::shared_ptr<MutexFactory> mutex_factory_;

// Map to locked key info
std::shared_ptr<LockMap> lock_map_;

Status Acquire(LockMapStripe* stripe, const std::string& key);

Status AcquireLocked(LockMapStripe* stripe, const std::string& key);

void UnLockKey(const std::string& key, LockMapStripe* stripe);

// No copying allowed
LockMgr(const LockMgr&);
void operator=(const LockMgr&);
};

} // namespace lock
} // namespace slash
#endif // SRC_LOCK_MGR_H_
89 changes: 89 additions & 0 deletions slash/include/mutex.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright (c) 2017-present The blackwidow Authors. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef SRC_MUTEX_H_
#define SRC_MUTEX_H_

#include <memory>

#include "slash/include/slash_status.h"

namespace slash {

namespace lock {

using Status = slash::Status;

class Mutex {
public:
virtual ~Mutex() {}

// Attempt to acquire lock. Return OK on success, or other Status on failure.
// If returned status is OK, BlackWidow will eventually call UnLock().
virtual Status Lock() = 0;

// Attempt to acquire lock. If timeout is non-negative, operation may be
// failed after this many microseconds.
// Returns OK on success,
// TimedOut if timed out,
// or other Status on failure.
// If returned status is OK, BlackWidow will eventually call UnLock().
virtual Status TryLockFor(int64_t timeout_time) = 0;

// Unlock Mutex that was successfully locked by Lock() or TryLockUntil()
virtual void UnLock() = 0;
};

class CondVar {
public:
virtual ~CondVar() {}

// Block current thread until condition variable is notified by a call to
// Notify() or NotifyAll(). Wait() will be called with mutex locked.
// Returns OK if notified.
// Returns non-OK if BlackWidow should stop waiting and fail the operation.
// May return OK spuriously even if not notified.
virtual Status Wait(std::shared_ptr<Mutex> mutex) = 0;

// Block current thread until condition variable is notified by a call to
// Notify() or NotifyAll(), or if the timeout is reached.
// Wait() will be called with mutex locked.
//
// If timeout is non-negative, operation should be failed after this many
// microseconds.
// If implementing a custom version of this class, the implementation may
// choose to ignore the timeout.
//
// Returns OK if notified.
// Returns TimedOut if timeout is reached.
// Returns other status if BlackWidow should otherwis stop waiting and
// fail the operation.
// May return OK spuriously even if not notified.
virtual Status WaitFor(std::shared_ptr<Mutex> mutex,
int64_t timeout_time) = 0;

// If any threads are waiting on *this, unblock at least one of the
// waiting threads.
virtual void Notify() = 0;

// Unblocks all threads waiting on *this.
virtual void NotifyAll() = 0;
};

// Factory class that can allocate mutexes and condition variables.
class MutexFactory {
public:
// Create a Mutex object.
virtual std::shared_ptr<Mutex> AllocateMutex() = 0;

// Create a CondVar object.
virtual std::shared_ptr<CondVar> AllocateCondVar() = 0;

virtual ~MutexFactory() {}
};

} // namespace lock
} // namespace slash
#endif // SRC_MUTEX_H_
23 changes: 23 additions & 0 deletions slash/include/mutex_impl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright (c) 2017-present The blackwidow Authors. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef SRC_MUTEX_IMPL_H_
#define SRC_MUTEX_IMPL_H_

#include "slash/include/mutex.h"

#include <memory>

namespace slash {
namespace lock {
// Default implementation of MutexFactory.
class MutexFactoryImpl : public MutexFactory {
public:
std::shared_ptr<Mutex> AllocateMutex() override;
std::shared_ptr<CondVar> AllocateCondVar() override;
};
} // namespace lock
} // namespace slash
#endif // SRC_MUTEX_IMPL_H_
8 changes: 5 additions & 3 deletions slash/include/rsync.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
#include <string>

namespace slash {
const std::string kRsyncSecretFile = "slash_rsync.secret";
const std::string kRsyncConfFile = "slash_rsync.conf";
const std::string kRsyncLogFile = "slash_rsync.log";
const std::string kRsyncPidFile = "slash_rsync.pid";
const std::string kRsyncLockFile = "slash_rsync.lock";
const std::string kRsyncSubDir = "rsync";
const std::string kRsyncUser = "rsync_user";
struct RsyncRemote {
std::string host;
int port;
Expand All @@ -19,12 +21,12 @@ struct RsyncRemote {
: host(_host), port(_port), module(_module), kbps(_kbps) {}
};

int StartRsync(const std::string& rsync_path, const std::string& module, const std::string& ip, const int port);
int StartRsync(const std::string& rsync_path, const std::string& module, const std::string& ip, const int port, const std::string& passwd);
int StopRsync(const std::string& path);
int RsyncSendFile(const std::string& local_file_path, const std::string& remote_file_path,
const RsyncRemote& remote);
const std::string& secret_file_path, const RsyncRemote& remote);
int RsyncSendClearTarget(const std::string& local_dir_path, const std::string& remote_dir_path,
const RsyncRemote& remote);
const std::string& secret_file_path, const RsyncRemote& remote);

}
#endif
66 changes: 66 additions & 0 deletions slash/include/scope_record_lock.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright (c) 2017-present The blackwidow Authors. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef SRC_SCOPE_RECORD_LOCK_H_
#define SRC_SCOPE_RECORD_LOCK_H_

#include <vector>
#include <string>
#include <algorithm>

#include "slash/include/lock_mgr.h"

namespace slash {

namespace lock {

class ScopeRecordLock {
public:
ScopeRecordLock(LockMgr* lock_mgr, const Slice& key) :
lock_mgr_(lock_mgr), key_(key) {
lock_mgr_->TryLock(key_.ToString());
}
~ScopeRecordLock() {
lock_mgr_->UnLock(key_.ToString());
}

private:
LockMgr* const lock_mgr_;
Slice key_;
ScopeRecordLock(const ScopeRecordLock&);
void operator=(const ScopeRecordLock&);
};

class MultiScopeRecordLock {
public:
MultiScopeRecordLock(LockMgr* lock_mgr,
const std::vector<std::string>& keys);
~MultiScopeRecordLock();

private:
LockMgr* const lock_mgr_;
std::vector<std::string> keys_;
MultiScopeRecordLock(const MultiScopeRecordLock&);
void operator=(const MultiScopeRecordLock&);
};

class MultiRecordLock {
public:
explicit MultiRecordLock(LockMgr* lock_mgr) : lock_mgr_(lock_mgr) {
}
~MultiRecordLock() {
}
void Lock(const std::vector<std::string>& keys);
void Unlock(const std::vector<std::string>& keys);

private:
LockMgr* const lock_mgr_;
MultiRecordLock(const MultiRecordLock&);
void operator=(const MultiRecordLock&);
};

} // namespace lock
} // namespace slash
#endif // SRC_SCOPE_RECORD_LOCK_H_
15 changes: 15 additions & 0 deletions slash/include/slash_coding.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
namespace slash {

// Standard Put... routines append to a string
extern void PutFixed16(std::string* dst, uint16_t value);
extern void PutFixed32(std::string* dst, uint32_t value);
extern void PutFixed64(std::string* dst, uint64_t value);
extern void PutVarint32(std::string* dst, uint32_t value);
extern void PutVarint64(std::string* dst, uint64_t value);
extern void PutLengthPrefixedString(std::string* dst, const std::string& value);

extern void GetFixed16(std::string* dst, uint16_t* value);
extern void GetFixed32(std::string* dst, uint32_t* value);
extern void GetFixed64(std::string* dst, uint64_t* value);
extern bool GetVarint32(std::string* input, uint32_t* value);
Expand All @@ -46,6 +48,7 @@ extern int VarintLength(uint64_t v);

// Lower-level versions of Put... that write directly into a character buffer
// REQUIRES: dst has enough space for the value being written
extern void EncodeFixed16(char* dst, uint16_t value);
extern void EncodeFixed32(char* dst, uint32_t value);
extern void EncodeFixed64(char* dst, uint64_t value);

Expand All @@ -58,6 +61,13 @@ extern char* EncodeVarint64(char* dst, uint64_t value);
// Lower-level versions of Get... that read directly from a character buffer
// without any bounds checking.

inline uint16_t DecodeFixed16(const char* ptr) {
// Load the raw bytes
uint16_t result;
memcpy(&result, ptr, sizeof(result)); // gcc optimizes this to a plain load
return result;
}

inline uint32_t DecodeFixed32(const char* ptr) {
// Load the raw bytes
uint32_t result;
Expand All @@ -72,6 +82,11 @@ inline uint64_t DecodeFixed64(const char* ptr) {
return result;
}

inline void GetFixed16(std::string* dst, uint16_t *value) {
*value = DecodeFixed16(dst->data());
dst->erase(0, sizeof(uint16_t));
}

inline void GetFixed32(std::string* dst, uint32_t *value) {
*value = DecodeFixed32(dst->data());
dst->erase(0, sizeof(uint32_t));
Expand Down
Loading