Skip to content

Commit

Permalink
fix: memory free not found issues
Browse files Browse the repository at this point in the history
  • Loading branch information
dapan1121 committed Nov 8, 2024
1 parent dd2ab5b commit e4c87a0
Show file tree
Hide file tree
Showing 12 changed files with 60 additions and 45 deletions.
27 changes: 16 additions & 11 deletions include/os/osMemPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,20 +145,25 @@ int32_t taosMemoryPoolInit(mpReserveFailFp, mpReserveReachFp);
#ifndef BUILD_TEST
extern void* gMemPoolHandle;
extern threadlocal void* threadPoolSession;
extern threadlocal bool threadPoolEnabled;


#define taosEnableMemoryPoolUsage(_pool, _session) do { threadPoolSession = _session; tsEnableRandErr = true;} while (0)
#define taosDisableMemoryPoolUsage() do { threadPoolSession = NULL; tsEnableRandErr = false;} while (0)
#define taosEnableFullMemPoolUsage(_session) do { threadPoolSession = _session; tsEnableRandErr = true;} while (0)
#define taosDisableFullMemPoolUsage() do { threadPoolSession = NULL; tsEnableRandErr = false;} while (0)

#define taosMemoryMalloc(_size) ((NULL != gMemPoolHandle) ? (taosMemPoolMalloc(gMemPoolHandle, threadPoolSession, _size, (char*)__FILE__, __LINE__)) : (taosMemMalloc(_size)))
#define taosMemoryCalloc(_num, _size) ((NULL != gMemPoolHandle) ? (taosMemPoolCalloc(gMemPoolHandle, threadPoolSession, _num, _size, (char*)__FILE__, __LINE__)) : (taosMemCalloc(_num, _size)))
#define taosMemoryRealloc(_ptr, _size) ((NULL != gMemPoolHandle) ? (taosMemPoolRealloc(gMemPoolHandle, threadPoolSession, _ptr, _size, (char*)__FILE__, __LINE__)) : (taosMemRealloc(_ptr, _size)))
#define taosStrdup(_ptr) ((NULL != gMemPoolHandle) ? (taosMemPoolStrdup(gMemPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosStrdupi(_ptr)))
#define taosStrndup(_ptr, _size) ((NULL != gMemPoolHandle) ? (taosMemPoolStrndup(gMemPoolHandle, threadPoolSession, _ptr, _size, (char*)__FILE__, __LINE__)) : (taosStrndupi(_ptr, _size)))
#define taosMemoryFree(_ptr) ((NULL != gMemPoolHandle) ? (taosMemPoolFree(gMemPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosMemFree(_ptr)))
#define taosMemorySize(_ptr) ((NULL != gMemPoolHandle) ? (taosMemPoolGetMemorySize(gMemPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosMemSize(_ptr)))
#define taosMemoryTrim(_size, _trimed) ((NULL != gMemPoolHandle) ? (taosMemPoolTrim(gMemPoolHandle, threadPoolSession, _size, (char*)__FILE__, __LINE__, _trimed)) : (taosMemTrim(_size, _trimed)))
#define taosMemoryMallocAlign(_alignment, _size) ((NULL != gMemPoolHandle) ? (taosMemPoolMallocAlign(gMemPoolHandle, threadPoolSession, _alignment, _size, (char*)__FILE__, __LINE__)) : (taosMemMallocAlign(_alignment, _size)))
#define taosEnableMemPoolUsage() do { threadPoolEnabled = true; tsEnableRandErr = true;} while (0)
#define taosDisableMemPoolUsage() do { threadPoolEnabled = false; tsEnableRandErr = false;} while (0)


#define taosMemoryMalloc(_size) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolMalloc(gMemPoolHandle, threadPoolSession, _size, (char*)__FILE__, __LINE__)) : (taosMemMalloc(_size)))
#define taosMemoryCalloc(_num, _size) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolCalloc(gMemPoolHandle, threadPoolSession, _num, _size, (char*)__FILE__, __LINE__)) : (taosMemCalloc(_num, _size)))
#define taosMemoryRealloc(_ptr, _size) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolRealloc(gMemPoolHandle, threadPoolSession, _ptr, _size, (char*)__FILE__, __LINE__)) : (taosMemRealloc(_ptr, _size)))
#define taosStrdup(_ptr) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolStrdup(gMemPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosStrdupi(_ptr)))
#define taosStrndup(_ptr, _size) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolStrndup(gMemPoolHandle, threadPoolSession, _ptr, _size, (char*)__FILE__, __LINE__)) : (taosStrndupi(_ptr, _size)))
#define taosMemoryFree(_ptr) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolFree(gMemPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosMemFree(_ptr)))
#define taosMemorySize(_ptr) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolGetMemorySize(gMemPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosMemSize(_ptr)))
#define taosMemoryTrim(_size, _trimed) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolTrim(gMemPoolHandle, threadPoolSession, _size, (char*)__FILE__, __LINE__, _trimed)) : (taosMemTrim(_size, _trimed)))
#define taosMemoryMallocAlign(_alignment, _size) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolMallocAlign(gMemPoolHandle, threadPoolSession, _alignment, _size, (char*)__FILE__, __LINE__)) : (taosMemMallocAlign(_alignment, _size)))
#else
#define taosEnableMemoryPoolUsage(_pool, _session)
#define taosDisableMemoryPoolUsage()
Expand Down
3 changes: 2 additions & 1 deletion source/common/src/tglobal.c
Original file line number Diff line number Diff line change
Expand Up @@ -2076,7 +2076,8 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
{"supportVnodes", &tsNumOfSupportVnodes},
{"experimental", &tsExperimental},
{"maxTsmaNum", &tsMaxTsmaNum},
{"singleQueryMaxMemorySize", &tsSingleQueryMaxMemorySize}};
{"singleQueryMaxMemorySize", &tsSingleQueryMaxMemorySize},
{"minReservedMemorySize", &tsMinReservedMemorySize}};

if ((code = taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true)) != TSDB_CODE_SUCCESS) {
code = taosCfgSetOption(options, tListLen(options), pItem, false);
Expand Down
2 changes: 1 addition & 1 deletion source/dnode/mgmt/exe/dmMain.c
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ int mainWindows(int argc, char **argv) {
}

if ((code = taosMemoryPoolInit(qWorkerRetireJobs, qWorkerRetireJob)) != 0) {
dError("failed to init conv");
dError("failed to init memPool, error:0x%x", code);
taosCloseLog();
taosCleanupArgs();
return code;
Expand Down
7 changes: 5 additions & 2 deletions source/dnode/mnode/impl/src/mndStb.c
Original file line number Diff line number Diff line change
Expand Up @@ -627,9 +627,12 @@ static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb,
void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));

tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead));
terrno = tEncodeSVDropStbReq(&encoder, &req);
int32_t code = tEncodeSVDropStbReq(&encoder, &req);
tEncoderClear(&encoder);
if (terrno != 0) return NULL;
if (code != 0) {
terrno = code;
return NULL;
}

*pContLen = contLen;
return pHead;
Expand Down
4 changes: 2 additions & 2 deletions source/libs/qworker/inc/qwInt.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,11 @@ extern SQueryMgmt gQueryMgmt;
#define QW_SINK_ENABLE_MEMPOOL(_ctx) \
do { \
if ((_ctx)->sinkWithMemPool) { \
taosEnableMemoryPoolUsage(gQueryMgmt.memPoolHandle, (_ctx)->memPoolSession); \
taosEnableFullMemPoolUsage((_ctx)->memPoolSession); \
} \
} while (0)

#define QW_SINK_DISABLE_MEMPOOL() taosDisableMemoryPoolUsage()
#define QW_SINK_DISABLE_MEMPOOL() taosDisableFullMemPoolUsage()

#define QW_STAT_INC(_item, _n) (void)atomic_add_fetch_64(&(_item), _n)
#define QW_STAT_DEC(_item, _n) (void)atomic_sub_fetch_64(&(_item), _n)
Expand Down
4 changes: 2 additions & 2 deletions source/libs/qworker/src/qwUtil.c
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,9 @@ void qwFreeTaskHandle(SQWTaskCtx *ctx, qTaskInfo_t *taskHandle) {
// Note: free/kill may in RC
qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle);
if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) {
taosEnableMemoryPoolUsage(gQueryMgmt.memPoolHandle, ctx->memPoolSession);
taosEnableFullMemPoolUsage(ctx->memPoolSession);
qDestroyTask(otaskHandle);
taosDisableMemoryPoolUsage();
taosDisableFullMemPoolUsage();

qDebug("task handle destroyed");
}
Expand Down
12 changes: 6 additions & 6 deletions source/libs/qworker/src/qworker.c
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,9 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
if (taskHandle) {
qwDbgSimulateSleep();

taosEnableMemoryPoolUsage(gQueryMgmt.memPoolHandle, ctx->memPoolSession);
taosEnableFullMemPoolUsage(ctx->memPoolSession);
code = qExecTaskOpt(taskHandle, pResList, &useconds, &hasMore, &localFetch);
taosDisableMemoryPoolUsage();
taosDisableFullMemPoolUsage();

if (code) {
if (code != TSDB_CODE_OPS_NOT_SUPPORT) {
Expand Down Expand Up @@ -803,19 +803,19 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
ctx->queryMsgType = qwMsg->msgType;
ctx->localExec = false;

taosEnableMemoryPoolUsage(gQueryMgmt.memPoolHandle, ctx->memPoolSession);
taosEnableFullMemPoolUsage(ctx->memPoolSession);
code = qMsgToSubplan(qwMsg->msg, qwMsg->msgLen, &plan);
taosDisableMemoryPoolUsage();
taosDisableFullMemPoolUsage();

if (TSDB_CODE_SUCCESS != code) {
code = TSDB_CODE_INVALID_MSG;
QW_TASK_ELOG("task physical plan to subplan failed, code:%x - %s", code, tstrerror(code));
QW_ERR_JRET(code);
}

taosEnableMemoryPoolUsage(gQueryMgmt.memPoolHandle, ctx->memPoolSession);
taosEnableFullMemPoolUsage(ctx->memPoolSession);
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, qwMsg->msgInfo.compressMsg, sql, OPTR_EXEC_MODEL_BATCH);
taosDisableMemoryPoolUsage();
taosDisableFullMemPoolUsage();

if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
Expand Down
2 changes: 1 addition & 1 deletion source/libs/sync/src/syncRaftStore.c
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ int32_t raftStoreWriteFile(SSyncNode *pNode) {

_OVER:
if (pJson != NULL) tjsonDelete(pJson);
if (buffer != NULL) taosMemoryFree(buffer);
if (buffer != NULL) taosMemFree(buffer);
if (pFile != NULL) taosCloseFile(&pFile);

if (code != 0) {
Expand Down
4 changes: 2 additions & 2 deletions source/libs/wal/src/walMeta.c
Original file line number Diff line number Diff line change
Expand Up @@ -1085,13 +1085,13 @@ int32_t walSaveMeta(SWal* pWal) {
}
}

taosMemoryFree(serialized);
taosMemFree(serialized);
return code;

_err:
wError("vgId:%d, %s failed at line %d since %s", pWal->cfg.vgId, __func__, lino, tstrerror(code));
(void)taosCloseFile(&pMetaFile);
taosMemoryFree(serialized);
taosMemFree(serialized);
return code;
}

Expand Down
7 changes: 6 additions & 1 deletion source/os/src/osMemory.c
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,12 @@ int32_t taosMemTrim(int32_t size, bool* trimed) {
// do nothing
return TSDB_CODE_SUCCESS;
#else
*trimed = malloc_trim(size);
if (trimed) {
*trimed = malloc_trim(size);
} else {
malloc_trim(size);
}

return TSDB_CODE_SUCCESS;
#endif
}
Expand Down
25 changes: 10 additions & 15 deletions source/util/src/thash.c
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo
return terrno = TSDB_CODE_INVALID_PTR;
}

int32_t code = TSDB_CODE_SUCCESS;
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);

// need the resize process, write lock applied
Expand All @@ -327,10 +328,15 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo
taosHashWUnlock(pHashObj);
}

SHashNode *pNewNode = doCreateHashNode(key, keyLen, data, size, hashVal);
if (pNewNode == NULL) {
code = terrno;
return code;
}

// disable resize
taosHashRLock(pHashObj);

int32_t code = TSDB_CODE_SUCCESS;
uint32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
SHashEntry *pe = pHashObj->hashList[slot];

Expand All @@ -350,33 +356,22 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo

if (pNode == NULL) {
// no data in hash table with the specified key, add it into hash table
SHashNode *pNewNode = doCreateHashNode(key, keyLen, data, size, hashVal);
if (pNewNode == NULL) {
// terrno = TSDB_CODE_OUT_OF_MEMORY;
code = terrno;
goto _exit;
}

pushfrontNodeInEntryList(pe, pNewNode);
(void)atomic_add_fetch_64(&pHashObj->size, 1);
} else {
// not support the update operation, return error
if (pHashObj->enableUpdate) {
SHashNode *pNewNode = doCreateHashNode(key, keyLen, data, size, hashVal);
if (pNewNode == NULL) {
// terrno = TSDB_CODE_OUT_OF_MEMORY;
code = terrno;
goto _exit;
}

doUpdateHashNode(pHashObj, pe, prev, pNode, pNewNode);
} else {
FREE_HASH_NODE(pHashObj->freeFp, pNewNode);
terrno = TSDB_CODE_DUP_KEY;
code = terrno;
goto _exit;
}
}

_exit:

taosHashEntryWUnlock(pHashObj, pe);
taosHashRUnlock(pHashObj);
return code;
Expand Down
8 changes: 7 additions & 1 deletion source/util/src/tmempool.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
static TdThreadOnce gMPoolInit = PTHREAD_ONCE_INIT;
void* gMemPoolHandle = NULL;
threadlocal void* threadPoolSession = NULL;
threadlocal bool threadPoolEnabled = true;

SMemPoolMgmt gMPMgmt = {0};
SMPStrategyFp gMPFps[] = {
{NULL},
Expand Down Expand Up @@ -209,7 +211,7 @@ int32_t mpInitStat(SMPStatPos* pStat, bool sessionStat) {
}

int32_t mpInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) {
MP_ERR_RET(mpCheckCfg(cfg));
// MP_ERR_RET(mpCheckCfg(cfg));

TAOS_MEMCPY(&pPool->cfg, cfg, sizeof(*cfg));

Expand Down Expand Up @@ -878,10 +880,14 @@ void mpLogStat(SMemPool* pPool, SMPSession* pSession, EMPStatLogItem item, SMPSt
mpLogDetailStat(&pPool->stat.statDetail, item, pInput);
}
if (pSession && MP_GET_FLAG(pSession->ctrl.statFlags, MP_LOG_FLAG_ALL_POS)) {
taosDisableMemPoolUsage();
mpLogPosStat(&pSession->stat.posStat, item, pInput, true);
taosEnableMemPoolUsage();
}
if (MP_GET_FLAG(pPool->ctrl.statFlags, MP_LOG_FLAG_ALL_POS)) {
taosDisableMemPoolUsage();
mpLogPosStat(&pPool->stat.posStat, item, pInput, false);
taosEnableMemPoolUsage();
}
break;
}
Expand Down

0 comments on commit e4c87a0

Please sign in to comment.