Skip to content

Commit

Permalink
・[fix] IPC有効時、外部プロセスとの通信にコケるのを修正(マルチスレッドで各種関数が実行されることがあるようなので、排他制御を入れた)
Browse files Browse the repository at this point in the history
・[change] 名前付きパイプの設定を、バイトストリームモードへ変更(上のバグのせいで変なことしてた)
・[fix] 外部プロセスにて、func_info_getが失敗することを考慮に入れてなかったのを修正
・[change] 名前付きパイプの名前をランダムに変更するようにした
  • Loading branch information
amate committed Aug 25, 2019
1 parent eb8323e commit 467bb56
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 60 deletions.
71 changes: 46 additions & 25 deletions InputPipeMain/InputPipeMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,34 +134,41 @@ int APIENTRY wWinMain(_In_ HINSTANCE hInstance,
return 0;
}

// for Debug
CallFunc lastCallFunc;

for (;;) {
std::vector<BYTE> readData = namedPipe.Read(kToWindDataHeaderSize);
if (readData.size() == 0) {
assert(false);
break;
}
ToWinputData* toData = (ToWinputData*)readData.data();
lastCallFunc = toData->header.callFunc;

std::vector<BYTE> dataBody = namedPipe.Read(toData->header.paramSize);
switch (toData->header.callFunc) {
case CallFunc::kOpen:
{
LPSTR file = (LPSTR)dataBody.data();
INPUT_HANDLE ih = g_winputPluginTable->func_open(file);
//INFO_LOG << L"kOpen: " << ih;
INFO_LOG << L"kOpen: " << ih;

auto fromData = GenerateFromInputData(ih, 0);
auto fromData = GenerateFromInputData(CallFunc::kOpen, ih, 0);
namedPipe.Write((const BYTE*)fromData.get(), FromWinputDataTotalSize(*fromData));
//INFO_LOG << L"Write: " << FromWinputDataTotalSize(*fromData) << L" bytes";
}
break;

case CallFunc::kClose:
{
StandardParamPack* spp = (StandardParamPack*)dataBody.data();
BOOL b = g_winputPluginTable->func_close(spp->ih);
//INFO_LOG << L"kClose: " << spp->ih;
INFO_LOG << L"kClose: " << spp->ih;

auto fromData = GenerateFromInputData(b, 0);
auto fromData = GenerateFromInputData(CallFunc::kClose, b, 0);
namedPipe.Write((const BYTE*)fromData.get(), FromWinputDataTotalSize(*fromData));
//INFO_LOG << L"Write: " << FromWinputDataTotalSize(*fromData) << L" bytes";
}
break;

Expand All @@ -171,20 +178,28 @@ int APIENTRY wWinMain(_In_ HINSTANCE hInstance,
StandardParamPack* spp = (StandardParamPack*)dataBody.data();
INPUT_INFO inputInfo = {};
BOOL b = g_winputPluginTable->func_info_get(spp->ih, &inputInfo);
//INFO_LOG << L"kInfoGet: " << spp->ih;

int totalInputInfoSize = sizeof(INPUT_INFO) + inputInfo.format_size + inputInfo.audio_format_size;
std::vector<BYTE> entireInputInfo(totalInputInfoSize);
errno_t e = ::memcpy_s(entireInputInfo.data(), totalInputInfoSize, &inputInfo, sizeof(INPUT_INFO));
e = ::memcpy_s(entireInputInfo.data() + sizeof(INPUT_INFO),
inputInfo.format_size,
inputInfo.format, inputInfo.format_size);
e = ::memcpy_s(entireInputInfo.data() + sizeof(INPUT_INFO) + inputInfo.format_size,
inputInfo.audio_format_size,
inputInfo.audio_format, inputInfo.audio_format_size);

auto fromData = GenerateFromInputData(b, entireInputInfo);
namedPipe.Write((const BYTE*)fromData.get(), FromWinputDataTotalSize(*fromData));
assert(b);
INFO_LOG << L"kInfoGet: " << spp->ih;
if (b) {
int totalInputInfoSize = sizeof(INPUT_INFO) + inputInfo.format_size + inputInfo.audio_format_size;
std::vector<BYTE> entireInputInfo(totalInputInfoSize);
errno_t e = ::memcpy_s(entireInputInfo.data(), totalInputInfoSize, &inputInfo, sizeof(INPUT_INFO));
e = ::memcpy_s(entireInputInfo.data() + sizeof(INPUT_INFO),
inputInfo.format_size,
inputInfo.format, inputInfo.format_size);
e = ::memcpy_s(entireInputInfo.data() + sizeof(INPUT_INFO) + inputInfo.format_size,
inputInfo.audio_format_size,
inputInfo.audio_format, inputInfo.audio_format_size);

auto fromData = GenerateFromInputData(CallFunc::kInfoGet, b, entireInputInfo.data(), totalInputInfoSize);
namedPipe.Write((const BYTE*)fromData.get(), FromWinputDataTotalSize(*fromData));
//INFO_LOG << L"Write: " << FromWinputDataTotalSize(*fromData) << L" bytes";

} else {
auto fromData = GenerateFromInputData(CallFunc::kInfoGet, b, 0);
namedPipe.Write((const BYTE*)fromData.get(), FromWinputDataTotalSize(*fromData));
//INFO_LOG << L"Write: " << FromWinputDataTotalSize(*fromData) << L" bytes";
}
}
break;

Expand All @@ -198,12 +213,14 @@ int APIENTRY wWinMain(_In_ HINSTANCE hInstance,
int readBytes = g_winputPluginTable->func_read_video(spp->ih, spp->param1, g_readVideoBuffer.data());
//INFO_LOG << L"kReadVideo: " << spp->ih;

namedPipe.Write((const BYTE*)&toData->header.callFunc, sizeof(toData->header.callFunc));
std::int32_t totalSize = sizeof(int) + readBytes;
namedPipe.Write((const BYTE*)&totalSize, sizeof(totalSize));
namedPipe.Write((const BYTE*)&readBytes, sizeof(readBytes));
namedPipe.Write((const BYTE*)g_readVideoBuffer.data(), readBytes);
//auto fromData = GenerateFromInputData(readBytes, g_readVideoBuffer);
//auto fromData = GenerateFromInputData(toData->header.callFunc, readBytes, g_readVideoBuffer.data(), readBytes);
//namedPipe.Write((const BYTE*)fromData.get(), FromWinputDataTotalSize(*fromData));
//INFO_LOG << L"Write: " << FromWinputDataTotalSize(*fromData) << L" bytes";
}
break;

Expand All @@ -216,25 +233,29 @@ int APIENTRY wWinMain(_In_ HINSTANCE hInstance,
g_readAudioBuffer.resize(requestReadBytes);
}
int readSample = g_winputPluginTable->func_read_audio(spp->ih, spp->param1, spp->param2, g_readAudioBuffer.data());
//INFO_LOG << L"kReadAudio: " << spp->ih;
assert(readSample > 0);
//INFO_LOG << L"kReadAudio: " << spp->ih << L" readSample: " << readSample;
const int readBufferSize = PerAudioSampleBufferSize * readSample;

std::int32_t totalSize = sizeof(int) + g_readAudioBuffer.size();
namedPipe.Write((const BYTE*)& toData->header.callFunc, sizeof(toData->header.callFunc));
std::int32_t totalSize = sizeof(int) + readBufferSize;
namedPipe.Write((const BYTE*)& totalSize, sizeof(totalSize));
namedPipe.Write((const BYTE*)& readSample, sizeof(readSample));
namedPipe.Write((const BYTE*)g_readAudioBuffer.data(), g_readAudioBuffer.size());
//auto fromData = GenerateFromInputData(readBytes, g_readAudioBuffer);
namedPipe.Write((const BYTE*)g_readAudioBuffer.data(), readBufferSize);
//auto fromData = GenerateFromInputData(toData->header.callFunc, readSample, g_readAudioBuffer.data(), readBufferSize);
//namedPipe.Write((const BYTE*)fromData.get(), FromWinputDataTotalSize(*fromData));
//INFO_LOG << L"Write: " << FromWinputDataTotalSize(*fromData) << L" bytes";
}
break;

case CallFunc::kIsKeyframe:
{
//INFO_LOG << L"kIsKeyframe";
INFO_LOG << L"kIsKeyframe";

StandardParamPack* spp = (StandardParamPack*)dataBody.data();
BOOL b = g_winputPluginTable->func_is_keyframe(spp->ih, spp->param1);

auto fromData = GenerateFromInputData(b, 0);
auto fromData = GenerateFromInputData(CallFunc::kIsKeyframe, b, 0);
namedPipe.Write((const BYTE*)fromData.get(), FromWinputDataTotalSize(*fromData));
}
break;
Expand Down
79 changes: 67 additions & 12 deletions InputPipePlugin/input.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include <vector>
#include <tuple>
#include <algorithm>
#include <random>
#include <mutex>
#include <windows.h>
//#include <vfw.h>
//#pragma comment(lib, "Vfw32.lib")
Expand All @@ -30,7 +32,7 @@ BindProcess g_bindProcess;
LPCWSTR kPipeName = LR"(\\.\pipe\InputPipePlugin)";
NamedPipe g_namedPipe;

std::vector<BYTE> g_lastInfoGetData;
//std::vector<BYTE> g_lastInfoGetData;

Config m_config;

Expand All @@ -42,9 +44,13 @@ struct FrameAudioVideoBufferSize

std::unordered_map<INPUT_HANDLE, FrameAudioVideoBufferSize> g_mapFrameBufferSize;

std::unordered_map<INPUT_HANDLE, std::unique_ptr<BYTE[]>> g_mapInputHandleInfoGetData;

using HandleCache = std::tuple<std::string, INPUT_HANDLE, int>;
std::vector<HandleCache> g_vecHandleCache;

std::mutex g_mtxIPC;

// for Logger
std::string LogFileName()
{
Expand Down Expand Up @@ -129,7 +135,16 @@ BOOL func_init( void )

if (m_config.bEnableIPC) {
INFO_LOG << "EnableIPC";
std::wstring pipeName = std::wstring(kPipeName) + std::to_wstring((uint64_t)g_hModule);

enum { kRandamStrLength = 64 };
std::random_device randdev;
WCHAR tempstr[] = L"0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
std::uniform_int_distribution<int> dist(0, std::size(tempstr) - 1);
std::wstring randamString = L"_";
for (int i = 0; i < kRandamStrLength; ++i) {
randamString += tempstr[dist(randdev)];
}
std::wstring pipeName = std::wstring(kPipeName) + randamString;
bool ret = g_namedPipe.CreateNamedPipe(pipeName);
// INFO_LOG << L"CreateNamedPipe: " << pipeName << L" ret: " << ret;

Expand Down Expand Up @@ -173,6 +188,7 @@ BOOL func_exit( void )
INPUT_HANDLE func_open( LPSTR file )
{
INFO_LOG << L"func_open: " << CodeConvert::UTF16fromShiftJIS(file);
std::lock_guard<std::mutex> lock(g_mtxIPC);

#ifndef NO_REMOTE
if (m_config.bEnableHandleCache) {
Expand Down Expand Up @@ -201,6 +217,8 @@ INPUT_HANDLE func_open( LPSTR file )

std::vector<BYTE> headerData = g_namedPipe.Read(kFromWinputDataHeaderSize);
FromWinputData* fromData = (FromWinputData*)headerData.data();
//INFO_LOG << L"Read: " << fromData->returnSize << L" bytes";
assert(fromData->callFunc == CallFunc::kOpen);
std::vector<BYTE> readBody = g_namedPipe.Read(fromData->returnSize);
INPUT_HANDLE ih2 = *(INPUT_HANDLE*)readBody.data();
INFO_LOG << ih2;
Expand Down Expand Up @@ -229,6 +247,7 @@ INPUT_HANDLE func_open( LPSTR file )
BOOL func_close( INPUT_HANDLE ih )
{
INFO_LOG << L"func_close: " << ih;
std::lock_guard<std::mutex> lock(g_mtxIPC);
#ifndef NO_REMOTE
if (m_config.bEnableHandleCache) {
auto itfound = std::find_if(g_vecHandleCache.begin(), g_vecHandleCache.end(),
Expand All @@ -252,12 +271,19 @@ BOOL func_close( INPUT_HANDLE ih )
}
}
if (m_config.bEnableIPC) {
auto itfound = g_mapInputHandleInfoGetData.find(ih);
if (itfound != g_mapInputHandleInfoGetData.end()) {
g_mapInputHandleInfoGetData.erase(itfound);
}

StandardParamPack spp = { ih };
auto toData = GenerateToInputData(CallFunc::kClose, spp);
g_namedPipe.Write((const BYTE*)toData.get(), ToWinputDataTotalSize(*toData));

std::vector<BYTE> headerData = g_namedPipe.Read(kFromWinputDataHeaderSize);
FromWinputData* fromData = (FromWinputData*)headerData.data();
//INFO_LOG << L"Read: " << fromData->returnSize << L" bytes";
assert(fromData->callFunc == CallFunc::kClose);
std::vector<BYTE> readBody = g_namedPipe.Read(fromData->returnSize);
auto retData = ParseFromInputData<BOOL>(readBody);

Expand Down Expand Up @@ -298,26 +324,46 @@ BOOL func_close( INPUT_HANDLE ih )
BOOL func_info_get( INPUT_HANDLE ih,INPUT_INFO *iip )
{
INFO_LOG << L"func_info_get";
std::lock_guard<std::mutex> lock(g_mtxIPC);
#ifndef NO_REMOTE
if (m_config.bEnableIPC) {
auto itfound = g_mapInputHandleInfoGetData.find(ih);
if (itfound != g_mapInputHandleInfoGetData.end()) {
INFO_LOG << L"InfoGetData cache found!";
*iip = *reinterpret_cast<INPUT_INFO*>(itfound->second.get());
return TRUE;
}

StandardParamPack spp = { ih };
auto toData = GenerateToInputData(CallFunc::kInfoGet, spp);
g_namedPipe.Write((const BYTE*)toData.get(), ToWinputDataTotalSize(*toData));

std::vector<BYTE> headerData = g_namedPipe.Read(kFromWinputDataHeaderSize);
FromWinputData* fromData = (FromWinputData*)headerData.data();
INFO_LOG << L"Read: " << fromData->returnSize << L" bytes";
assert(fromData->callFunc == CallFunc::kInfoGet);
std::vector<BYTE> readBody = g_namedPipe.Read(fromData->returnSize);
auto retData = ParseFromInputData<BOOL>(readBody);
*iip = *(INPUT_INFO*)retData.second;
iip->format = (BITMAPINFOHEADER*)(retData.second + sizeof(INPUT_INFO));
iip->audio_format = (WAVEFORMATEX*)(retData.second + sizeof(INPUT_INFO) + iip->format_size);
g_lastInfoGetData = std::move(readBody);

int OneFrameBufferSize = iip->format->biWidth * iip->format->biHeight * (iip->format->biBitCount / 8) + kVideoBufferSurplusBytes;
int PerAudioSampleBufferSize = iip->audio_format->nChannels * (iip->audio_format->wBitsPerSample / 8);
g_mapFrameBufferSize.emplace(ih, FrameAudioVideoBufferSize{ OneFrameBufferSize , PerAudioSampleBufferSize });

INFO_LOG << L"OneFrameBufferSize: " << OneFrameBufferSize << L" PerAudioSampleBufferSize: " << PerAudioSampleBufferSize;
if (retData.first) {
auto tempInputInfo = (INPUT_INFO*)retData.second;
const int infoGetDataSize = sizeof(INPUT_INFO) + tempInputInfo->format_size + tempInputInfo->audio_format_size;
auto infoGetData = std::make_unique<BYTE[]>(infoGetDataSize);
memcpy_s(infoGetData.get(), infoGetDataSize, tempInputInfo, infoGetDataSize);

auto igData = reinterpret_cast<INPUT_INFO*>(infoGetData.get());
igData->format = (BITMAPINFOHEADER*)(infoGetData.get() + sizeof(INPUT_INFO));
igData->audio_format = (WAVEFORMATEX*)(infoGetData.get() + sizeof(INPUT_INFO) + igData->format_size);
*iip = *igData;
g_mapInputHandleInfoGetData.emplace(ih, std::move(infoGetData));

const int OneFrameBufferSize = iip->format->biWidth * iip->format->biHeight * (iip->format->biBitCount / 8);
const int PerAudioSampleBufferSize = iip->audio_format->nChannels * (iip->audio_format->wBitsPerSample / 8);
g_mapFrameBufferSize.emplace(ih, FrameAudioVideoBufferSize{ OneFrameBufferSize , PerAudioSampleBufferSize });

INFO_LOG << L"OneFrameBufferSize: " << OneFrameBufferSize << L" PerAudioSampleBufferSize: " << PerAudioSampleBufferSize;
} else {
ERROR_LOG << L"func_info_get failed, ih: " << ih;
}
return retData.first;
} else {
BOOL b = g_winputPluginTable->func_info_get(ih, iip);
Expand Down Expand Up @@ -360,6 +406,7 @@ BOOL func_info_get( INPUT_HANDLE ih,INPUT_INFO *iip )
int func_read_video( INPUT_HANDLE ih,int frame,void *buf )
{
//INFO_LOG << L"func_read_video" << L" frame: " << frame;
std::lock_guard<std::mutex> lock(g_mtxIPC);
#ifndef NO_REMOTE
if (m_config.bEnableIPC) {
const int OneFrameBufferSize = g_mapFrameBufferSize[ih].OneFrameBufferSize;
Expand All @@ -371,6 +418,8 @@ int func_read_video( INPUT_HANDLE ih,int frame,void *buf )

std::vector<BYTE> headerData = g_namedPipe.Read(kFromWinputDataHeaderSize);
FromWinputData* fromData = (FromWinputData*)headerData.data();
//INFO_LOG << L"Read: " << fromData->returnSize << L" bytes";
assert(fromData->callFunc == CallFunc::kReadVideo);
int readBytes = 0;
int nRet = g_namedPipe.Read((BYTE*)& readBytes, sizeof(readBytes));
assert(nRet == sizeof(readBytes));
Expand Down Expand Up @@ -413,6 +462,7 @@ int func_read_video( INPUT_HANDLE ih,int frame,void *buf )
int func_read_audio(INPUT_HANDLE ih, int start, int length, void* buf)
{
//INFO_LOG << L"func_read_audio: " << ih << L" start: " << start << L" length: " << length;
std::lock_guard<std::mutex> lock(g_mtxIPC);

#ifndef NO_REMOTE
if (m_config.bEnableIPC) {
Expand All @@ -424,10 +474,13 @@ int func_read_audio(INPUT_HANDLE ih, int start, int length, void* buf)

std::vector<BYTE> headerData = g_namedPipe.Read(kFromWinputDataHeaderSize);
FromWinputData* fromData = (FromWinputData*)headerData.data();
//INFO_LOG << L"Read: " << fromData->returnSize << L" bytes";
assert(fromData->callFunc == CallFunc::kReadAudio);
int readSample = 0;
int nRet = g_namedPipe.Read((BYTE*)& readSample, sizeof(readSample));
assert(nRet == sizeof(readSample));
const int audioBufferSize = fromData->returnSize - sizeof(int);
assert(audioBufferSize >= 0);
nRet = g_namedPipe.Read((BYTE*)buf, audioBufferSize);
assert(nRet == audioBufferSize);
return readSample;
Expand Down Expand Up @@ -466,6 +519,7 @@ int func_read_audio(INPUT_HANDLE ih, int start, int length, void* buf)
BOOL func_is_keyframe(INPUT_HANDLE ih, int frame)
{
// INFO_LOG << L"func_is_keyframe" << L" frame:" << frame;
std::lock_guard<std::mutex> lock(g_mtxIPC);

#ifndef NO_REMOTE
if (m_config.bEnableIPC) {
Expand All @@ -475,6 +529,7 @@ BOOL func_is_keyframe(INPUT_HANDLE ih, int frame)

std::vector<BYTE> headerData = g_namedPipe.Read(kFromWinputDataHeaderSize);
FromWinputData* fromData = (FromWinputData*)headerData.data();
assert(fromData->callFunc == CallFunc::kIsKeyframe);
std::vector<BYTE> readBody = g_namedPipe.Read(fromData->returnSize);
auto retData = ParseFromInputData<BOOL>(readBody);
return retData.first;
Expand Down
11 changes: 11 additions & 0 deletions Readme.txt
Loading

0 comments on commit 467bb56

Please sign in to comment.