From e4c87a024b0669e722ad6d654be7e64763fdd014 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 8 Nov 2024 09:58:32 +0800 Subject: [PATCH] fix: memory free not found issues --- include/os/osMemPool.h | 27 ++++++++++++++++----------- source/common/src/tglobal.c | 3 ++- source/dnode/mgmt/exe/dmMain.c | 2 +- source/dnode/mnode/impl/src/mndStb.c | 7 +++++-- source/libs/qworker/inc/qwInt.h | 4 ++-- source/libs/qworker/src/qwUtil.c | 4 ++-- source/libs/qworker/src/qworker.c | 12 ++++++------ source/libs/sync/src/syncRaftStore.c | 2 +- source/libs/wal/src/walMeta.c | 4 ++-- source/os/src/osMemory.c | 7 ++++++- source/util/src/thash.c | 25 ++++++++++--------------- source/util/src/tmempool.c | 8 +++++++- 12 files changed, 60 insertions(+), 45 deletions(-) diff --git a/include/os/osMemPool.h b/include/os/osMemPool.h index 5c926c9b4a15..c47fa29a4d70 100644 --- a/include/os/osMemPool.h +++ b/include/os/osMemPool.h @@ -145,20 +145,25 @@ int32_t taosMemoryPoolInit(mpReserveFailFp, mpReserveReachFp); #ifndef BUILD_TEST extern void* gMemPoolHandle; extern threadlocal void* threadPoolSession; +extern threadlocal bool threadPoolEnabled; -#define taosEnableMemoryPoolUsage(_pool, _session) do { threadPoolSession = _session; tsEnableRandErr = true;} while (0) -#define taosDisableMemoryPoolUsage() do { threadPoolSession = NULL; tsEnableRandErr = false;} while (0) +#define taosEnableFullMemPoolUsage(_session) do { threadPoolSession = _session; tsEnableRandErr = true;} while (0) +#define taosDisableFullMemPoolUsage() do { threadPoolSession = NULL; tsEnableRandErr = false;} while (0) -#define taosMemoryMalloc(_size) ((NULL != gMemPoolHandle) ? (taosMemPoolMalloc(gMemPoolHandle, threadPoolSession, _size, (char*)__FILE__, __LINE__)) : (taosMemMalloc(_size))) -#define taosMemoryCalloc(_num, _size) ((NULL != gMemPoolHandle) ? (taosMemPoolCalloc(gMemPoolHandle, threadPoolSession, _num, _size, (char*)__FILE__, __LINE__)) : (taosMemCalloc(_num, _size))) -#define taosMemoryRealloc(_ptr, _size) ((NULL != gMemPoolHandle) ? (taosMemPoolRealloc(gMemPoolHandle, threadPoolSession, _ptr, _size, (char*)__FILE__, __LINE__)) : (taosMemRealloc(_ptr, _size))) -#define taosStrdup(_ptr) ((NULL != gMemPoolHandle) ? (taosMemPoolStrdup(gMemPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosStrdupi(_ptr))) -#define taosStrndup(_ptr, _size) ((NULL != gMemPoolHandle) ? (taosMemPoolStrndup(gMemPoolHandle, threadPoolSession, _ptr, _size, (char*)__FILE__, __LINE__)) : (taosStrndupi(_ptr, _size))) -#define taosMemoryFree(_ptr) ((NULL != gMemPoolHandle) ? (taosMemPoolFree(gMemPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosMemFree(_ptr))) -#define taosMemorySize(_ptr) ((NULL != gMemPoolHandle) ? (taosMemPoolGetMemorySize(gMemPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosMemSize(_ptr))) -#define taosMemoryTrim(_size, _trimed) ((NULL != gMemPoolHandle) ? (taosMemPoolTrim(gMemPoolHandle, threadPoolSession, _size, (char*)__FILE__, __LINE__, _trimed)) : (taosMemTrim(_size, _trimed))) -#define taosMemoryMallocAlign(_alignment, _size) ((NULL != gMemPoolHandle) ? (taosMemPoolMallocAlign(gMemPoolHandle, threadPoolSession, _alignment, _size, (char*)__FILE__, __LINE__)) : (taosMemMallocAlign(_alignment, _size))) +#define taosEnableMemPoolUsage() do { threadPoolEnabled = true; tsEnableRandErr = true;} while (0) +#define taosDisableMemPoolUsage() do { threadPoolEnabled = false; tsEnableRandErr = false;} while (0) + + +#define taosMemoryMalloc(_size) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolMalloc(gMemPoolHandle, threadPoolSession, _size, (char*)__FILE__, __LINE__)) : (taosMemMalloc(_size))) +#define taosMemoryCalloc(_num, _size) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolCalloc(gMemPoolHandle, threadPoolSession, _num, _size, (char*)__FILE__, __LINE__)) : (taosMemCalloc(_num, _size))) +#define taosMemoryRealloc(_ptr, _size) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolRealloc(gMemPoolHandle, threadPoolSession, _ptr, _size, (char*)__FILE__, __LINE__)) : (taosMemRealloc(_ptr, _size))) +#define taosStrdup(_ptr) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolStrdup(gMemPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosStrdupi(_ptr))) +#define taosStrndup(_ptr, _size) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolStrndup(gMemPoolHandle, threadPoolSession, _ptr, _size, (char*)__FILE__, __LINE__)) : (taosStrndupi(_ptr, _size))) +#define taosMemoryFree(_ptr) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolFree(gMemPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosMemFree(_ptr))) +#define taosMemorySize(_ptr) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolGetMemorySize(gMemPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosMemSize(_ptr))) +#define taosMemoryTrim(_size, _trimed) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolTrim(gMemPoolHandle, threadPoolSession, _size, (char*)__FILE__, __LINE__, _trimed)) : (taosMemTrim(_size, _trimed))) +#define taosMemoryMallocAlign(_alignment, _size) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolMallocAlign(gMemPoolHandle, threadPoolSession, _alignment, _size, (char*)__FILE__, __LINE__)) : (taosMemMallocAlign(_alignment, _size))) #else #define taosEnableMemoryPoolUsage(_pool, _session) #define taosDisableMemoryPoolUsage() diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index aa10d1fd6f91..4a2fd7bad9c3 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -2076,7 +2076,8 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) { {"supportVnodes", &tsNumOfSupportVnodes}, {"experimental", &tsExperimental}, {"maxTsmaNum", &tsMaxTsmaNum}, - {"singleQueryMaxMemorySize", &tsSingleQueryMaxMemorySize}}; + {"singleQueryMaxMemorySize", &tsSingleQueryMaxMemorySize}, + {"minReservedMemorySize", &tsMinReservedMemorySize}}; if ((code = taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true)) != TSDB_CODE_SUCCESS) { code = taosCfgSetOption(options, tListLen(options), pItem, false); diff --git a/source/dnode/mgmt/exe/dmMain.c b/source/dnode/mgmt/exe/dmMain.c index cd4bdc68e667..1b27d7a084a6 100644 --- a/source/dnode/mgmt/exe/dmMain.c +++ b/source/dnode/mgmt/exe/dmMain.c @@ -445,7 +445,7 @@ int mainWindows(int argc, char **argv) { } if ((code = taosMemoryPoolInit(qWorkerRetireJobs, qWorkerRetireJob)) != 0) { - dError("failed to init conv"); + dError("failed to init memPool, error:0x%x", code); taosCloseLog(); taosCleanupArgs(); return code; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 0dbc270dfd17..23e91e8d63fd 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -627,9 +627,12 @@ static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead)); tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead)); - terrno = tEncodeSVDropStbReq(&encoder, &req); + int32_t code = tEncodeSVDropStbReq(&encoder, &req); tEncoderClear(&encoder); - if (terrno != 0) return NULL; + if (code != 0) { + terrno = code; + return NULL; + } *pContLen = contLen; return pHead; diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 7aa0baecd55f..ca1f6d358f68 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -262,11 +262,11 @@ extern SQueryMgmt gQueryMgmt; #define QW_SINK_ENABLE_MEMPOOL(_ctx) \ do { \ if ((_ctx)->sinkWithMemPool) { \ - taosEnableMemoryPoolUsage(gQueryMgmt.memPoolHandle, (_ctx)->memPoolSession); \ + taosEnableFullMemPoolUsage((_ctx)->memPoolSession); \ } \ } while (0) -#define QW_SINK_DISABLE_MEMPOOL() taosDisableMemoryPoolUsage() +#define QW_SINK_DISABLE_MEMPOOL() taosDisableFullMemPoolUsage() #define QW_STAT_INC(_item, _n) (void)atomic_add_fetch_64(&(_item), _n) #define QW_STAT_DEC(_item, _n) (void)atomic_sub_fetch_64(&(_item), _n) diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index 9fb659f19712..ecb0d88ae535 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -276,9 +276,9 @@ void qwFreeTaskHandle(SQWTaskCtx *ctx, qTaskInfo_t *taskHandle) { // Note: free/kill may in RC qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle); if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) { - taosEnableMemoryPoolUsage(gQueryMgmt.memPoolHandle, ctx->memPoolSession); + taosEnableFullMemPoolUsage(ctx->memPoolSession); qDestroyTask(otaskHandle); - taosDisableMemoryPoolUsage(); + taosDisableFullMemPoolUsage(); qDebug("task handle destroyed"); } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 4bdd6388498c..f7efea799534 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -167,9 +167,9 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { if (taskHandle) { qwDbgSimulateSleep(); - taosEnableMemoryPoolUsage(gQueryMgmt.memPoolHandle, ctx->memPoolSession); + taosEnableFullMemPoolUsage(ctx->memPoolSession); code = qExecTaskOpt(taskHandle, pResList, &useconds, &hasMore, &localFetch); - taosDisableMemoryPoolUsage(); + taosDisableFullMemPoolUsage(); if (code) { if (code != TSDB_CODE_OPS_NOT_SUPPORT) { @@ -803,9 +803,9 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { ctx->queryMsgType = qwMsg->msgType; ctx->localExec = false; - taosEnableMemoryPoolUsage(gQueryMgmt.memPoolHandle, ctx->memPoolSession); + taosEnableFullMemPoolUsage(ctx->memPoolSession); code = qMsgToSubplan(qwMsg->msg, qwMsg->msgLen, &plan); - taosDisableMemoryPoolUsage(); + taosDisableFullMemPoolUsage(); if (TSDB_CODE_SUCCESS != code) { code = TSDB_CODE_INVALID_MSG; @@ -813,9 +813,9 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { QW_ERR_JRET(code); } - taosEnableMemoryPoolUsage(gQueryMgmt.memPoolHandle, ctx->memPoolSession); + taosEnableFullMemPoolUsage(ctx->memPoolSession); code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, qwMsg->msgInfo.compressMsg, sql, OPTR_EXEC_MODEL_BATCH); - taosDisableMemoryPoolUsage(); + taosDisableFullMemPoolUsage(); if (code) { QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); diff --git a/source/libs/sync/src/syncRaftStore.c b/source/libs/sync/src/syncRaftStore.c index c61be4356cfb..7ecb73d21a3a 100644 --- a/source/libs/sync/src/syncRaftStore.c +++ b/source/libs/sync/src/syncRaftStore.c @@ -142,7 +142,7 @@ int32_t raftStoreWriteFile(SSyncNode *pNode) { _OVER: if (pJson != NULL) tjsonDelete(pJson); - if (buffer != NULL) taosMemoryFree(buffer); + if (buffer != NULL) taosMemFree(buffer); if (pFile != NULL) taosCloseFile(&pFile); if (code != 0) { diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index d9666eb02f58..a2e780b621f0 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -1085,13 +1085,13 @@ int32_t walSaveMeta(SWal* pWal) { } } - taosMemoryFree(serialized); + taosMemFree(serialized); return code; _err: wError("vgId:%d, %s failed at line %d since %s", pWal->cfg.vgId, __func__, lino, tstrerror(code)); (void)taosCloseFile(&pMetaFile); - taosMemoryFree(serialized); + taosMemFree(serialized); return code; } diff --git a/source/os/src/osMemory.c b/source/os/src/osMemory.c index 6f71c87bba7b..fdbf4853adcd 100644 --- a/source/os/src/osMemory.c +++ b/source/os/src/osMemory.c @@ -433,7 +433,12 @@ int32_t taosMemTrim(int32_t size, bool* trimed) { // do nothing return TSDB_CODE_SUCCESS; #else - *trimed = malloc_trim(size); + if (trimed) { + *trimed = malloc_trim(size); + } else { + malloc_trim(size); + } + return TSDB_CODE_SUCCESS; #endif } diff --git a/source/util/src/thash.c b/source/util/src/thash.c index 88fe6618b93a..6e759c98cac7 100644 --- a/source/util/src/thash.c +++ b/source/util/src/thash.c @@ -318,6 +318,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo return terrno = TSDB_CODE_INVALID_PTR; } + int32_t code = TSDB_CODE_SUCCESS; uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen); // need the resize process, write lock applied @@ -327,10 +328,15 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo taosHashWUnlock(pHashObj); } + SHashNode *pNewNode = doCreateHashNode(key, keyLen, data, size, hashVal); + if (pNewNode == NULL) { + code = terrno; + return code; + } + // disable resize taosHashRLock(pHashObj); - int32_t code = TSDB_CODE_SUCCESS; uint32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); SHashEntry *pe = pHashObj->hashList[slot]; @@ -350,33 +356,22 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo if (pNode == NULL) { // no data in hash table with the specified key, add it into hash table - SHashNode *pNewNode = doCreateHashNode(key, keyLen, data, size, hashVal); - if (pNewNode == NULL) { - // terrno = TSDB_CODE_OUT_OF_MEMORY; - code = terrno; - goto _exit; - } - pushfrontNodeInEntryList(pe, pNewNode); (void)atomic_add_fetch_64(&pHashObj->size, 1); } else { // not support the update operation, return error if (pHashObj->enableUpdate) { - SHashNode *pNewNode = doCreateHashNode(key, keyLen, data, size, hashVal); - if (pNewNode == NULL) { - // terrno = TSDB_CODE_OUT_OF_MEMORY; - code = terrno; - goto _exit; - } - doUpdateHashNode(pHashObj, pe, prev, pNode, pNewNode); } else { + FREE_HASH_NODE(pHashObj->freeFp, pNewNode); terrno = TSDB_CODE_DUP_KEY; code = terrno; goto _exit; } } + _exit: + taosHashEntryWUnlock(pHashObj, pe); taosHashRUnlock(pHashObj); return code; diff --git a/source/util/src/tmempool.c b/source/util/src/tmempool.c index 6d275c9b851c..1695a9868230 100644 --- a/source/util/src/tmempool.c +++ b/source/util/src/tmempool.c @@ -24,6 +24,8 @@ static TdThreadOnce gMPoolInit = PTHREAD_ONCE_INIT; void* gMemPoolHandle = NULL; threadlocal void* threadPoolSession = NULL; +threadlocal bool threadPoolEnabled = true; + SMemPoolMgmt gMPMgmt = {0}; SMPStrategyFp gMPFps[] = { {NULL}, @@ -209,7 +211,7 @@ int32_t mpInitStat(SMPStatPos* pStat, bool sessionStat) { } int32_t mpInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) { - MP_ERR_RET(mpCheckCfg(cfg)); +// MP_ERR_RET(mpCheckCfg(cfg)); TAOS_MEMCPY(&pPool->cfg, cfg, sizeof(*cfg)); @@ -878,10 +880,14 @@ void mpLogStat(SMemPool* pPool, SMPSession* pSession, EMPStatLogItem item, SMPSt mpLogDetailStat(&pPool->stat.statDetail, item, pInput); } if (pSession && MP_GET_FLAG(pSession->ctrl.statFlags, MP_LOG_FLAG_ALL_POS)) { + taosDisableMemPoolUsage(); mpLogPosStat(&pSession->stat.posStat, item, pInput, true); + taosEnableMemPoolUsage(); } if (MP_GET_FLAG(pPool->ctrl.statFlags, MP_LOG_FLAG_ALL_POS)) { + taosDisableMemPoolUsage(); mpLogPosStat(&pPool->stat.posStat, item, pInput, false); + taosEnableMemPoolUsage(); } break; }