Skip to content

Commit

Permalink
[unity] migrate node.js polling optimization from 2.0 to 1.4
Browse files Browse the repository at this point in the history
  • Loading branch information
zombieyang committed Aug 22, 2023
1 parent ca9c2b9 commit 1629ca5
Show file tree
Hide file tree
Showing 2 changed files with 216 additions and 18 deletions.
40 changes: 39 additions & 1 deletion unity/native_src/Inc/JSEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@
#include "node.h"
#include "uv.h"
#pragma warning(pop)

#if PLATFORM_WINDOWS
#include <windows.h>
#elif PLATFORM_LINUX
#include <sys/epoll.h>
#elif PLATFORM_MAC
#include <sys/select.h>
#include <sys/sysctl.h>
#include <sys/time.h>
#include <sys/types.h>
#endif
#else

#if defined(PLATFORM_WINDOWS)
Expand Down Expand Up @@ -192,7 +203,7 @@ class JSEngine

private:
#if defined(WITH_NODEJS)
uv_loop_t* NodeUVLoop;
uv_loop_t NodeUVLoop;

std::unique_ptr<node::ArrayBufferAllocator> NodeArrayBufferAllocator;

Expand All @@ -201,6 +212,33 @@ class JSEngine
node::Environment* NodeEnv;

const float UV_LOOP_DELAY = 0.1;

uv_thread_t PollingThread;

uv_sem_t PollingSem;

uv_async_t DummyUVHandle;

bool PollingClosed = false;

// FGraphEventRef LastJob;
bool hasPendingTask = false;

#if PLATFORM_LINUX
int Epoll;
#endif

void StartPolling();

void UvRunOnce();

void PollEvents();

static void OnWatcherQueueChanged(uv_loop_t* loop);

void WakeupPollingThread();

void StopPolling();
#endif
v8::Isolate::CreateParams* CreateParams;

Expand Down
194 changes: 177 additions & 17 deletions unity/native_src/Src/JSEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,7 @@ namespace puerts
#endif
v8::V8::SetFlagsFromString(Flags.c_str(), static_cast<int>(Flags.size()));

NodeUVLoop = new uv_loop_t;
const int Ret = uv_loop_init(NodeUVLoop);
const int Ret = uv_loop_init(&NodeUVLoop);
if (Ret != 0)
{
// TODO log
Expand All @@ -115,7 +114,7 @@ namespace puerts
// PLog(puerts::Log, "[PuertsDLL][JSEngineWithNode]isolate");

auto Platform = static_cast<node::MultiIsolatePlatform*>(GPlatform.get());
MainIsolate = node::NewIsolate(NodeArrayBufferAllocator.get(), NodeUVLoop,
MainIsolate = node::NewIsolate(NodeArrayBufferAllocator.get(), &NodeUVLoop,
Platform);

auto Isolate = MainIsolate;
Expand All @@ -138,7 +137,7 @@ namespace puerts
v8::Local<v8::Value> Console = Global->Get(Context, FV8Utils::V8String(MainIsolate, "console")).ToLocalChecked();

// PLog(puerts::Log, "[PuertsDLL][JSEngineWithNode]isolatedata start");
NodeIsolateData = node::CreateIsolateData(Isolate, NodeUVLoop, Platform, NodeArrayBufferAllocator.get()); // node::FreeIsolateData
NodeIsolateData = node::CreateIsolateData(Isolate, &NodeUVLoop, Platform, NodeArrayBufferAllocator.get()); // node::FreeIsolateData

//kDefaultFlags = kOwnsProcessState | kOwnsInspector, if kOwnsInspector set, inspector_agent.cc:681 CHECK_EQ(start_io_thread_async_initialized.exchange(true), false) fail!
NodeEnv = CreateEnvironment(NodeIsolateData, Context, *Args, *ExecArgs, node::EnvironmentFlags::kOwnsProcessState);
Expand Down Expand Up @@ -169,6 +168,10 @@ namespace puerts

JSObjectIdMap.Reset(MainIsolate, v8::Map::New(MainIsolate));

#if defined(WITH_NODEJS)
StartPolling();
#endif

//the same as raw v8
MainIsolate->SetMicrotasksPolicy(v8::MicrotasksPolicy::kAuto);
}
Expand Down Expand Up @@ -257,6 +260,10 @@ namespace puerts

JSEngine::~JSEngine()
{
#if WITH_NODEJS
LogicTick();
StopPolling();
#endif
if (Inspector)
{
delete Inspector;
Expand Down Expand Up @@ -330,10 +337,6 @@ namespace puerts
node::FreeEnvironment(NodeEnv);
node::FreeIsolateData(NodeIsolateData);
auto Platform = static_cast<node::MultiIsolatePlatform*>(GPlatform.get());
bool platform_finished = false;
Platform->AddIsolateFinishedCallback(MainIsolate, [](void* data) {
*static_cast<bool*>(data) = true;
}, &platform_finished);
Platform->UnregisterIsolate(MainIsolate);
#endif

Expand All @@ -343,15 +346,7 @@ namespace puerts
MainIsolate = nullptr;

#if WITH_NODEJS
// Wait until the platform has cleaned up all relevant resources.
while (!platform_finished)
{
uv_run(NodeUVLoop, UV_RUN_ONCE);
}

int err = uv_loop_close(NodeUVLoop);
assert(err == 0);
delete NodeUVLoop;
#else
delete CreateParams->array_buffer_allocator;
delete CreateParams;
Expand Down Expand Up @@ -767,6 +762,170 @@ namespace puerts
#endif
}


#if defined(WITH_NODEJS)
void JSEngine::StartPolling()
{
uv_async_init(&NodeUVLoop, &DummyUVHandle, nullptr);
uv_sem_init(&PollingSem, 0);
uv_thread_create(
&PollingThread,
[](void* arg)
{
auto* self = static_cast<JSEngine*>(arg);
while (true)
{
uv_sem_wait(&self->PollingSem);

if (self->PollingClosed)
break;

self->PollEvents();

if (self->PollingClosed)
break;

self->hasPendingTask = true;
}
},
this
);

#if PLATFORM_WINDOWS
// on single-core the io comp port NumberOfConcurrentThreads needs to be 2
// to avoid cpu pegging likely caused by a busy loop in PollEvents
// if (FPlatformMisc::NumberOfCores() == 1)
if (false)
{
if (NodeUVLoop.iocp && NodeUVLoop.iocp != INVALID_HANDLE_VALUE)
CloseHandle(NodeUVLoop.iocp);
NodeUVLoop.iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 2);
}
#elif PLATFORM_LINUX
Epoll = epoll_create(1);
int backend_fd = uv_backend_fd(&NodeUVLoop);
struct epoll_event ev = {0};
ev.events = EPOLLIN;
ev.data.fd = backend_fd;
epoll_ctl(Epoll, EPOLL_CTL_ADD, backend_fd, &ev);
NodeUVLoop.data = this;
NodeUVLoop.on_watcher_queue_updated = OnWatcherQueueChanged;

#elif PLATFORM_MAC
NodeUVLoop.data = this;
NodeUVLoop.on_watcher_queue_updated = OnWatcherQueueChanged;
#endif
UvRunOnce();
}

void JSEngine::UvRunOnce()
{
auto Isolate = MainIsolate;
#ifdef THREAD_SAFE
v8::Locker Locker(Isolate);
#endif
v8::Isolate::Scope IsolateScope(Isolate);
v8::HandleScope HandleScope(Isolate);
auto Context = ResultInfo.Context.Get(Isolate);
v8::Context::Scope ContextScope(Context);

// TODO: catch uv_run可以让脚本错误不至于进程退出,但这不知道会不会对node有什么副作用
v8::TryCatch TryCatch(Isolate);

uv_run(&NodeUVLoop, UV_RUN_NOWAIT);
if (TryCatch.HasCaught())
{
// Logger->Error(FString::Printf(TEXT("uv_run throw: %s"), *FV8Utils::TryCatchToString(Isolate, &TryCatch)));
}
else
{
static_cast<node::MultiIsolatePlatform*>(GPlatform.get())->DrainTasks(Isolate);
}

hasPendingTask = false;

// Tell the Polling thread to continue.
uv_sem_post(&PollingSem);
}

void JSEngine::PollEvents()
{
#if PLATFORM_WINDOWS
DWORD bytes;
DWORD timeout = uv_backend_timeout(&NodeUVLoop);
ULONG_PTR key;
OVERLAPPED* overlapped;

timeout = timeout > 100 ? 100 : timeout;

GetQueuedCompletionStatus(NodeUVLoop.iocp, &bytes, &key, &overlapped, timeout);

// Give the event back so libuv can deal with it.
if (overlapped != NULL)
PostQueuedCompletionStatus(NodeUVLoop.iocp, bytes, key, overlapped);
#elif PLATFORM_LINUX
int timeout = uv_backend_timeout(&NodeUVLoop);
timeout = (timeout > 100 || timeout < 0) ? 100 : timeout;

// Wait for new libuv events.
int r;
do
{
struct epoll_event ev;
r = epoll_wait(Epoll, &ev, 1, timeout);
} while (r == -1 && errno == EINTR);
#elif PLATFORM_MAC
struct timeval tv;
int timeout = uv_backend_timeout(&NodeUVLoop);
timeout = (timeout > 100 || timeout < 0) ? 100 : timeout;
if (timeout != -1)
{
tv.tv_sec = timeout / 1000;
tv.tv_usec = (timeout % 1000) * 1000;
}

fd_set readset;
int fd = uv_backend_fd(&NodeUVLoop);
FD_ZERO(&readset);
FD_SET(fd, &readset);

// Wait for new libuv events.
int r;
do
{
r = select(fd + 1, &readset, nullptr, nullptr, timeout == -1 ? nullptr : &tv);
} while (r == -1 && errno == EINTR);
#endif
}

void JSEngine::OnWatcherQueueChanged(uv_loop_t* loop)
{
#if !PLATFORM_WINDOWS
JSEngine* self = static_cast<JSEngine*>(loop->data);
self->WakeupPollingThread();
#endif
}

void JSEngine::WakeupPollingThread()
{
uv_async_send(&DummyUVHandle);
}

void JSEngine::StopPolling()
{
PollingClosed = true;

uv_sem_post(&PollingSem);

WakeupPollingThread();

uv_thread_join(&PollingThread);

uv_sem_destroy(&PollingSem);
}
#endif


void JSEngine::CreateInspector(int32_t Port)
{
v8::Isolate* Isolate = MainIsolate;
Expand Down Expand Up @@ -815,7 +974,8 @@ namespace puerts
v8::Local<v8::Context> Context = ResultInfo.Context.Get(Isolate);
v8::Context::Scope ContextScope(Context);

uv_run(NodeUVLoop, UV_RUN_NOWAIT);
if (hasPendingTask)
UvRunOnce();
static_cast<node::MultiIsolatePlatform*>(GPlatform.get())->DrainTasks(Isolate);
#endif
}
Expand Down

0 comments on commit 1629ca5

Please sign in to comment.