Skip to content

Commit

Permalink
refine unused codes
Browse files Browse the repository at this point in the history
  • Loading branch information
wangjiaming0909 committed Nov 8, 2024
1 parent 2ddd071 commit 274a7fd
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 210 deletions.
196 changes: 5 additions & 191 deletions source/dnode/mnode/impl/src/mndStb.c
Original file line number Diff line number Diff line change
Expand Up @@ -4086,42 +4086,16 @@ typedef struct SMDropTbTsmaInfos {
} SMDropTbTsmaInfos;

typedef struct SMndDropTbsWithTsmaCtx {
SHashObj *pTsmaMap; // <suid, SMDropTbTsmaInfos>
SHashObj *pDbMap; // <dbuid, SMDropTbDbInfo>
SHashObj *pVgMap; // <vgId, SVDropTbVgReqs>, only for non tsma result child table
SHashObj *pTsmaTbVgMap; // <vgid, SVDropTbVgReqs>, only for tsma result child table
SArray *pResTbNames; // SArray<char*>
SHashObj *pVgMap; // <vgId, SVDropTbVgReqs>
} SMndDropTbsWithTsmaCtx;

static int32_t mndDropTbAddTsmaResTbsForSingleVg(SMnode *pMnode, SMndDropTbsWithTsmaCtx *pCtx, SArray *pTbs,
static int32_t mndDropTbForSingleVg(SMnode *pMnode, SMndDropTbsWithTsmaCtx *pCtx, SArray *pTbs,
int32_t vgId);

static void destroySVDropTbBatchReqs(void *p);
static void mndDestroyDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx *p) {
if (!p) return;

if (p->pDbMap) {
void *pIter = taosHashIterate(p->pDbMap, NULL);
while (pIter) {
SMDropTbDbInfo *pInfo = pIter;
taosArrayDestroy(pInfo->dbVgInfos);
pIter = taosHashIterate(p->pDbMap, pIter);
}
taosHashCleanup(p->pDbMap);
}
if (p->pResTbNames) {
taosArrayDestroyP(p->pResTbNames, taosMemoryFree);
}
if (p->pTsmaMap) {
void *pIter = taosHashIterate(p->pTsmaMap, NULL);
while (pIter) {
SMDropTbTsmaInfos *pInfos = pIter;
taosArrayDestroy(pInfos->pTsmaInfos);
pIter = taosHashIterate(p->pTsmaMap, pIter);
}
taosHashCleanup(p->pTsmaMap);
}

if (p->pVgMap) {
void *pIter = taosHashIterate(p->pVgMap, NULL);
while (pIter) {
Expand All @@ -4131,47 +4105,20 @@ static void mndDestroyDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx *p) {
}
taosHashCleanup(p->pVgMap);
}

if (p->pTsmaTbVgMap) {
void *pIter = taosHashIterate(p->pTsmaTbVgMap, NULL);
while (pIter) {
SVDropTbVgReqs *pReqs = pIter;
taosArrayDestroyEx(pReqs->pBatchReqs, destroySVDropTbBatchReqs);
pIter = taosHashIterate(p->pTsmaTbVgMap, pIter);
}
taosHashCleanup(p->pTsmaTbVgMap);
}
taosMemoryFree(p);
}

static int32_t mndInitDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx **ppCtx) {
int32_t code = 0;
SMndDropTbsWithTsmaCtx *pCtx = taosMemoryCalloc(1, sizeof(SMndDropTbsWithTsmaCtx));
if (!pCtx) return terrno;
pCtx->pTsmaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if (!pCtx->pTsmaMap) {
code = terrno;
goto _end;
}

pCtx->pDbMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (!pCtx->pDbMap) {
code = terrno;
goto _end;
}
pCtx->pResTbNames = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);

pCtx->pVgMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
if (!pCtx->pVgMap) {
code = terrno;
goto _end;
}

pCtx->pTsmaTbVgMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
if (!pCtx->pTsmaTbVgMap) {
code = terrno;
goto _end;
}
*ppCtx = pCtx;
_end:
if (code) mndDestroyDropTbsWithTsmaCtx(pCtx);
Expand Down Expand Up @@ -4286,7 +4233,7 @@ static int32_t mndProcessDropTbWithTsma(SRpcMsg *pReq) {
if (code) goto _OVER;
for (int32_t i = 0; i < dropReq.pVgReqs->size; ++i) {
SMDropTbReqsOnSingleVg *pReq = taosArrayGet(dropReq.pVgReqs, i);
code = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, pReq->pTbs, pReq->vgInfo.vgId);
code = mndDropTbForSingleVg(pMnode, pCtx, pReq->pTbs, pReq->vgInfo.vgId);
if (code) goto _OVER;
}
code = mndCreateDropTbsTxnPrepare(pReq, pCtx);
Expand Down Expand Up @@ -4350,61 +4297,7 @@ static int32_t mndDropTbAdd(SMnode *pMnode, SHashObj *pVgHashMap, const SVgroupI
return 0;
}

int vgInfoCmp(const void *lp, const void *rp) {
SVgroupInfo *pLeft = (SVgroupInfo *)lp;
SVgroupInfo *pRight = (SVgroupInfo *)rp;
if (pLeft->hashBegin < pRight->hashBegin) {
return -1;
} else if (pLeft->hashBegin > pRight->hashBegin) {
return 1;
}

return 0;
}

static int32_t mndGetDbVgInfoForTsma(SMnode *pMnode, const char *dbname, SMDropTbTsmaInfo *pInfo) {
int32_t code = 0;
SDbObj *pDb = mndAcquireDb(pMnode, dbname);
if (!pDb) {
code = TSDB_CODE_MND_DB_NOT_EXIST;
goto _end;
}

pInfo->dbInfo.dbVgInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo));
if (!pInfo->dbInfo.dbVgInfos) {
code = terrno;
goto _end;
}
mndBuildDBVgroupInfo(pDb, pMnode, pInfo->dbInfo.dbVgInfos);
taosArraySort(pInfo->dbInfo.dbVgInfos, vgInfoCmp);

pInfo->dbInfo.hashPrefix = pDb->cfg.hashPrefix;
pInfo->dbInfo.hashSuffix = pDb->cfg.hashSuffix;
pInfo->dbInfo.hashMethod = pDb->cfg.hashMethod;

_end:
if (pDb) mndReleaseDb(pMnode, pDb);
if (code && pInfo->dbInfo.dbVgInfos) {
taosArrayDestroy(pInfo->dbInfo.dbVgInfos);
pInfo->dbInfo.dbVgInfos = NULL;
}
TAOS_RETURN(code);
}

int32_t vgHashValCmp(const void *lp, const void *rp) {
uint32_t *key = (uint32_t *)lp;
SVgroupInfo *pVg = (SVgroupInfo *)rp;

if (*key < pVg->hashBegin) {
return -1;
} else if (*key > pVg->hashEnd) {
return 1;
}

return 0;
}

static int32_t mndDropTbAddTsmaResTbsForSingleVg(SMnode *pMnode, SMndDropTbsWithTsmaCtx *pCtx, SArray *pTbs,
static int32_t mndDropTbForSingleVg(SMnode *pMnode, SMndDropTbsWithTsmaCtx *pCtx, SArray *pTbs,
int32_t vgId) {
int32_t code = 0;

Expand All @@ -4420,88 +4313,9 @@ static int32_t mndDropTbAddTsmaResTbsForSingleVg(SMnode *pMnode, SMndDropTbsWith
vgInfo.epSet = mndGetVgroupEpset(pMnode, pVgObj);
mndReleaseVgroup(pMnode, pVgObj);

// get all stb uids
for (int32_t i = 0; i < pTbs->size; ++i) {
const SVDropTbReq *pTb = taosArrayGet(pTbs, i);
if (taosHashGet(pCtx->pTsmaMap, &pTb->suid, sizeof(pTb->suid))) {
} else {
SMDropTbTsmaInfos infos = {0};
infos.pTsmaInfos = taosArrayInit(2, sizeof(SMDropTbTsmaInfo));
if (!infos.pTsmaInfos) {
code = terrno;
goto _end;
}
if (taosHashPut(pCtx->pTsmaMap, &pTb->suid, sizeof(pTb->suid), &infos, sizeof(infos)) != 0) {
code = terrno;
goto _end;
}
}
}

void *pIter = NULL;
SSmaObj *pSma = NULL;
char buf[TSDB_TABLE_FNAME_LEN] = {0};
// get used tsmas and it's dbs
while (1) {
pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
if (!pIter) break;
SMDropTbTsmaInfos *pInfos = taosHashGet(pCtx->pTsmaMap, &pSma->stbUid, sizeof(pSma->stbUid));
if (pInfos) {
SMDropTbTsmaInfo info = {0};
int32_t len = sprintf(buf, "%s", pSma->name);
sprintf(info.tsmaResTbDbFName, "%s", pSma->db);
snprintf(info.tsmaResTbNamePrefix, TSDB_TABLE_FNAME_LEN, "%s", buf);
SMDropTbDbInfo *pDbInfo = taosHashGet(pCtx->pDbMap, pSma->db, TSDB_DB_FNAME_LEN);
info.suid = pSma->dstTbUid;
if (!pDbInfo) {
code = mndGetDbVgInfoForTsma(pMnode, pSma->db, &info);
if (code != TSDB_CODE_SUCCESS) {
sdbCancelFetch(pMnode->pSdb, pIter);
sdbRelease(pMnode->pSdb, pSma);
goto _end;
}
if (taosHashPut(pCtx->pDbMap, pSma->db, TSDB_DB_FNAME_LEN, &info.dbInfo, sizeof(SMDropTbDbInfo)) != 0) {
sdbCancelFetch(pMnode->pSdb, pIter);
sdbRelease(pMnode->pSdb, pSma);
goto _end;
}
} else {
info.dbInfo = *pDbInfo;
}
if (taosArrayPush(pInfos->pTsmaInfos, &info) == NULL) {
code = terrno;
sdbCancelFetch(pMnode->pSdb, pIter);
sdbRelease(pMnode->pSdb, pSma);
goto _end;
}
}
sdbRelease(pMnode->pSdb, pSma);
}

// generate vg req map
for (int32_t i = 0; i < pTbs->size; ++i) {
SVDropTbReq *pTb = taosArrayGet(pTbs, i);
TAOS_CHECK_GOTO(mndDropTbAdd(pMnode, pCtx->pVgMap, &vgInfo, pTb->name, pTb->suid, pTb->igNotExists), NULL, _end);

SMDropTbTsmaInfos *pInfos = taosHashGet(pCtx->pTsmaMap, &pTb->suid, sizeof(pTb->suid));
SArray *pVgInfos = NULL;
char buf[TSDB_TABLE_FNAME_LEN + TSDB_TABLE_NAME_LEN + 1];
char resTbFullName[TSDB_TABLE_FNAME_LEN + 1] = {0};
for (int32_t j = 0; j < pInfos->pTsmaInfos->size; ++j) {
SMDropTbTsmaInfo *pInfo = taosArrayGet(pInfos->pTsmaInfos, j);
int32_t len = sprintf(buf, "%s_%s", pInfo->tsmaResTbNamePrefix, pTb->name);
len = taosCreateMD5Hash(buf, len);
len = snprintf(resTbFullName, TSDB_TABLE_FNAME_LEN + 1, "%s.%s", pInfo->tsmaResTbDbFName, buf);
uint32_t hashVal = taosGetTbHashVal(resTbFullName, len, pInfo->dbInfo.hashMethod, pInfo->dbInfo.hashPrefix,
pInfo->dbInfo.hashSuffix);
const SVgroupInfo *pVgInfo = taosArraySearch(pInfo->dbInfo.dbVgInfos, &hashVal, vgHashValCmp, TD_EQ);
void *p = taosStrdup(resTbFullName + strlen(pInfo->tsmaResTbDbFName) + TSDB_NAME_DELIMITER_LEN);
if (taosArrayPush(pCtx->pResTbNames, &p) == NULL) {
code = terrno;
goto _end;
}
TAOS_CHECK_GOTO(mndDropTbAdd(pMnode, pCtx->pTsmaTbVgMap, pVgInfo, p, pInfo->suid, true), NULL, _end);
}
}
_end:
return code;
Expand Down Expand Up @@ -4529,7 +4343,7 @@ static int32_t mndProcessFetchTtlExpiredTbs(SRpcMsg *pRsp) {
code = mndInitDropTbsWithTsmaCtx(&pCtx);
if (code) goto _end;

code = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, rsp.pExpiredTbs, rsp.vgId);
code = mndDropTbForSingleVg(pMnode, pCtx, rsp.pExpiredTbs, rsp.vgId);
if (code) goto _end;
code = mndCreateDropTbsTxnPrepare(pRsp, pCtx);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
Expand Down
29 changes: 12 additions & 17 deletions source/dnode/vnode/src/tq/tqSink.c
Original file line number Diff line number Diff line change
Expand Up @@ -451,20 +451,19 @@ static int32_t doBuildAndSendDropTableMsg(SVnode* pVnode, char* pStbFullname, SS
req.igNotExists = true;

SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
char tbName[TSDB_TABLE_NAME_LEN + 1] = {0};
for (int32_t i = 0; i < rows; ++i) {
void* pData = colDataGetVarData(pTbNameCol, i);
memcpy(tbName, varDataVal(pData), varDataLen(pData));
tbName[varDataLen(pData) + 1] = 0;
req.name = tbName;
if (taosArrayPush(batchReq.pArray, &req) == NULL) {
TSDB_CHECK_CODE(terrno, lino, _exit);
}
char tbName[TSDB_TABLE_NAME_LEN + 1] = {0};
int32_t i = 0;
void* pData = colDataGetVarData(pTbNameCol, i);
memcpy(tbName, varDataVal(pData), varDataLen(pData));
tbName[varDataLen(pData) + 1] = 0;
req.name = tbName;
if (taosArrayPush(batchReq.pArray, &req) == NULL) {
TSDB_CHECK_CODE(terrno, lino, _exit);
}

SMetaReader mr = {0};
metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);
// only one row

code = metaGetTableEntryByName(&mr, tbName);
if (TSDB_CODE_SUCCESS == code && isValidDstChildTable(&mr, TD_VID(pVnode), tbName, pTask->outputInfo.tbSink.stbUid)) {
STableSinkInfo* pTableSinkInfo = NULL;
Expand All @@ -478,13 +477,9 @@ static int32_t doBuildAndSendDropTableMsg(SVnode* pVnode, char* pStbFullname, SS
code = tqPutReqToQueue(pVnode, &batchReq, encodeDropChildTableForRPC, TDMT_VND_DROP_TABLE);
TSDB_CHECK_CODE(code, lino, _exit);

for (int32_t i = 0; i < rows; ++i) {
void* pData = colDataGetVarData(pTbNameCol, i);
memcpy(tbName, varDataVal(pData), varDataLen(pData));
tbName[varDataLen(pData) + 1] = 0;
code = doWaitForDstTableDropped(pVnode, pTask, tbName);
TSDB_CHECK_CODE(code, lino, _exit);
}

code = doWaitForDstTableDropped(pVnode, pTask, tbName);
TSDB_CHECK_CODE(code, lino, _exit);

_exit:
if (batchReq.pArray) {
Expand Down
8 changes: 6 additions & 2 deletions source/libs/stream/src/streamState.c
Original file line number Diff line number Diff line change
Expand Up @@ -527,9 +527,13 @@ int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal

int32_t streamStateDeleteParName(SStreamState* pState, int64_t groupId) {
int32_t code = tSimpleHashRemove(pState->parNameMap, &groupId, sizeof(int64_t));
qTrace("catche %s at line %d res %d", __func__, __LINE__, code);
if (TSDB_CODE_SUCCESS != code) {
qWarn("failed to remove parname from cache, code:%d", code);
}
code = streamStateDeleteParName_rocksdb(pState, groupId);
qTrace("disk %s at line %d res %d", __func__, __LINE__, code);
if (TSDB_CODE_SUCCESS != code) {
qWarn("failed to remove parname from rocksdb, code:%d", code);
}
return TSDB_CODE_SUCCESS;
}

Expand Down

0 comments on commit 274a7fd

Please sign in to comment.