Skip to content

Commit

Permalink
Support custom WorkerThread in DispathThread
Browse files Browse the repository at this point in the history
  • Loading branch information
gaodunqiao committed Jun 12, 2017
1 parent 2908532 commit d44d3d9
Show file tree
Hide file tree
Showing 15 changed files with 213 additions and 19 deletions.
6 changes: 5 additions & 1 deletion pink/example/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ else
CXXFLAGS = -O2 -pipe -fPIC -W -Wwrite-strings -Wpointer-arith -Wreorder -Wswitch -Wsign-promo -Wredundant-decls -Wformat -Wall -D_GNU_SOURCE -D__STDC_FORMAT_MACROS -std=c++11 -Wno-unused-variable -Wno-maybe-uninitialized -Wno-unused-parameter
# CXXFLAGS = -Wall -W -DDEBUG -g -O0 -D__XDEBUG__
endif
OBJECT = holy_srv holy_srv_chandle dispatch_srv myproto_cli http_server bg_thread simple_http_server
OBJECT = holy_srv holy_srv_chandle dispatch_srv dispatch_srv2 myproto_cli http_server bg_thread simple_http_server

ifndef SLASH_PATH
SLASH_PATH = ../third/slash
Expand Down Expand Up @@ -52,6 +52,10 @@ holy_srv_chandle: myholy_srv_chandle.cc myproto.pb.cc

dispatch_srv: mydispatch_srv.cc myproto.pb.cc
$(CXX) $(CXXFLAGS) -o $@ $^ $(INCLUDE_PATH) $(LIB_PATH) $(LIBS)

dispatch_srv2: mydispatch_srv2.cc myproto.pb.cc
$(CXX) $(CXXFLAGS) -o $@ $^ $(INCLUDE_PATH) $(LIB_PATH) $(LIBS)

myproto_cli: myproto_cli.cc myproto.pb.cc
$(CXX) $(CXXFLAGS) -o $@ $^ $(INCLUDE_PATH) $(LIB_PATH) $(LIBS)

Expand Down
3 changes: 1 addition & 2 deletions pink/example/http_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@
#include "slash/include/slash_status.h"
#include "slash/include/slash_hash.h"
#include "pink/include/pink_thread.h"
#include "pink/src/worker_thread.h"
#include "pink/src/dispatch_thread.h"
#include "pink/include/server_thread.h"
#include "pink/include/http_conn.h"

using namespace pink;
Expand Down
102 changes: 102 additions & 0 deletions pink/example/mydispatch_srv2.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#include <stdio.h>
#include <signal.h>
#include <unistd.h>
#include <atomic>

#include "slash/include/xdebug.h"
#include "pink/include/pink_thread.h"
#include "pink/include/server_thread.h"
#include "pink/include/worker_thread.h"

#include "myproto.pb.h"
#include "pink/include/pb_conn.h"

#include <google/protobuf/message.h>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl.h>

using namespace pink;

class MyConn: public PbConn {
public:
MyConn(int fd, std::string ip_port, Thread *thread)
: PbConn(fd, ip_port, thread) {
}
virtual ~MyConn() {
}
protected:
virtual int DealMessage();

private:
myproto::Ping ping_;
myproto::PingRes ping_res_;
};

int MyConn::DealMessage() {
printf("In the myconn DealMessage branch\n");
ping_.ParseFromArray(rbuf_ + cur_pos_ - header_len_, header_len_);
ping_res_.Clear();
ping_res_.set_res(11234);
ping_res_.set_mess("heiheidfdfdf");
printf ("DealMessage receive (%s)\n", ping_res_.mess().c_str());
res_ = &ping_res_;
set_is_reply(true);
return 0;
}

class MyConnFactory : public ConnFactory {
public:
virtual PinkConn *NewPinkConn(int connfd, const std::string &ip_port, Thread *thread) const {
return new MyConn(connfd, ip_port, thread);
}
};

class MyWorker : public WorkerThread {
explicit MyWorker(ConnFactory *conn_factory, int cron_interval = 0)
: WorkerThread(conn_factory, cron_interval) {
}
};

static std::atomic<bool> running(false);

static void IntSigHandle(const int sig) {
printf("Catch Signal %d, cleanup...\n", sig);
running.store(false);
printf("server Exit");
}

static void SignalSetup() {
signal(SIGHUP, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
signal(SIGINT, &IntSigHandle);
signal(SIGQUIT, &IntSigHandle);
signal(SIGTERM, &IntSigHandle);
}

int main() {
SignalSetup();
Thread* my_worker[10];
ConnFactory *my_conn_factory = new MyConnFactory();
for (int i = 0; i < 10; i++) {
my_worker[i] = NewWorkerThread(my_conn_factory, 1000);
}
ServerThread *st = NewDispatchThread(9211, 10, my_worker, 1000);

if (st->StartThread() != 0) {
printf("StartThread error happened!\n");
exit(-1);
}
running.store(true);
while (running.load()) {
sleep(1);
}
st->StopThread();

delete st;
delete my_conn_factory;
for (int i = 0; i < 10; i++) {
delete my_worker[i];
}

return 0;
}
3 changes: 1 addition & 2 deletions pink/example/simple_http_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@

#include "slash/include/slash_status.h"
#include "pink/include/pink_thread.h"
#include "pink/src/worker_thread.h"
#include "pink/src/dispatch_thread.h"
#include "pink/include/server_thread.h"
#include "pink/include/simple_http_conn.h"

using namespace pink;
Expand Down
1 change: 0 additions & 1 deletion pink/include/http_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#include "pink/include/pink_conn.h"
#include "pink/include/pink_define.h"
#include "pink/src/csapp.h"
#include "pink/src/worker_thread.h"
#include "pink/src/pink_util.h"

namespace pink {
Expand Down
20 changes: 20 additions & 0 deletions pink/include/server_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,26 @@ extern ServerThread *NewDispatchThread(
const ServerHandle* handle = nullptr,
const ThreadEnvHandle* thandle = nullptr);

extern ServerThread* NewDispatchThread(
int port,
int work_num, Thread **worker_thread,
int cron_interval = 0,
const ServerHandle* handle = nullptr);
extern ServerThread* NewDispatchThread(
const std::string &ip, int port,
int work_num, Thread **worker_thread,
int cron_interval = 0,
const ServerHandle* handle = nullptr);
extern ServerThread* NewDispatchThread(
const std::set<std::string>& ips, int port,
int work_num, Thread **worker_thread,
int cron_interval = 0,
const ServerHandle* handle = nullptr);

extern Thread *NewWorkerThread(
ConnFactory *conn_factory,
int cron_interval = 0);

} // namespace pink

#endif
1 change: 0 additions & 1 deletion pink/include/simple_http_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#include "pink/include/pink_conn.h"
#include "pink/include/pink_define.h"
#include "pink/src/csapp.h"
#include "pink/src/worker_thread.h"
#include "pink/src/pink_util.h"

namespace pink {
Expand Down
1 change: 1 addition & 0 deletions pink/src/worker_thread.h → pink/include/worker_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class WorkerThread : public Thread {
public:
explicit WorkerThread(ConnFactory *conn_factory, int cron_interval = 0,
const ThreadEnvHandle* thandle = nullptr);

virtual ~WorkerThread();

void set_keepalive_timeout(int timeout) {
Expand Down
74 changes: 67 additions & 7 deletions pink/src/dispatch_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

#include "pink/src/pink_item.h"
#include "pink/src/pink_epoll.h"
#include "pink/src/worker_thread.h"
#include "pink/include/worker_thread.h"

namespace pink {

Expand All @@ -12,7 +12,8 @@ DispatchThread::DispatchThread(int port,
const ThreadEnvHandle* ehandle)
: ServerThread::ServerThread(port, cron_interval, handle),
last_thread_(0),
work_num_(work_num) {
work_num_(work_num),
owned_worker_thread_(true) {
worker_thread_ = new WorkerThread*[work_num_];
for (int i = 0; i < work_num_; i++) {
worker_thread_[i] = new WorkerThread(conn_factory, cron_interval, ehandle);
Expand All @@ -25,7 +26,8 @@ DispatchThread::DispatchThread(const std::string &ip, int port,
const ThreadEnvHandle* ehandle)
: ServerThread::ServerThread(ip, port, cron_interval, handle),
last_thread_(0),
work_num_(work_num) {
work_num_(work_num),
owned_worker_thread_(true) {
worker_thread_ = new WorkerThread*[work_num_];
for (int i = 0; i < work_num_; i++) {
worker_thread_[i] = new WorkerThread(conn_factory, cron_interval, ehandle);
Expand All @@ -38,18 +40,51 @@ DispatchThread::DispatchThread(const std::set<std::string>& ips, int port,
const ThreadEnvHandle* ehandle)
: ServerThread::ServerThread(ips, port, cron_interval, handle),
last_thread_(0),
work_num_(work_num) {
work_num_(work_num),
owned_worker_thread_(true) {
worker_thread_ = new WorkerThread*[work_num_];
for (int i = 0; i < work_num_; i++) {
worker_thread_[i] = new WorkerThread(conn_factory, cron_interval, ehandle);
}
}

DispatchThread::DispatchThread(int port,
int work_num, Thread **worker_thread,
int cron_interval, const ServerHandle* handle)
: ServerThread::ServerThread(port, cron_interval, handle),
last_thread_(0),
work_num_(work_num),
worker_thread_(reinterpret_cast<WorkerThread **>(worker_thread)),
owned_worker_thread_(false) {
}

DispatchThread::DispatchThread(const std::string &ip, int port,
int work_num, Thread **worker_thread,
int cron_interval, const ServerHandle* handle)
: ServerThread::ServerThread(ip, port, cron_interval, handle),
last_thread_(0),
work_num_(work_num),
worker_thread_(reinterpret_cast<WorkerThread **>(worker_thread)),
owned_worker_thread_(false) {
}

DispatchThread::DispatchThread(const std::set<std::string>& ips, int port,
int work_num, Thread **worker_thread,
int cron_interval, const ServerHandle* handle)
: ServerThread::ServerThread(ips, port, cron_interval, handle),
last_thread_(0),
work_num_(work_num),
worker_thread_(reinterpret_cast<WorkerThread **>(worker_thread)),
owned_worker_thread_(false) {
}

DispatchThread::~DispatchThread() {
for (int i = 0; i < work_num_; i++) {
delete worker_thread_[i];
if (owned_worker_thread_) {
for (int i = 0; i < work_num_; i++) {
delete worker_thread_[i];
}
delete[] worker_thread_;
}
delete worker_thread_;
}

int DispatchThread::StartThread() {
Expand Down Expand Up @@ -126,4 +161,29 @@ extern ServerThread *NewDispatchThread(
cron_interval, handle, ehandle);
}

extern ServerThread* NewDispatchThread(
int port,
int work_num, Thread **worker_thread,
int cron_interval,
const ServerHandle* handle) {
return new DispatchThread(port, work_num, worker_thread,
cron_interval, handle);
}
extern ServerThread* NewDispatchThread(
const std::string &ip, int port,
int work_num, Thread **worker_thread,
int cron_interval,
const ServerHandle* handle) {
return new DispatchThread(ip, port, work_num, worker_thread,
cron_interval, handle);
}
extern ServerThread* NewDispatchThread(
const std::set<std::string>& ips, int port,
int work_num, Thread **worker_thread,
int cron_interval,
const ServerHandle* handle) {
return new DispatchThread(ips, port, work_num, worker_thread,
cron_interval, handle);
}

}; // namespace pink
11 changes: 11 additions & 0 deletions pink/src/dispatch_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ class DispatchThread : public ServerThread {
int cron_interval = 0, const ServerHandle* handle = nullptr,
const ThreadEnvHandle* ehandle = nullptr);

DispatchThread(int port,
int work_num, Thread **worker_thread,
int cron_interval = 0, const ServerHandle* handle = nullptr);
DispatchThread(const std::string &ip, int port,
int work_num, Thread **worker_thread,
int cron_interval = 0, const ServerHandle* handle = nullptr);
DispatchThread(const std::set<std::string>& ips, int port,
int work_num, Thread **worker_thread,
int cron_interval = 0, const ServerHandle* handle = nullptr);

virtual ~DispatchThread();

virtual int StartThread() override;
Expand All @@ -57,6 +67,7 @@ class DispatchThread : public ServerThread {
* This is the work threads
*/
WorkerThread** worker_thread_;
bool owned_worker_thread_;

void HandleNewConn(const int connfd, const std::string& ip_port) override;
void HandleConnEvent(PinkFiredEvent *pfe) override {}
Expand Down
1 change: 0 additions & 1 deletion pink/src/http_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include "slash/include/xdebug.h"
#include "slash/include/slash_string.h"
#include "pink/include/pink_define.h"
#include "pink/src/worker_thread.h"

namespace pink {

Expand Down
1 change: 0 additions & 1 deletion pink/src/pb_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

#include "slash/include/xdebug.h"
#include "pink/include/pink_define.h"
#include "pink/src/worker_thread.h"
#include "pink/src/csapp.h"

namespace pink {
Expand Down
1 change: 0 additions & 1 deletion pink/src/redis_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
#include "slash/include/xdebug.h"
#include "pink/include/pink_define.h"
#include "pink/include/redis_conn.h"
#include "pink/src/worker_thread.h"

namespace pink {

Expand Down
1 change: 0 additions & 1 deletion pink/src/simple_http_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include "slash/include/xdebug.h"
#include "slash/include/slash_string.h"
#include "pink/include/pink_define.h"
#include "pink/src/worker_thread.h"

namespace pink {

Expand Down
6 changes: 5 additions & 1 deletion pink/src/worker_thread.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#include "worker_thread.h"
#include "pink/include/worker_thread.h"

#include "pink/include/pink_conn.h"
#include "pink/src/pink_item.h"
Expand Down Expand Up @@ -173,4 +173,8 @@ void WorkerThread::Cleanup() {
conns_.clear();
}

Thread *NewWorkerThread(ConnFactory *conn_factory, int cron_interval) {
return new WorkerThread(conn_factory, cron_interval);
}

}; // namespace pink

0 comments on commit d44d3d9

Please sign in to comment.