diff --git a/pink/example/Makefile b/pink/example/Makefile index d5f0be21..887535c2 100644 --- a/pink/example/Makefile +++ b/pink/example/Makefile @@ -5,7 +5,7 @@ else CXXFLAGS = -O2 -pipe -fPIC -W -Wwrite-strings -Wpointer-arith -Wreorder -Wswitch -Wsign-promo -Wredundant-decls -Wformat -Wall -D_GNU_SOURCE -D__STDC_FORMAT_MACROS -std=c++11 -Wno-unused-variable -Wno-maybe-uninitialized -Wno-unused-parameter # CXXFLAGS = -Wall -W -DDEBUG -g -O0 -D__XDEBUG__ endif -OBJECT = holy_srv holy_srv_chandle dispatch_srv myproto_cli http_server bg_thread simple_http_server +OBJECT = holy_srv holy_srv_chandle dispatch_srv dispatch_srv2 myproto_cli http_server bg_thread simple_http_server ifndef SLASH_PATH SLASH_PATH = ../third/slash @@ -52,6 +52,10 @@ holy_srv_chandle: myholy_srv_chandle.cc myproto.pb.cc dispatch_srv: mydispatch_srv.cc myproto.pb.cc $(CXX) $(CXXFLAGS) -o $@ $^ $(INCLUDE_PATH) $(LIB_PATH) $(LIBS) + +dispatch_srv2: mydispatch_srv2.cc myproto.pb.cc + $(CXX) $(CXXFLAGS) -o $@ $^ $(INCLUDE_PATH) $(LIB_PATH) $(LIBS) + myproto_cli: myproto_cli.cc myproto.pb.cc $(CXX) $(CXXFLAGS) -o $@ $^ $(INCLUDE_PATH) $(LIB_PATH) $(LIBS) diff --git a/pink/example/http_server.cc b/pink/example/http_server.cc index 858f05ff..deff977a 100644 --- a/pink/example/http_server.cc +++ b/pink/example/http_server.cc @@ -11,8 +11,7 @@ #include "slash/include/slash_status.h" #include "slash/include/slash_hash.h" #include "pink/include/pink_thread.h" -#include "pink/src/worker_thread.h" -#include "pink/src/dispatch_thread.h" +#include "pink/include/server_thread.h" #include "pink/include/http_conn.h" using namespace pink; diff --git a/pink/example/mydispatch_srv2.cc b/pink/example/mydispatch_srv2.cc new file mode 100644 index 00000000..02f43c12 --- /dev/null +++ b/pink/example/mydispatch_srv2.cc @@ -0,0 +1,102 @@ +#include +#include +#include +#include + +#include "slash/include/xdebug.h" +#include "pink/include/pink_thread.h" +#include "pink/include/server_thread.h" +#include "pink/include/worker_thread.h" + +#include "myproto.pb.h" +#include "pink/include/pb_conn.h" + +#include +#include +#include + +using namespace pink; + +class MyConn: public PbConn { + public: + MyConn(int fd, std::string ip_port, Thread *thread) + : PbConn(fd, ip_port, thread) { + } + virtual ~MyConn() { + } + protected: + virtual int DealMessage(); + + private: + myproto::Ping ping_; + myproto::PingRes ping_res_; +}; + +int MyConn::DealMessage() { + printf("In the myconn DealMessage branch\n"); + ping_.ParseFromArray(rbuf_ + cur_pos_ - header_len_, header_len_); + ping_res_.Clear(); + ping_res_.set_res(11234); + ping_res_.set_mess("heiheidfdfdf"); + printf ("DealMessage receive (%s)\n", ping_res_.mess().c_str()); + res_ = &ping_res_; + set_is_reply(true); + return 0; +} + +class MyConnFactory : public ConnFactory { + public: + virtual PinkConn *NewPinkConn(int connfd, const std::string &ip_port, Thread *thread) const { + return new MyConn(connfd, ip_port, thread); + } +}; + +class MyWorker : public WorkerThread { + explicit MyWorker(ConnFactory *conn_factory, int cron_interval = 0) + : WorkerThread(conn_factory, cron_interval) { + } +}; + +static std::atomic running(false); + +static void IntSigHandle(const int sig) { + printf("Catch Signal %d, cleanup...\n", sig); + running.store(false); + printf("server Exit"); +} + +static void SignalSetup() { + signal(SIGHUP, SIG_IGN); + signal(SIGPIPE, SIG_IGN); + signal(SIGINT, &IntSigHandle); + signal(SIGQUIT, &IntSigHandle); + signal(SIGTERM, &IntSigHandle); +} + +int main() { + SignalSetup(); + Thread* my_worker[10]; + ConnFactory *my_conn_factory = new MyConnFactory(); + for (int i = 0; i < 10; i++) { + my_worker[i] = NewWorkerThread(my_conn_factory, 1000); + } + ServerThread *st = NewDispatchThread(9211, 10, my_worker, 1000); + + if (st->StartThread() != 0) { + printf("StartThread error happened!\n"); + exit(-1); + } + running.store(true); + while (running.load()) { + sleep(1); + } + st->StopThread(); + + delete st; + delete my_conn_factory; + for (int i = 0; i < 10; i++) { + delete my_worker[i]; + } + + return 0; +} diff --git a/pink/example/simple_http_server.cc b/pink/example/simple_http_server.cc index e2f72804..b6163428 100644 --- a/pink/example/simple_http_server.cc +++ b/pink/example/simple_http_server.cc @@ -9,8 +9,7 @@ #include "slash/include/slash_status.h" #include "pink/include/pink_thread.h" -#include "pink/src/worker_thread.h" -#include "pink/src/dispatch_thread.h" +#include "pink/include/server_thread.h" #include "pink/include/simple_http_conn.h" using namespace pink; diff --git a/pink/include/http_conn.h b/pink/include/http_conn.h index 4dfba859..50f42374 100644 --- a/pink/include/http_conn.h +++ b/pink/include/http_conn.h @@ -16,7 +16,6 @@ #include "pink/include/pink_conn.h" #include "pink/include/pink_define.h" #include "pink/src/csapp.h" -#include "pink/src/worker_thread.h" #include "pink/src/pink_util.h" namespace pink { diff --git a/pink/include/server_thread.h b/pink/include/server_thread.h index 36f9b544..c192ec4b 100644 --- a/pink/include/server_thread.h +++ b/pink/include/server_thread.h @@ -153,6 +153,26 @@ extern ServerThread *NewDispatchThread( const ServerHandle* handle = nullptr, const ThreadEnvHandle* thandle = nullptr); +extern ServerThread* NewDispatchThread( + int port, + int work_num, Thread **worker_thread, + int cron_interval = 0, + const ServerHandle* handle = nullptr); +extern ServerThread* NewDispatchThread( + const std::string &ip, int port, + int work_num, Thread **worker_thread, + int cron_interval = 0, + const ServerHandle* handle = nullptr); +extern ServerThread* NewDispatchThread( + const std::set& ips, int port, + int work_num, Thread **worker_thread, + int cron_interval = 0, + const ServerHandle* handle = nullptr); + +extern Thread *NewWorkerThread( + ConnFactory *conn_factory, + int cron_interval = 0); + } // namespace pink #endif diff --git a/pink/include/simple_http_conn.h b/pink/include/simple_http_conn.h index fdd019cb..f4093e17 100644 --- a/pink/include/simple_http_conn.h +++ b/pink/include/simple_http_conn.h @@ -15,7 +15,6 @@ #include "pink/include/pink_conn.h" #include "pink/include/pink_define.h" #include "pink/src/csapp.h" -#include "pink/src/worker_thread.h" #include "pink/src/pink_util.h" namespace pink { diff --git a/pink/src/worker_thread.h b/pink/include/worker_thread.h similarity index 99% rename from pink/src/worker_thread.h rename to pink/include/worker_thread.h index 4fb87237..d1a0cec2 100644 --- a/pink/src/worker_thread.h +++ b/pink/include/worker_thread.h @@ -35,6 +35,7 @@ class WorkerThread : public Thread { public: explicit WorkerThread(ConnFactory *conn_factory, int cron_interval = 0, const ThreadEnvHandle* thandle = nullptr); + virtual ~WorkerThread(); void set_keepalive_timeout(int timeout) { diff --git a/pink/src/dispatch_thread.cc b/pink/src/dispatch_thread.cc index 00952709..6c2a0b58 100644 --- a/pink/src/dispatch_thread.cc +++ b/pink/src/dispatch_thread.cc @@ -2,7 +2,7 @@ #include "pink/src/pink_item.h" #include "pink/src/pink_epoll.h" -#include "pink/src/worker_thread.h" +#include "pink/include/worker_thread.h" namespace pink { @@ -12,7 +12,8 @@ DispatchThread::DispatchThread(int port, const ThreadEnvHandle* ehandle) : ServerThread::ServerThread(port, cron_interval, handle), last_thread_(0), - work_num_(work_num) { + work_num_(work_num), + owned_worker_thread_(true) { worker_thread_ = new WorkerThread*[work_num_]; for (int i = 0; i < work_num_; i++) { worker_thread_[i] = new WorkerThread(conn_factory, cron_interval, ehandle); @@ -25,7 +26,8 @@ DispatchThread::DispatchThread(const std::string &ip, int port, const ThreadEnvHandle* ehandle) : ServerThread::ServerThread(ip, port, cron_interval, handle), last_thread_(0), - work_num_(work_num) { + work_num_(work_num), + owned_worker_thread_(true) { worker_thread_ = new WorkerThread*[work_num_]; for (int i = 0; i < work_num_; i++) { worker_thread_[i] = new WorkerThread(conn_factory, cron_interval, ehandle); @@ -38,18 +40,51 @@ DispatchThread::DispatchThread(const std::set& ips, int port, const ThreadEnvHandle* ehandle) : ServerThread::ServerThread(ips, port, cron_interval, handle), last_thread_(0), - work_num_(work_num) { + work_num_(work_num), + owned_worker_thread_(true) { worker_thread_ = new WorkerThread*[work_num_]; for (int i = 0; i < work_num_; i++) { worker_thread_[i] = new WorkerThread(conn_factory, cron_interval, ehandle); } } +DispatchThread::DispatchThread(int port, + int work_num, Thread **worker_thread, + int cron_interval, const ServerHandle* handle) + : ServerThread::ServerThread(port, cron_interval, handle), + last_thread_(0), + work_num_(work_num), + worker_thread_(reinterpret_cast(worker_thread)), + owned_worker_thread_(false) { +} + +DispatchThread::DispatchThread(const std::string &ip, int port, + int work_num, Thread **worker_thread, + int cron_interval, const ServerHandle* handle) + : ServerThread::ServerThread(ip, port, cron_interval, handle), + last_thread_(0), + work_num_(work_num), + worker_thread_(reinterpret_cast(worker_thread)), + owned_worker_thread_(false) { +} + +DispatchThread::DispatchThread(const std::set& ips, int port, + int work_num, Thread **worker_thread, + int cron_interval, const ServerHandle* handle) + : ServerThread::ServerThread(ips, port, cron_interval, handle), + last_thread_(0), + work_num_(work_num), + worker_thread_(reinterpret_cast(worker_thread)), + owned_worker_thread_(false) { +} + DispatchThread::~DispatchThread() { - for (int i = 0; i < work_num_; i++) { - delete worker_thread_[i]; + if (owned_worker_thread_) { + for (int i = 0; i < work_num_; i++) { + delete worker_thread_[i]; + } + delete[] worker_thread_; } - delete worker_thread_; } int DispatchThread::StartThread() { @@ -126,4 +161,29 @@ extern ServerThread *NewDispatchThread( cron_interval, handle, ehandle); } +extern ServerThread* NewDispatchThread( + int port, + int work_num, Thread **worker_thread, + int cron_interval, + const ServerHandle* handle) { + return new DispatchThread(port, work_num, worker_thread, + cron_interval, handle); +} +extern ServerThread* NewDispatchThread( + const std::string &ip, int port, + int work_num, Thread **worker_thread, + int cron_interval, + const ServerHandle* handle) { + return new DispatchThread(ip, port, work_num, worker_thread, + cron_interval, handle); +} +extern ServerThread* NewDispatchThread( + const std::set& ips, int port, + int work_num, Thread **worker_thread, + int cron_interval, + const ServerHandle* handle) { + return new DispatchThread(ips, port, work_num, worker_thread, + cron_interval, handle); +} + }; // namespace pink diff --git a/pink/src/dispatch_thread.h b/pink/src/dispatch_thread.h index e6c9edac..f43d6c41 100644 --- a/pink/src/dispatch_thread.h +++ b/pink/src/dispatch_thread.h @@ -36,6 +36,16 @@ class DispatchThread : public ServerThread { int cron_interval = 0, const ServerHandle* handle = nullptr, const ThreadEnvHandle* ehandle = nullptr); + DispatchThread(int port, + int work_num, Thread **worker_thread, + int cron_interval = 0, const ServerHandle* handle = nullptr); + DispatchThread(const std::string &ip, int port, + int work_num, Thread **worker_thread, + int cron_interval = 0, const ServerHandle* handle = nullptr); + DispatchThread(const std::set& ips, int port, + int work_num, Thread **worker_thread, + int cron_interval = 0, const ServerHandle* handle = nullptr); + virtual ~DispatchThread(); virtual int StartThread() override; @@ -57,6 +67,7 @@ class DispatchThread : public ServerThread { * This is the work threads */ WorkerThread** worker_thread_; + bool owned_worker_thread_; void HandleNewConn(const int connfd, const std::string& ip_port) override; void HandleConnEvent(PinkFiredEvent *pfe) override {} diff --git a/pink/src/http_conn.cc b/pink/src/http_conn.cc index eed6a386..a2b255de 100644 --- a/pink/src/http_conn.cc +++ b/pink/src/http_conn.cc @@ -13,7 +13,6 @@ #include "slash/include/xdebug.h" #include "slash/include/slash_string.h" #include "pink/include/pink_define.h" -#include "pink/src/worker_thread.h" namespace pink { diff --git a/pink/src/pb_conn.cc b/pink/src/pb_conn.cc index ab387b24..fb7cda9f 100644 --- a/pink/src/pb_conn.cc +++ b/pink/src/pb_conn.cc @@ -9,7 +9,6 @@ #include "slash/include/xdebug.h" #include "pink/include/pink_define.h" -#include "pink/src/worker_thread.h" #include "pink/src/csapp.h" namespace pink { diff --git a/pink/src/redis_conn.cc b/pink/src/redis_conn.cc index e28764dd..c234ba41 100644 --- a/pink/src/redis_conn.cc +++ b/pink/src/redis_conn.cc @@ -6,7 +6,6 @@ #include "slash/include/xdebug.h" #include "pink/include/pink_define.h" #include "pink/include/redis_conn.h" -#include "pink/src/worker_thread.h" namespace pink { diff --git a/pink/src/simple_http_conn.cc b/pink/src/simple_http_conn.cc index 609655e8..046a2ca7 100644 --- a/pink/src/simple_http_conn.cc +++ b/pink/src/simple_http_conn.cc @@ -13,7 +13,6 @@ #include "slash/include/xdebug.h" #include "slash/include/slash_string.h" #include "pink/include/pink_define.h" -#include "pink/src/worker_thread.h" namespace pink { diff --git a/pink/src/worker_thread.cc b/pink/src/worker_thread.cc index 73faa4ed..b3acb582 100644 --- a/pink/src/worker_thread.cc +++ b/pink/src/worker_thread.cc @@ -1,4 +1,4 @@ -#include "worker_thread.h" +#include "pink/include/worker_thread.h" #include "pink/include/pink_conn.h" #include "pink/src/pink_item.h" @@ -173,4 +173,8 @@ void WorkerThread::Cleanup() { conns_.clear(); } +Thread *NewWorkerThread(ConnFactory *conn_factory, int cron_interval) { + return new WorkerThread(conn_factory, cron_interval); +} + }; // namespace pink