diff --git a/core-tests/src/os/process_pool.cpp b/core-tests/src/os/process_pool.cpp index 24e853aea4..01b0ddaa8f 100644 --- a/core-tests/src/os/process_pool.cpp +++ b/core-tests/src/os/process_pool.cpp @@ -11,20 +11,57 @@ using namespace swoole; static void test_func(ProcessPool &pool) { EventData data{}; - data.info.len = strlen(TEST_JPG_MD5SUM); - memcpy(data.data, TEST_JPG_MD5SUM, data.info.len); + size_t size = swoole_system_random(1024, 4096); + String rmem(size); + rmem.append_random_bytes(size - 1); + rmem.append("\0"); + + data.info.len = size; + memcpy(data.data, rmem.value(), size); int worker_id = -1; ASSERT_EQ(pool.dispatch_blocking(&data, &worker_id), SW_OK); pool.running = true; + pool.ptr = &rmem; + SwooleWG.run_always = true; + pool.main_loop(&pool, pool.get_worker(0)); + pool.destroy(); +} + +static void test_func_task_protocol(ProcessPool &pool) { + pool.set_protocol(SW_PROTOCOL_TASK); pool.onTask = [](ProcessPool *pool, Worker *worker, EventData *task) -> int { pool->running = false; - EXPECT_MEMEQ(task->data, TEST_JPG_MD5SUM, task->info.len); + String *_data = (String *) pool->ptr; + usleep(10000); + EXPECT_MEMEQ(_data->str, task->data, task->len()); return 0; }; - pool.main_loop(&pool, pool.get_worker(0)); - pool.destroy(); + test_func(pool); +} + +static void test_func_message_protocol(ProcessPool &pool) { + pool.set_protocol(SW_PROTOCOL_MESSAGE); + pool.onMessage = [](ProcessPool *pool, RecvData *rdata) { + pool->running = false; + String *_data = (String *) pool->ptr; + usleep(10000); + EXPECT_MEMEQ(_data->str, rdata->data, rdata->info.len); + }; + test_func(pool); +} + +static void test_func_stream_protocol(ProcessPool &pool) { + pool.set_protocol(SW_PROTOCOL_STREAM); + pool.onMessage = [](ProcessPool *pool, RecvData *rdata) { + pool->running = false; + String *_data = (String *) pool->ptr; + EventData *msg = (EventData *) rdata->data; + usleep(10000); + EXPECT_MEMEQ(_data->str, msg->data, msg->len()); + }; + test_func(pool); } TEST(process_pool, tcp) { @@ -32,7 +69,7 @@ TEST(process_pool, tcp) { ASSERT_EQ(pool.create(1, 0, SW_IPC_SOCKET), SW_OK); ASSERT_EQ(pool.listen(TEST_HOST, TEST_PORT, 128), SW_OK); - test_func(pool); + test_func_task_protocol(pool); } TEST(process_pool, unix_sock) { @@ -40,7 +77,7 @@ TEST(process_pool, unix_sock) { signal(SIGPIPE, SIG_IGN); ASSERT_EQ(pool.create(1, 0, SW_IPC_UNIXSOCK), SW_OK); - test_func(pool); + test_func_task_protocol(pool); } TEST(process_pool, tcp_raw) { @@ -72,7 +109,21 @@ TEST(process_pool, msgqueue) { ProcessPool pool{}; ASSERT_EQ(pool.create(1, 0x9501, SW_IPC_MSGQUEUE), SW_OK); - test_func(pool); + test_func_task_protocol(pool); +} + +TEST(process_pool, message_protocol) { + ProcessPool pool{}; + ASSERT_EQ(pool.create(1, 0, SW_IPC_UNIXSOCK), SW_OK); + + test_func_message_protocol(pool); +} + +TEST(process_pool, stream_protocol) { + ProcessPool pool{}; + ASSERT_EQ(pool.create(1, 0, SW_IPC_UNIXSOCK), SW_OK); + + test_func_stream_protocol(pool); } constexpr int magic_number = 99900011; diff --git a/core-tests/src/server/server.cpp b/core-tests/src/server/server.cpp index 3d7e06de16..c19a7c7e67 100644 --- a/core-tests/src/server/server.cpp +++ b/core-tests/src/server/server.cpp @@ -545,10 +545,12 @@ TEST(server, task_worker) { exit(2); } - serv.onTask = [](Server *serv, swEventData *task) -> int { - EXPECT_EQ(serv->get_task_count(), 1); + serv.onTask = [](Server *serv, EventData *task) -> int { + EXPECT_EQ(serv->get_tasking_num(), 1); EXPECT_EQ(string(task->data, task->info.len), string(packet)); serv->gs->task_workers.running = 0; + serv->gs->task_count++; + serv->gs->tasking_num--; return 0; }; @@ -556,13 +558,12 @@ TEST(server, task_worker) { ASSERT_EQ(serv.create_task_workers(), SW_OK); thread t1([&serv]() { + SwooleWG.run_always = true; serv.gs->task_workers.running = 1; - serv.gs->tasking_num++; serv.gs->task_workers.main_loop(&serv.gs->task_workers, &serv.gs->task_workers.workers[0]); + EXPECT_EQ(serv.get_tasking_num(), 0); serv.gs->tasking_num--; - EXPECT_EQ(serv.get_task_count(), 0); - serv.gs->tasking_num--; - EXPECT_EQ(serv.get_task_count(), 0); + EXPECT_EQ(serv.get_tasking_num(), 0); EXPECT_EQ(serv.get_idle_task_worker_num(), serv.task_worker_num); }); @@ -577,10 +578,13 @@ TEST(server, task_worker) { int _dst_worker_id = 0; - ASSERT_GE(serv.gs->task_workers.dispatch(&buf, &_dst_worker_id), 0); + ASSERT_TRUE(serv.task(&buf, &_dst_worker_id)); + ASSERT_EQ(serv.gs->task_count, 1); t1.join(); serv.gs->task_workers.destroy(); + + ASSERT_EQ(serv.gs->task_count, 2); } // PHP_METHOD(swoole_server, task) @@ -600,8 +604,7 @@ TEST(server, task_worker2) { serv.onTask = [](Server *serv, swEventData *task) -> int { EXPECT_EQ(string(task->data, task->info.len), string(packet)); - int ret = serv->reply_task_result(task->data, task->info.len, 0, task); - EXPECT_GT(ret, 0); + EXPECT_TRUE(serv->finish(task->data, task->info.len, 0, task)); return 0; }; @@ -623,7 +626,7 @@ TEST(server, task_worker2) { memcpy(buf.data, packet, strlen(packet)); buf.info.reactor_id = worker->id; buf.info.ext_flags |= (SW_TASK_NONBLOCK | SW_TASK_CALLBACK); - ASSERT_GE(serv->gs->task_workers.dispatch(&buf, &_dst_worker_id), 0); + ASSERT_EQ(serv->gs->task_workers.dispatch(&buf, &_dst_worker_id), SW_OK); sleep(1); kill(serv->gs->master_pid, SIGTERM); } @@ -649,8 +652,7 @@ TEST(server, task_worker3) { serv.onTask = [](Server *serv, swEventData *task) -> int { EXPECT_EQ(string(task->data, task->info.len), string(packet)); - int ret = serv->reply_task_result(task->data, task->info.len, 0, task); - EXPECT_GT(ret, 0); + EXPECT_TRUE(serv->finish(task->data, task->info.len, 0, task)); return 0; }; @@ -698,8 +700,7 @@ TEST(server, task_worker4) { serv.onTask = [](Server *serv, swEventData *task) -> int { EXPECT_EQ(string(task->data, task->info.len), string(packet)); - int ret = serv->reply_task_result(task->data, task->info.len, 0, task); - EXPECT_GT(ret, 0); + EXPECT_TRUE(serv->finish(task->data, task->info.len, 0, task)); return 0; }; @@ -724,7 +725,7 @@ TEST(server, task_worker4) { serv->gs->task_workers.dispatch(&buf, &_dst_worker_id); sleep(1); - EventData *task_result = &(serv->task_result[swoole_get_process_id()]); + EventData *task_result = serv->get_task_result(); sw_memset_zero(task_result, sizeof(*task_result)); memset(&buf.info, 0, sizeof(buf.info)); buf.info.len = strlen(packet); @@ -767,8 +768,7 @@ TEST(server, task_worker5) { ifs.close(); EXPECT_EQ(string(resp), string(data)); - int ret = serv->reply_task_result(resp, SW_IPC_MAX_SIZE * 2, 0, task); - EXPECT_GT(ret, 0); + EXPECT_TRUE(serv->finish(resp, SW_IPC_MAX_SIZE * 2, 0, task)); return 0; }; @@ -779,7 +779,7 @@ TEST(server, task_worker5) { if (worker->id == 1) { int _dst_worker_id = 0; - EventData *task_result = &(serv->task_result[worker->id]); + EventData *task_result = &(serv->task_results[worker->id]); sw_memset_zero(task_result, sizeof(*task_result)); File fp = make_tmpfile(); diff --git a/ext-src/php_swoole_server.h b/ext-src/php_swoole_server.h index 4fc2429069..f96d75305f 100644 --- a/ext-src/php_swoole_server.h +++ b/ext-src/php_swoole_server.h @@ -124,7 +124,7 @@ struct ServerObject { struct TaskCo { Coroutine *co; - int *list; + TaskId *list; uint32_t count; zval *result; }; diff --git a/ext-src/swoole_process_pool.cc b/ext-src/swoole_process_pool.cc index 42d2f67bfb..b7065bf842 100644 --- a/ext-src/swoole_process_pool.cc +++ b/ext-src/swoole_process_pool.cc @@ -430,9 +430,9 @@ static PHP_METHOD(swoole_process_pool, write) { char *data; size_t length; - if (zend_parse_parameters(ZEND_NUM_ARGS(), "s", &data, &length) == FAILURE) { - RETURN_FALSE; - } + ZEND_PARSE_PARAMETERS_START(1, 1) + Z_PARAM_STRING(data, length) + ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE); ProcessPool *pool = process_pool_get_and_check_pool(ZEND_THIS); if (pool->ipc_mode != SW_IPC_SOCKET) { diff --git a/ext-src/swoole_server.cc b/ext-src/swoole_server.cc index be52068f29..2dfe4051d9 100644 --- a/ext-src/swoole_server.cc +++ b/ext-src/swoole_server.cc @@ -85,8 +85,8 @@ static void php_swoole_server_onWorkerError(Server *serv, Worker *worker, const static void php_swoole_server_onManagerStart(Server *serv); static void php_swoole_server_onManagerStop(Server *serv); -static int php_swoole_server_task_finish(Server *serv, zval *zdata, EventData *current_task); -static TaskId php_swoole_server_task_pack(EventData *task, zval *data); +static bool php_swoole_server_task_finish(Server *serv, zval *zdata, EventData *current_task); +static TaskId php_swoole_server_task_pack(zval *data, EventData *task); static bool php_swoole_server_task_unpack(zval *zresult, EventData *task_result); static int php_swoole_server_dispatch_func(Server *serv, Connection *conn, SendData *data); static zval *php_swoole_server_add_port(ServerObject *server_object, ListenPort *port); @@ -655,7 +655,7 @@ int php_swoole_create_dir(const char *path, size_t length) { return php_stream_mkdir(path, 0777, PHP_STREAM_MKDIR_RECURSIVE | REPORT_ERRORS, nullptr) ? 0 : -1; } -static TaskId php_swoole_server_task_pack(EventData *task, zval *zdata) { +static TaskId php_swoole_server_task_pack(zval *zdata, EventData *task) { smart_str serialized_data = {}; php_serialize_data_t var_hash; @@ -1025,13 +1025,12 @@ void ServerObject::register_callback() { } } -static int php_swoole_server_task_finish(Server *serv, zval *zdata, EventData *current_task) { +static bool php_swoole_server_task_finish(Server *serv, zval *zdata, EventData *current_task) { int flags = 0; smart_str serialized_data = {}; php_serialize_data_t var_hash; char *data_str; size_t data_len = 0; - int ret; // need serialize if (Z_TYPE_P(zdata) != IS_STRING) { @@ -1049,9 +1048,9 @@ static int php_swoole_server_task_finish(Server *serv, zval *zdata, EventData *c data_len = Z_STRLEN_P(zdata); } - ret = serv->reply_task_result(data_str, data_len, flags, current_task); + bool success = serv->finish(data_str, data_len, flags, current_task); smart_str_free(&serialized_data); - return ret; + return success; } static void php_swoole_server_onPipeMessage(Server *serv, EventData *req) { @@ -2973,7 +2972,8 @@ static PHP_METHOD(swoole_server, stats) { if (serv->task_worker_num > 0) { add_assoc_long_ex(return_value, ZEND_STRL("task_idle_worker_num"), serv->get_idle_task_worker_num()); - add_assoc_long_ex(return_value, ZEND_STRL("tasking_num"), serv->get_task_count()); + add_assoc_long_ex(return_value, ZEND_STRL("tasking_num"), serv->get_tasking_num()); + add_assoc_long_ex(return_value, ZEND_STRL("task_count"), serv->gs->task_count); } add_assoc_long_ex(return_value, ZEND_STRL("coroutine_num"), Coroutine::count()); @@ -3056,9 +3056,6 @@ static PHP_METHOD(swoole_server, taskwait) { RETURN_FALSE; } - EventData buf; - memset(&buf.info, 0, sizeof(buf.info)); - zval *zdata; double timeout = SW_TASKWAIT_TIMEOUT; zend_long dst_worker_id = -1; @@ -3074,11 +3071,11 @@ static PHP_METHOD(swoole_server, taskwait) { RETURN_FALSE; } - if (php_swoole_server_task_pack(&buf, zdata) < 0) { + EventData buf; + if (php_swoole_server_task_pack(zdata, &buf) < 0) { RETURN_FALSE; } - int _dst_worker_id = (int) dst_worker_id; TaskId task_id = serv->get_task_id(&buf); // coroutine @@ -3091,9 +3088,7 @@ static PHP_METHOD(swoole_server, taskwait) { task_co.count = 1; task_co.result = return_value; - sw_atomic_fetch_add(&serv->gs->tasking_num, 1); - if (serv->gs->task_workers.dispatch(&buf, &_dst_worker_id) < 0) { - sw_atomic_fetch_sub(&serv->gs->tasking_num, 1); + if (!serv->task(&buf, (int *) &dst_worker_id)) { RETURN_FALSE; } @@ -3104,49 +3099,19 @@ static PHP_METHOD(swoole_server, taskwait) { if (!retval) { RETURN_FALSE; } - return; - } - - uint64_t notify; - EventData *task_result = &(serv->task_result[swoole_get_process_id()]); - sw_memset_zero(task_result, sizeof(*task_result)); - Pipe *pipe = serv->task_notify_pipes.at(swoole_get_process_id()).get(); - network::Socket *task_notify_socket = pipe->get_socket(false); - - // clear history task - while (task_notify_socket->wait_event(0, SW_EVENT_READ) == SW_OK) { - if (task_notify_socket->read(¬ify, sizeof(notify)) <= 0) { - break; + } else { + auto retval = serv->task_sync(&buf, (int *) &dst_worker_id, timeout); + if (!retval) { + RETURN_FALSE; } - } - - sw_atomic_fetch_add(&serv->gs->tasking_num, 1); - - if (serv->gs->task_workers.dispatch_blocking(&buf, &_dst_worker_id) == SW_OK) { - while (1) { - if (task_notify_socket->wait_event((int) (timeout * 1000), SW_EVENT_READ) != SW_OK) { - break; - } - if (pipe->read(¬ify, sizeof(notify)) > 0) { - if (serv->get_task_id(task_result) != task_id) { - continue; - } - zval zresult; - if (!php_swoole_server_task_unpack(&zresult, task_result)) { - RETURN_FALSE; - } else { - RETURN_ZVAL(&zresult, 0, 0); - } - break; - } else { - php_swoole_sys_error(E_WARNING, "taskwait failed"); - break; - } + zval zresult; + auto task_result = serv->get_task_result(); + if (!php_swoole_server_task_unpack(&zresult, task_result)) { + RETURN_FALSE; + } else { + RETURN_ZVAL(&zresult, 0, 0); } - } else { - sw_atomic_fetch_sub(&serv->gs->tasking_num, 1); } - RETURN_FALSE; } static PHP_METHOD(swoole_server, taskWaitMulti) { @@ -3164,9 +3129,6 @@ static PHP_METHOD(swoole_server, taskWaitMulti) { RETURN_FALSE; } - EventData buf; - memset(&buf.info, 0, sizeof(buf.info)); - zval *ztasks; double timeout = SW_TASKWAIT_TIMEOUT; @@ -3187,10 +3149,10 @@ static PHP_METHOD(swoole_server, taskWaitMulti) { RETURN_FALSE; } - int list_of_id[SW_MAX_CONCURRENT_TASK] = {}; + TaskId list_of_id[SW_MAX_CONCURRENT_TASK] = {}; uint64_t notify; - EventData *task_result = &(serv->task_result[swoole_get_process_id()]); + EventData *task_result = serv->get_task_result(); sw_memset_zero(task_result, sizeof(*task_result)); Pipe *pipe = serv->task_notify_pipes.at(swoole_get_process_id()).get(); Worker *worker = serv->get_worker(swoole_get_process_id()); @@ -3219,7 +3181,9 @@ static PHP_METHOD(swoole_server, taskWaitMulti) { zval *ztask; SW_HASHTABLE_FOREACH_START(Z_ARRVAL_P(ztasks), ztask) - TaskId task_id = php_swoole_server_task_pack(&buf, ztask); + + EventData buf; + TaskId task_id = php_swoole_server_task_pack(ztask, &buf); if (task_id < 0) { php_swoole_fatal_error(E_WARNING, "task pack failed"); goto _fail; @@ -3227,8 +3191,8 @@ static PHP_METHOD(swoole_server, taskWaitMulti) { buf.info.ext_flags |= SW_TASK_WAITALL; dst_worker_id = -1; sw_atomic_fetch_add(&serv->gs->tasking_num, 1); - if (serv->gs->task_workers.dispatch_blocking(&buf, &dst_worker_id) < 0) { - php_swoole_sys_error(E_WARNING, "taskwait failed"); + if (!serv->task(&buf, &dst_worker_id, true)) { + php_swoole_sys_error(E_WARNING, "failed to dispatch task"); task_id = -1; _fail: add_index_bool(return_value, i, 0); @@ -3280,7 +3244,7 @@ static PHP_METHOD(swoole_server, taskWaitMulti) { } (void) add_index_zval(return_value, j, &zresult); _next: - content->offset += sizeof(DataHead) + result->info.len; + content->offset += result->size(); } while (content->offset < 0 || (size_t) content->offset < content->length); // delete tmp file unlink(file_path.c_str()); @@ -3313,9 +3277,6 @@ static PHP_METHOD(swoole_server, taskCo) { int i = 0; uint32_t n_task = php_swoole_array_length(ztasks); - EventData buf; - memset(&buf.info, 0, sizeof(buf.info)); - if (n_task >= SW_MAX_CONCURRENT_TASK) { php_swoole_fatal_error(E_WARNING, "too many concurrent tasks"); RETURN_FALSE; @@ -3325,7 +3286,7 @@ static PHP_METHOD(swoole_server, taskCo) { RETURN_FALSE; } - int *list = (int *) ecalloc(n_task, sizeof(int)); + TaskId *list = (TaskId *) ecalloc(n_task, sizeof(TaskId)); if (list == nullptr) { RETURN_FALSE; } @@ -3337,20 +3298,19 @@ static PHP_METHOD(swoole_server, taskCo) { zval *ztask; SW_HASHTABLE_FOREACH_START(Z_ARRVAL_P(ztasks), ztask) { - task_id = php_swoole_server_task_pack(&buf, ztask); + EventData buf; + task_id = php_swoole_server_task_pack(ztask, &buf); if (task_id < 0) { php_swoole_fatal_error(E_WARNING, "failed to pack task"); goto _fail; } buf.info.ext_flags |= (SW_TASK_NONBLOCK | SW_TASK_COROUTINE); dst_worker_id = -1; - sw_atomic_fetch_add(&serv->gs->tasking_num, 1); - if (serv->gs->task_workers.dispatch(&buf, &dst_worker_id) < 0) { + if (!serv->task(&buf, &dst_worker_id)) { task_id = -1; _fail: add_index_bool(return_value, i, 0); n_task--; - sw_atomic_fetch_sub(&serv->gs->tasking_num, 1); } else { server_object->property->task_coroutine_map[task_id] = &task_co; } @@ -3406,14 +3366,11 @@ static PHP_METHOD(swoole_server, task) { } EventData buf; - memset(&buf.info, 0, sizeof(buf.info)); - - if (php_swoole_server_task_pack(&buf, zdata) < 0) { + TaskId task_id = php_swoole_server_task_pack(zdata, &buf); + if (task_id < 0) { RETURN_FALSE; } - TaskId task_id = serv->get_task_id(&buf); - if (!serv->is_worker()) { buf.info.ext_flags |= SW_TASK_NOREPLY; } else if (fci.size) { @@ -3424,15 +3381,11 @@ static PHP_METHOD(swoole_server, task) { buf.info.ext_flags |= SW_TASK_NONBLOCK; - int _dst_worker_id = (int) dst_worker_id; - sw_atomic_fetch_add(&serv->gs->tasking_num, 1); - - if (serv->gs->task_workers.dispatch(&buf, &_dst_worker_id) >= 0) { + if (serv->task(&buf, (int *) &dst_worker_id)) { RETURN_LONG(task_id); + } else { + RETURN_FALSE; } - - sw_atomic_fetch_sub(&serv->gs->tasking_num, 1); - RETURN_FALSE; } static PHP_METHOD(swoole_server, command) { @@ -3523,17 +3476,11 @@ static PHP_METHOD(swoole_server, sendMessage) { } EventData buf; - memset(&buf.info, 0, sizeof(buf.info)); - - if (php_swoole_server_task_pack(&buf, zmessage) < 0) { + if (php_swoole_server_task_pack(zmessage, &buf) < 0) { RETURN_FALSE; } - buf.info.type = SW_SERVER_EVENT_PIPE_MESSAGE; - - Worker *to_worker = serv->get_worker(worker_id); - SW_CHECK_RETURN(serv->send_to_worker_from_worker( - to_worker, &buf, sizeof(buf.info) + buf.info.len, SW_PIPE_MASTER | SW_PIPE_NONBLOCK)); + RETURN_BOOL(serv->send_pipe_message(worker_id, &buf)); } static PHP_METHOD(swoole_server, finish) { @@ -3555,7 +3502,7 @@ static PHP_METHOD(swoole_server, finish) { Z_PARAM_ZVAL(zdata) ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE); - SW_CHECK_RETURN(php_swoole_server_task_finish(serv, zdata, nullptr)); + RETURN_BOOL(php_swoole_server_task_finish(serv, zdata, nullptr)); } static PHP_METHOD(swoole_server_task, finish) { @@ -3572,25 +3519,23 @@ static PHP_METHOD(swoole_server_task, finish) { ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE); DataHead *info = php_swoole_server_task_get_info(ZEND_THIS); - SW_CHECK_RETURN(php_swoole_server_task_finish(serv, zdata, (EventData *) info)); + RETURN_BOOL(php_swoole_server_task_finish(serv, zdata, (EventData *) info)); } static PHP_METHOD(swoole_server_task, pack) { - EventData buf; - memset(&buf.info, 0, sizeof(buf.info)); - zval *zdata; ZEND_PARSE_PARAMETERS_START(1, 1) Z_PARAM_ZVAL(zdata) ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE); - if (php_swoole_server_task_pack(&buf, zdata) < 0) { + EventData buf; + if (php_swoole_server_task_pack(zdata, &buf) < 0) { RETURN_FALSE; } buf.info.ext_flags |= (SW_TASK_NONBLOCK | SW_TASK_NOREPLY); - RETURN_STRINGL((char *) &buf, sizeof(buf.info) + buf.info.len); + RETURN_STRINGL((char *) &buf, buf.size()); } static PHP_METHOD(swoole_server_task, unpack) { diff --git a/include/swoole.h b/include/swoole.h index 3ae29b2210..01a3ec9c1c 100644 --- a/include/swoole.h +++ b/include/swoole.h @@ -669,6 +669,14 @@ struct DataHead { struct EventData { DataHead info; char data[SW_IPC_BUFFER_SIZE]; + + uint32_t size() { + return sizeof(info) + len(); + } + + uint32_t len() { + return info.len; + } }; struct SendData { diff --git a/include/swoole_process_pool.h b/include/swoole_process_pool.h index 039b63b9a6..ae72812a40 100644 --- a/include/swoole_process_pool.h +++ b/include/swoole_process_pool.h @@ -141,7 +141,6 @@ struct Worker { uint8_t msgqueue_mode; uint8_t child_process; - sw_atomic_t tasking_num; uint32_t concurrency; time_t start_time; @@ -328,10 +327,10 @@ struct ProcessPool { bool reload(); pid_t spawn(Worker *worker); void stop(Worker *worker); - int dispatch(EventData *data, int *worker_id); + swResultCode dispatch(EventData *data, int *worker_id); int response(const char *data, int length); - int dispatch_blocking(EventData *data, int *dst_worker_id); - int dispatch_blocking(const char *data, uint32_t len); + swResultCode dispatch_blocking(EventData *data, int *dst_worker_id); + swResultCode dispatch_blocking(const char *data, uint32_t len); void add_worker(Worker *worker); int del_worker(Worker *worker); void destroy(); @@ -345,7 +344,14 @@ struct ProcessPool { int listen(const char *host, int port, int blacklog); int schedule(); bool is_worker_running(Worker *worker); + static void kill_timeout_worker(Timer *timer, TimerNode *tnode); + + private: + static int run_with_task_protocol(ProcessPool *pool, Worker *worker); + static int run_with_stream_protocol(ProcessPool *pool, Worker *worker); + static int run_with_message_protocol(ProcessPool *pool, Worker *worker); + static int run_async(ProcessPool *pool, Worker *worker); }; }; // namespace swoole diff --git a/include/swoole_server.h b/include/swoole_server.h index f8904dba8e..eeffcbb689 100644 --- a/include/swoole_server.h +++ b/include/swoole_server.h @@ -389,6 +389,7 @@ struct ServerGS { sw_atomic_long_t total_recv_bytes; sw_atomic_long_t total_send_bytes; sw_atomic_long_t pipe_packet_msg_id; + sw_atomic_long_t task_count; sw_atomic_t spinlock; @@ -837,7 +838,7 @@ class Server { uint32_t task_max_request = 0; uint32_t task_max_request_grace = 0; std::vector> task_notify_pipes; - EventData *task_result = nullptr; + EventData *task_results = nullptr; /** * Used for process management, saving the mapping relationship between PID and worker pointers @@ -995,12 +996,20 @@ class Server { int get_idle_worker_num(); int get_idle_task_worker_num(); - int get_task_count(); + int get_tasking_num(); TaskId get_task_id(EventData *task) { return gs->task_workers.get_task_id(task); } + uint16_t get_command_id(EventData *cmd) { + return cmd->info.server_fd; + } + + EventData *get_task_result() { + return &(task_results[swoole_get_process_id()]); + } + WorkerId get_task_src_worker_id(EventData *task) { return gs->task_workers.get_task_src_worker_id(task); } @@ -1362,11 +1371,10 @@ class Server { ssize_t send_to_worker_from_worker(Worker *dst_worker, const void *buf, size_t len, int flags); ssize_t send_to_worker_from_worker(WorkerId id, EventData *data, int flags) { - return send_to_worker_from_worker(get_worker(id), data, sizeof(data->info) + data->info.len, flags); + return send_to_worker_from_worker(get_worker(id), data, data->size(), flags); } ssize_t send_to_reactor_thread(const EventData *ev_data, size_t sendn, SessionId session_id); - int reply_task_result(const char *data, size_t data_len, int flags, EventData *current_task); bool send(SessionId session_id, const void *data, uint32_t length); bool sendfile(SessionId session_id, const char *file, uint32_t l_file, off_t offset, size_t length); @@ -1381,6 +1389,11 @@ class Server { const std::string &msg, const Command::Callback &fn); + bool task(EventData *task, int *dst_worker_id, bool blocking = false); + bool finish(const char *data, size_t data_len, int flags, EventData *current_task); + bool task_sync(EventData *task, int *dst_worker_id, double timeout); + bool send_pipe_message(WorkerId worker_id, EventData *msg); + void init_reactor(Reactor *reactor); void init_worker(Worker *worker); void init_task_workers(); diff --git a/src/os/process_pool.cc b/src/os/process_pool.cc index 7da0b53f87..5e94c05c73 100644 --- a/src/os/process_pool.cc +++ b/src/os/process_pool.cc @@ -32,11 +32,6 @@ namespace swoole { using network::Socket; using network::Stream; -static int ProcessPool_worker_loop_with_task_protocol(ProcessPool *pool, Worker *worker); -static int ProcessPool_worker_loop_with_stream_protocol(ProcessPool *pool, Worker *worker); -static int ProcessPool_worker_loop_with_message_protocol(ProcessPool *pool, Worker *worker); -static int ProcessPool_worker_loop_async(ProcessPool *pool, Worker *worker); - void ProcessPool::kill_timeout_worker(Timer *timer, TimerNode *tnode) { uint32_t i; pid_t reload_worker_pid = 0; @@ -113,7 +108,7 @@ int ProcessPool::create(uint32_t _worker_num, key_t _msgqueue_key, swIPCMode _ip map_ = new std::unordered_map; ipc_mode = _ipc_mode; - main_loop = ProcessPool_worker_loop_with_task_protocol; + main_loop = run_with_task_protocol; protocol_type_ = SW_PROTOCOL_TASK; max_packet_size_ = SW_INPUT_BUFFER_SIZE; @@ -207,13 +202,13 @@ int ProcessPool::listen(const char *host, int port, int blacklog) { void ProcessPool::set_protocol(enum ProtocolType _protocol_type) { switch (_protocol_type) { case SW_PROTOCOL_TASK: - main_loop = ProcessPool_worker_loop_with_task_protocol; + main_loop = run_with_task_protocol; break; case SW_PROTOCOL_STREAM: - main_loop = ProcessPool_worker_loop_with_stream_protocol; + main_loop = run_with_stream_protocol; break; case SW_PROTOCOL_MESSAGE: - main_loop = ProcessPool_worker_loop_with_message_protocol; + main_loop = run_with_message_protocol; break; default: abort(); @@ -234,7 +229,7 @@ int ProcessPool::start_check() { swoole_set_process_type(SW_PROCESS_MASTER); if (async) { - main_loop = ProcessPool_worker_loop_async; + main_loop = run_async; } SW_LOOP_N(worker_num) { @@ -299,7 +294,7 @@ int ProcessPool::response(const char *data, int length) { } int ProcessPool::push_message(EventData *msg) { - if (message_box->push(msg, sizeof(msg->info) + msg->info.len) < 0) { + if (message_box->push(msg, msg->size()) < 0) { return SW_ERR; } return swoole_kill(master_pid, SIGIO); @@ -328,11 +323,7 @@ int ProcessPool::pop_message(void *data, size_t size) { return message_box->pop(data, size); } -/** - * dispatch data to worker - */ -int ProcessPool::dispatch(EventData *data, int *dst_worker_id) { - int ret = 0; +swResultCode ProcessPool::dispatch(EventData *data, int *dst_worker_id) { Worker *worker; if (use_socket) { @@ -341,7 +332,7 @@ int ProcessPool::dispatch(EventData *data, int *dst_worker_id) { return SW_ERR; } stream->response = nullptr; - if (stream->send((char *) data, sizeof(data->info) + data->info.len) < 0) { + if (stream->send((char *) data, data->size()) < 0) { stream->cancel = 1; delete stream; return SW_ERR; @@ -356,19 +347,15 @@ int ProcessPool::dispatch(EventData *data, int *dst_worker_id) { *dst_worker_id += start_id; worker = get_worker(*dst_worker_id); - int sendn = sizeof(data->info) + data->info.len; - ret = worker->send_pipe_message(data, sendn, SW_PIPE_MASTER | SW_PIPE_NONBLOCK); - - if (ret >= 0) { - sw_atomic_fetch_add(&worker->tasking_num, 1); - } else { - swoole_warning("send %d bytes to worker#%d failed", sendn, *dst_worker_id); + if (worker->send_pipe_message(data, data->size(), SW_PIPE_MASTER | SW_PIPE_NONBLOCK) < 0) { + swoole_warning("send %d bytes to worker#%d failed", data->size(), *dst_worker_id); + return SW_ERR; } - return ret; + return SW_OK; } -int ProcessPool::dispatch_blocking(const char *data, uint32_t len) { +swResultCode ProcessPool::dispatch_blocking(const char *data, uint32_t len) { assert(use_socket); network::Client _socket(stream_info_->socket->socket_type, false); @@ -389,16 +376,9 @@ int ProcessPool::dispatch_blocking(const char *data, uint32_t len) { return SW_OK; } -/** - * dispatch data to worker - * @return SW_OK/SW_ERR - */ -int ProcessPool::dispatch_blocking(EventData *data, int *dst_worker_id) { - int ret = 0; - int sendn = sizeof(data->info) + data->info.len; - +swResultCode ProcessPool::dispatch_blocking(EventData *data, int *dst_worker_id) { if (use_socket) { - return dispatch_blocking((char *) data, sendn); + return dispatch_blocking((char *) data, data->size()); } if (*dst_worker_id < 0) { @@ -408,14 +388,11 @@ int ProcessPool::dispatch_blocking(EventData *data, int *dst_worker_id) { *dst_worker_id += start_id; Worker *worker = get_worker(*dst_worker_id); - ret = worker->send_pipe_message(data, sendn, SW_PIPE_MASTER); - if (ret < 0) { - swoole_warning("send %d bytes to worker#%d failed", sendn, *dst_worker_id); - } else { - sw_atomic_fetch_add(&worker->tasking_num, 1); + if (worker->send_pipe_message(data, data->size(), SW_PIPE_MASTER) < 0) { + swoole_warning("send %d bytes to worker#%d failed", data->size(), *dst_worker_id); + return SW_ERR; } - - return ret > 0 ? SW_OK : SW_ERR; + return SW_OK; } bool ProcessPool::reload() { @@ -525,7 +502,7 @@ bool ProcessPool::is_worker_running(Worker *worker) { return running && !SwooleWG.shutdown && !worker->has_exceeded_max_request(); } -static int ProcessPool_worker_loop_with_task_protocol(ProcessPool *pool, Worker *worker) { +int ProcessPool::run_with_task_protocol(ProcessPool *pool, Worker *worker) { struct { long mtype; EventData buf; @@ -589,7 +566,7 @@ static int ProcessPool_worker_loop_with_task_protocol(ProcessPool *pool, Worker continue; } - if (n != (ssize_t) (out.buf.info.len + sizeof(out.buf.info))) { + if (n != (ssize_t) out.buf.size()) { swoole_warning("bad task packet, The received data-length[%ld] is inconsistent with the packet-length[%ld]", n, out.buf.info.len + sizeof(out.buf.info)); @@ -644,7 +621,7 @@ static int ProcessPool_recv_message(Reactor *reactor, Event *event) { return SW_OK; } -static int ProcessPool_worker_loop_async(ProcessPool *pool, Worker *worker) { +int ProcessPool::run_async(ProcessPool *pool, Worker *worker) { if (pool->ipc_mode == SW_IPC_UNIXSOCK && pool->onMessage) { swoole_event_add(worker->pipe_worker, SW_EVENT_READ); if (pool->message_bus) { @@ -660,7 +637,7 @@ static int ProcessPool_worker_loop_async(ProcessPool *pool, Worker *worker) { return swoole_event_wait(); } -static int ProcessPool_worker_loop_with_stream_protocol(ProcessPool *pool, Worker *worker) { +int ProcessPool::run_with_stream_protocol(ProcessPool *pool, Worker *worker) { ssize_t n; RecvData msg{}; msg.info.reactor_id = -1; @@ -757,7 +734,7 @@ static int ProcessPool_worker_loop_with_stream_protocol(ProcessPool *pool, Worke return SW_OK; } -static int ProcessPool_worker_loop_with_message_protocol(ProcessPool *pool, Worker *worker) { +int ProcessPool::run_with_message_protocol(ProcessPool *pool, Worker *worker) { auto fn = [&]() -> int { if (worker->pipe_worker->wait_event(-1, SW_EVENT_READ) < 0) { return errno == EINTR ? 0 : -1; @@ -776,6 +753,16 @@ static int ProcessPool_worker_loop_with_message_protocol(ProcessPool *pool, Work return 1; }; + if (pool->ipc_mode != SW_IPC_UNIXSOCK) { + swoole_error_log( + SW_LOG_WARNING, SW_ERROR_OPERATION_NOT_SUPPORT, "not support, ipc_mode must be SW_IPC_UNIXSOCK"); + return SW_ERR; + } + + if (pool->message_bus == nullptr) { + pool->create_message_bus(); + } + worker->pipe_worker->dont_restart = 1; while (pool->is_worker_running(worker)) { diff --git a/src/server/master.cc b/src/server/master.cc index 889ce35776..56564ba6b4 100644 --- a/src/server/master.cc +++ b/src/server/master.cc @@ -89,6 +89,7 @@ void Server::call_command_callback(int64_t request_id, const std::string &result return; } iter->second(this, result); + command_callbacks.erase(request_id); } void Server::call_command_handler(MessageBus &mb, uint16_t worker_id, Socket *sock) { @@ -527,6 +528,24 @@ int Server::create_task_workers() { } } + /* + * For Server::task_sync(), create notify pipe and result shared memory. + */ + task_results = (EventData *) sw_shm_calloc(worker_num, sizeof(EventData)); + if (!task_results) { + swoole_warning("malloc[task_result] failed"); + return SW_ERR; + } + SW_LOOP_N(worker_num) { + auto _pipe = new Pipe(true); + if (!_pipe->ready()) { + sw_shm_free(task_results); + delete _pipe; + return SW_ERR; + } + task_notify_pipes.emplace_back(_pipe); + } + init_task_workers(); return SW_OK; @@ -642,26 +661,6 @@ int Server::start() { gs->event_workers.workers[i].type = SW_PROCESS_WORKER; } - /* - * For swoole_server->taskwait, create notify pipe and result shared memory. - */ - if (task_worker_num > 0 && worker_num > 0) { - task_result = (EventData *) sw_shm_calloc(worker_num, sizeof(EventData)); - if (!task_result) { - swoole_warning("malloc[task_result] failed"); - return SW_ERR; - } - SW_LOOP_N(worker_num) { - auto _pipe = new Pipe(true); - if (!_pipe->ready()) { - sw_shm_free(task_result); - delete _pipe; - return SW_ERR; - } - task_notify_pipes.emplace_back(_pipe); - } - } - if (!user_worker_list.empty()) { uint32_t i = 0; for (auto worker : user_worker_list) { @@ -926,7 +925,9 @@ void Server::stop_master_thread() { if (port->is_dgram() and is_process_mode()) { continue; } - reactor->del(port->socket); + if (!port->socket->removed) { + reactor->del(port->socket); + } } if (pipe_command) { reactor->del(pipe_command->get_socket(true)); @@ -1128,56 +1129,58 @@ bool Server::command(WorkerId process_id, } int command_id = iter->second.id; - int64_t requset_id = command_current_request_id++; + int64_t request_id = command_current_request_id++; Socket *pipe_sock; SendData task{}; - task.info.fd = requset_id; + task.info.fd = request_id; task.info.reactor_id = process_id; task.info.server_fd = command_id; task.info.type = SW_SERVER_EVENT_COMMAND_REQUEST; task.info.len = msg.length(); task.data = msg.c_str(); + command_callbacks[request_id] = fn; + if (!(process_type & iter->second.accepted_process_types)) { swoole_error_log(SW_LOG_NOTICE, SW_ERROR_OPERATION_NOT_SUPPORT, "unsupported [process_type]"); + _fail: + command_callbacks.erase(request_id); return false; } if (process_type == Command::REACTOR_THREAD) { if (!is_process_mode()) { swoole_error_log(SW_LOG_NOTICE, SW_ERROR_OPERATION_NOT_SUPPORT, "unsupported [server_mode]"); - return false; + goto _fail; } if (process_id >= reactor_num) { swoole_error_log(SW_LOG_NOTICE, SW_ERROR_INVALID_PARAMS, "invalid thread_id[%d]", process_id); - return false; + goto _fail; } pipe_sock = get_worker(process_id)->pipe_worker; } else if (process_type == Command::EVENT_WORKER) { if (process_id >= worker_num) { swoole_error_log(SW_LOG_NOTICE, SW_ERROR_INVALID_PARAMS, "invalid worker_id[%d]", process_id); - return false; + goto _fail; } pipe_sock = get_worker(process_id)->pipe_master; } else if (process_type == Command::TASK_WORKER) { if (process_id >= task_worker_num) { swoole_error_log(SW_LOG_NOTICE, SW_ERROR_INVALID_PARAMS, "invalid task_worker_id[%d]", process_id); - return false; + goto _fail; } EventData buf; - memset(&buf.info, 0, sizeof(buf.info)); if (!task_pack(&buf, msg.c_str(), msg.length())) { - return false; + goto _fail; } buf.info.type = SW_SERVER_EVENT_COMMAND_REQUEST; - buf.info.fd = requset_id; + buf.info.fd = request_id; buf.info.server_fd = command_id; int _dst_worker_id = process_id; - if (gs->task_workers.dispatch(&buf, &_dst_worker_id) <= 0) { - return false; + if (!this->task(&buf, &_dst_worker_id)) { + goto _fail; } - command_callbacks[requset_id] = fn; return true; } else if (process_type == Command::MANAGER) { EventData buf; @@ -1187,18 +1190,17 @@ bool Server::command(WorkerId process_id, "message is too large, maximum length is %lu, the given length is %lu", sizeof(buf.data), msg.length()); - return false; + goto _fail; } memset(&buf.info, 0, sizeof(buf.info)); buf.info.type = SW_SERVER_EVENT_COMMAND_REQUEST; - buf.info.fd = requset_id; + buf.info.fd = request_id; buf.info.server_fd = command_id; buf.info.len = msg.length(); memcpy(buf.data, msg.c_str(), msg.length()); if (gs->event_workers.push_message(&buf) < 0) { - return false; + goto _fail; } - command_callbacks[requset_id] = fn; return true; } else if (process_type == Command::MASTER) { auto result = call_command_handler_in_master(command_id, msg); @@ -1206,12 +1208,11 @@ bool Server::command(WorkerId process_id, return true; } else { swoole_error_log(SW_LOG_NOTICE, SW_ERROR_OPERATION_NOT_SUPPORT, "unsupported [process_type]"); - return false; + goto _fail; } if (!message_bus.write(pipe_sock, &task)) { - return false; + goto _fail; } - command_callbacks[requset_id] = fn; return true; } @@ -1607,6 +1608,12 @@ bool Server::close(SessionId session_id, bool reset) { return factory->end(session_id, reset ? (CLOSE_ACTIVELY | CLOSE_RESET) : CLOSE_ACTIVELY); } +bool Server::send_pipe_message(WorkerId worker_id, EventData *msg) { + msg->info.type = SW_SERVER_EVENT_PIPE_MESSAGE; + + return send_to_worker_from_worker(get_worker(worker_id), msg, msg->size(), SW_PIPE_MASTER | SW_PIPE_NONBLOCK) > 0; +} + void Server::init_signal_handler() { swoole_signal_set(SIGPIPE, nullptr); swoole_signal_set(SIGHUP, nullptr); @@ -2077,7 +2084,7 @@ int Server::get_idle_task_worker_num() { return idle_worker_num; } -int Server::get_task_count() { +int Server::get_tasking_num() { // TODO Why need to reset ? int tasking_num = gs->tasking_num; if (tasking_num < 0) { diff --git a/src/server/reactor_thread.cc b/src/server/reactor_thread.cc index 97a62a8d19..9f83cc55d1 100644 --- a/src/server/reactor_thread.cc +++ b/src/server/reactor_thread.cc @@ -310,7 +310,9 @@ void ReactorThread::shutdown(Reactor *reactor) { if (ls->socket->fd % serv->reactor_num != reactor->id) { continue; } - reactor->del(ls->socket); + if (!ls->socket->removed) { + reactor->del(ls->socket); + } } } } diff --git a/src/server/task_worker.cc b/src/server/task_worker.cc index 079b0695eb..3df11f3953 100644 --- a/src/server/task_worker.cc +++ b/src/server/task_worker.cc @@ -53,9 +53,9 @@ void Server::init_task_workers() { } } -static int TaskWorker_call_command_handler(ProcessPool *pool, EventData *req) { +static int TaskWorker_call_command_handler(ProcessPool *pool, Worker *worker, EventData *req) { Server *serv = (Server *) pool->ptr; - int command_id = req->info.server_fd; + int command_id = serv->get_command_id(req); auto iter = serv->command_handlers.find(command_id); if (iter == serv->command_handlers.end()) { swoole_error_log(SW_LOG_ERROR, SW_ERROR_SERVER_INVALID_COMMAND, "Unknown command[%d]", command_id); @@ -71,8 +71,8 @@ static int TaskWorker_call_command_handler(ProcessPool *pool, EventData *req) { auto result = handler(serv, std::string(packet.data, packet.length)); SendData task{}; - task.info.fd = req->info.fd; - task.info.reactor_id = sw_worker()->id; + task.info.fd = serv->get_task_id(req); + task.info.reactor_id = worker->id; task.info.server_fd = -1; task.info.type = SW_SERVER_EVENT_COMMAND_RESPONSE; task.info.len = result.length(); @@ -92,7 +92,7 @@ static int TaskWorker_onTask(ProcessPool *pool, Worker *worker, EventData *task) } else if (task->info.type == SW_SERVER_EVENT_SHUTDOWN) { SwooleWG.shutdown = true; } else if (task->info.type == SW_SERVER_EVENT_COMMAND_REQUEST) { - ret = TaskWorker_call_command_handler(pool, task); + ret = TaskWorker_call_command_handler(pool, worker, task); } else { ret = serv->onTask(serv, task); /** @@ -107,6 +107,7 @@ static int TaskWorker_onTask(ProcessPool *pool, Worker *worker, EventData *task) } bool Server::task_pack(EventData *task, const void *_data, size_t _length) { + task->info = {}; task->info.type = SW_SERVER_EVENT_TASK; task->info.fd = SwooleG.current_task_id++; task->info.reactor_id = swoole_get_process_id(); @@ -138,6 +139,59 @@ bool Server::task_pack(EventData *task, const void *_data, size_t _length) { return true; } +bool Server::task(EventData *_task, int *dst_worker_id, bool blocking) { + sw_atomic_fetch_add(&gs->tasking_num, 1); + + swResultCode retval; + if (blocking) { + retval = gs->task_workers.dispatch_blocking(_task, dst_worker_id); + } else { + retval = gs->task_workers.dispatch(_task, dst_worker_id); + } + + if (retval == SW_OK) { + sw_atomic_fetch_add(&gs->task_count, 1); + return true; + } + + sw_atomic_fetch_sub(&gs->tasking_num, 1); + return false; +} + +bool Server::task_sync(EventData *_task, int *dst_worker_id, double timeout) { + uint64_t notify; + EventData *task_result = get_task_result(); + sw_memset_zero(task_result, sizeof(*task_result)); + Pipe *pipe = task_notify_pipes.at(swoole_get_process_id()).get(); + network::Socket *task_notify_socket = pipe->get_socket(false); + TaskId task_id = get_task_id(_task); + + // clear history task + while (task_notify_socket->wait_event(0, SW_EVENT_READ) == SW_OK) { + if (task_notify_socket->read(¬ify, sizeof(notify)) <= 0) { + break; + } + } + + if (!task(_task, dst_worker_id, true)) { + return false; + } + + SW_LOOP { + if (task_notify_socket->wait_event((int) (timeout * 1000), SW_EVENT_READ) == SW_OK) { + if (pipe->read(¬ify, sizeof(notify)) > 0) { + if (get_task_id(task_result) != task_id) { + continue; + } + return true; + } + } + break; + } + + return false; +} + bool Server::task_unpack(EventData *task, String *buffer, PacketPtr *packet) { if (!(task->info.ext_flags & SW_TASK_TMPFILE)) { packet->data = task->data; @@ -271,23 +325,21 @@ static int TaskWorker_loop_async(ProcessPool *pool, Worker *worker) { /** * Send the task result to worker */ -int Server::reply_task_result(const char *data, size_t data_len, int flags, EventData *current_task) { - EventData buf; - sw_memset_zero(&buf.info, sizeof(buf.info)); +bool Server::finish(const char *data, size_t data_len, int flags, EventData *current_task) { if (task_worker_num < 1) { swoole_warning("cannot use Server::task()/Server::finish() method, because no set [task_worker_num]"); - return SW_ERR; + return false; } if (current_task == nullptr) { current_task = last_task; } if (current_task->info.type == SW_SERVER_EVENT_PIPE_MESSAGE) { swoole_warning("Server::task()/Server::finish() is not supported in onPipeMessage callback"); - return SW_ERR; + return false; } if (current_task->info.ext_flags & SW_TASK_NOREPLY) { swoole_warning("Server::finish() can only be used in the worker process"); - return SW_ERR; + return false; } uint16_t source_worker_id = current_task->info.reactor_id; @@ -295,16 +347,17 @@ int Server::reply_task_result(const char *data, size_t data_len, int flags, Even if (worker == nullptr) { swoole_warning("invalid worker_id[%d]", source_worker_id); - return SW_ERR; + return false; } ssize_t retval; // for swoole_server_task if (current_task->info.ext_flags & SW_TASK_NONBLOCK) { // write to file + EventData buf; if (!task_pack(&buf, data, data_len)) { swoole_warning("large task pack failed()"); - return SW_ERR; + return false; } // callback function if (current_task->info.ext_flags & SW_TASK_CALLBACK) { @@ -314,7 +367,7 @@ int Server::reply_task_result(const char *data, size_t data_len, int flags, Even } buf.info.ext_flags |= flags; buf.info.type = SW_SERVER_EVENT_FINISH; - buf.info.fd = current_task->info.fd; + buf.info.fd = get_task_id(current_task); if (worker->pool->use_socket && worker->pool->stream_info_->last_connection) { uint32_t _len = htonl(data_len); @@ -323,7 +376,7 @@ int Server::reply_task_result(const char *data, size_t data_len, int flags, Even retval = worker->pool->stream_info_->last_connection->send_blocking(data, data_len); } } else { - retval = send_to_worker_from_worker(worker, &buf, sizeof(buf.info) + buf.info.len, SW_PIPE_MASTER); + retval = send_to_worker_from_worker(worker, &buf, buf.size(), SW_PIPE_MASTER); } } else { uint64_t flag = 1; @@ -331,7 +384,7 @@ int Server::reply_task_result(const char *data, size_t data_len, int flags, Even /** * Use worker shm store the result */ - EventData *result = &(task_result[source_worker_id]); + EventData *result = &(task_results[source_worker_id]); Pipe *pipe = task_notify_pipes.at(source_worker_id).get(); // lock worker @@ -342,14 +395,15 @@ int Server::reply_task_result(const char *data, size_t data_len, int flags, Even char *_tmpfile = result->data + 4; File file(_tmpfile, O_APPEND | O_WRONLY); if (file.ready()) { + EventData buf; if (!task_pack(&buf, data, data_len)) { swoole_warning("large task pack failed()"); buf.info.len = 0; } buf.info.ext_flags |= flags; buf.info.type = SW_SERVER_EVENT_FINISH; - buf.info.fd = current_task->info.fd; - size_t bytes = sizeof(buf.info) + buf.info.len; + buf.info.fd = get_task_id(current_task); + size_t bytes = buf.size(); if (file.write_all(&buf, bytes) != bytes) { swoole_sys_warning("write(%s, %ld) failed", _tmpfile, bytes); } @@ -360,11 +414,11 @@ int Server::reply_task_result(const char *data, size_t data_len, int flags, Even // unlock worker worker->lock->unlock(); swoole_warning("large task pack failed()"); - return SW_ERR; + return false; } result->info.ext_flags |= flags; result->info.type = SW_SERVER_EVENT_FINISH; - result->info.fd = current_task->info.fd; + result->info.fd = get_task_id(current_task); } // unlock worker @@ -388,6 +442,6 @@ int Server::reply_task_result(const char *data, size_t data_len, int flags, Even swoole_sys_warning("send result to worker failed"); } } - return retval; + return true; } } // namespace swoole diff --git a/tests/swoole_server/parse_option_to_size.phpt b/tests/swoole_server/parse_option_to_size.phpt index 8a742b39b4..b50b825962 100644 --- a/tests/swoole_server/parse_option_to_size.phpt +++ b/tests/swoole_server/parse_option_to_size.phpt @@ -1,7 +1,9 @@ --TEST-- swoole_server: parse option value to size --SKIPIF-- - + --FILE-- --EXPECTF-- [%s] WARNING Socket::send_blocking(): send %d bytes failed, Error: Resource temporarily unavailable[11] -[%s] WARNING Server::reply_task_result() (ERRNO %d): send result to worker timed out +[%s] WARNING Server::finish() (ERRNO %d): send result to worker timed out