Skip to content

Commit

Permalink
remove logs
Browse files Browse the repository at this point in the history
  • Loading branch information
wangjiaming0909 committed Nov 8, 2024
1 parent a70b4e2 commit 2ddd071
Show file tree
Hide file tree
Showing 8 changed files with 11 additions and 101 deletions.
1 change: 0 additions & 1 deletion source/dnode/vnode/src/meta/metaTable.c
Original file line number Diff line number Diff line change
Expand Up @@ -1233,7 +1233,6 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi

metaWLock(pMeta);
rc = metaDropTableByUid(pMeta, uid, &type, &suid, &sysTbl);
metaInfo("wjm meta drop table by uid: %"PRId64, uid);
metaULock(pMeta);

if (rc < 0) goto _exit;
Expand Down
19 changes: 6 additions & 13 deletions source/dnode/vnode/src/tq/tqSink.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ static int32_t checkTagSchema(SStreamTask* pTask, SVnode* pVnode);
static void reubuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs);
static int32_t handleResultBlockMsg(SStreamTask* pTask, SSDataBlock* pDataBlock, int32_t index, SVnode* pVnode,
int64_t earlyTs);
static int32_t doWaitForDstTableDropped(SVnode* pVnode, SStreamTask* pTask, const char* dstTableName, int64_t uid);
static int32_t doWaitForDstTableDropped(SVnode* pVnode, SStreamTask* pTask, const char* dstTableName);

int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
const char* pIdStr, bool newSubTableRule) {
Expand Down Expand Up @@ -442,7 +442,7 @@ static int32_t doBuildAndSendDropTableMsg(SVnode* pVnode, char* pStbFullname, SS
SVDropTbBatchReq batchReq = {0};
SVDropTbReq req = {0};

if (rows <= 0 || pTask->subtableWithoutMd5 == 0) return TSDB_CODE_SUCCESS;
if (rows <= 0 || rows > 1 || pTask->subtableWithoutMd5 == 0) return TSDB_CODE_SUCCESS;

batchReq.pArray = taosArrayInit(rows, sizeof(SVDropTbReq));
if (!batchReq.pArray) return terrno;
Expand All @@ -451,25 +451,22 @@ static int32_t doBuildAndSendDropTableMsg(SVnode* pVnode, char* pStbFullname, SS
req.igNotExists = true;

SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
SColumnInfoData* pUidCol = taosArrayGet(pDataBlock->pDataBlock, UID_COLUMN_INDEX);
char tbName[TSDB_TABLE_NAME_LEN + 1] = {0};
for (int32_t i = 0; i < rows; ++i) {
void* pData = colDataGetVarData(pTbNameCol, i);
memcpy(tbName, varDataVal(pData), varDataLen(pData));
tbName[varDataLen(pData) + 1] = 0;
req.name = tbName;
// TODO wjm remove uid, it's not my uid
req.uid = *(int64_t*)colDataGetData(pUidCol, i);
if (taosArrayPush(batchReq.pArray, &req) == NULL) {
TSDB_CHECK_CODE(terrno, lino, _exit);
}
}

SMetaReader mr = {0};
metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);
// TODO wjm handle only one table
// only one row
code = metaGetTableEntryByName(&mr, tbName);
if (isValidDstChildTable(&mr, TD_VID(pVnode), tbName, pTask->outputInfo.tbSink.stbUid)) {
if (TSDB_CODE_SUCCESS == code && isValidDstChildTable(&mr, TD_VID(pVnode), tbName, pTask->outputInfo.tbSink.stbUid)) {
STableSinkInfo* pTableSinkInfo = NULL;
bool alreadyCached = doGetSinkTableInfoFromCache(pTask->outputInfo.tbSink.pTbInfo, pDataBlock->info.id.groupId, &pTableSinkInfo);
if (alreadyCached) {
Expand All @@ -485,9 +482,7 @@ static int32_t doBuildAndSendDropTableMsg(SVnode* pVnode, char* pStbFullname, SS
void* pData = colDataGetVarData(pTbNameCol, i);
memcpy(tbName, varDataVal(pData), varDataLen(pData));
tbName[varDataLen(pData) + 1] = 0;
int64_t uid = *(int64_t*)colDataGetData(pUidCol, i);
// TODO wjm remove uid it's not my uid
code = doWaitForDstTableDropped(pVnode, pTask, tbName, uid);
code = doWaitForDstTableDropped(pVnode, pTask, tbName);
TSDB_CHECK_CODE(code, lino, _exit);
}

Expand Down Expand Up @@ -906,7 +901,7 @@ int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkI
return TSDB_CODE_SUCCESS;
}

static int32_t doWaitForDstTableDropped(SVnode* pVnode, SStreamTask* pTask, const char* dstTableName, int64_t uid) {
static int32_t doWaitForDstTableDropped(SVnode* pVnode, SStreamTask* pTask, const char* dstTableName) {
int32_t vgId = TD_VID(pVnode);
int64_t suid = pTask->outputInfo.tbSink.stbUid;
const char* id = pTask->id.idStr;
Expand All @@ -920,7 +915,6 @@ static int32_t doWaitForDstTableDropped(SVnode* pVnode, SStreamTask* pTask, cons
metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);
int32_t code = metaGetTableEntryByName(&mr, dstTableName);
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
tqDebug("wjm s-task:%s table:%s has been dropped", id, dstTableName);
metaReaderClear(&mr);
break;
} else if (TSDB_CODE_SUCCESS == code) {
Expand All @@ -929,7 +923,6 @@ static int32_t doWaitForDstTableDropped(SVnode* pVnode, SStreamTask* pTask, cons
taosMsleep(100);
tqDebug("s-task:%s wait 100ms for table:%s drop", id, dstTableName);
} else {
tqDebug("wjm s-task:%s table:%s exist, but not mine", id, dstTableName);
metaReaderClear(&mr);
break;
}
Expand Down
72 changes: 0 additions & 72 deletions source/dnode/vnode/src/vnd/vnodeSvr.c
Original file line number Diff line number Diff line change
Expand Up @@ -483,75 +483,6 @@ static int32_t vnodePreProcessArbCheckSyncMsg(SVnode *pVnode, SRpcMsg *pMsg) {
return code;
}

static int32_t vnodePreProcessDropTSmaCtbMsg(SVnode *pVnode, SRpcMsg *pMsg) {
SVDropTbBatchReq dropReq = {0};
int32_t code = 0;
int32_t lino = 0;
SDecoder dc = {0};
SEncoder ec = {0};
int32_t nTbs = 0;
SDeleteRes res = {0};
int32_t size = 0;
uint8_t *pCont = NULL;
tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
if (tDecodeSVDropTbBatchReq(&dc, &dropReq) < 0) {
code = TSDB_CODE_INVALID_MSG;
TSDB_CHECK_CODE(code, lino, _exit);
}
nTbs = dropReq.nReqs;
res.skey = INT64_MIN;
res.ekey = INT64_MAX;
res.affectedRows = 1;
res.uidList = taosArrayInit(nTbs, sizeof(tb_uid_t));
if (!res.uidList) {
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}

vDebug("vnode preprocess drop tsma ctb, vgId:%d tb num: %d", TD_VID(pVnode), nTbs);
for (int32_t i = 0; i < nTbs; ++i) {
SVDeleteRsp rsp = {.affectedRows = 1};
tb_uid_t uid = metaGetTableEntryUidByName(pVnode->pMeta, dropReq.pReqs[i].name);
if (uid == 0) {
vWarn("vgId:%d, drop tsma ctb:%s not found", TD_VID(pVnode), dropReq.pReqs[i].name);
continue;
}
if (NULL == taosArrayPush(res.uidList, &uid)) {
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
}

tEncodeSize(tEncodeDeleteRes, &res, size, code);
pCont = rpcMallocCont(size + sizeof(SMsgHead));
if (!pCont) {
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
((SMsgHead *)pCont)->contLen = size + sizeof(SMsgHead);
((SMsgHead *)pCont)->vgId = TD_VID(pVnode);

tEncoderInit(&ec, pCont + sizeof(SMsgHead), size);
code = tEncodeDeleteRes(&ec, &res);
tEncoderClear(&ec);
if (code != 0) {
vError("vgId:%d %s failed to encode delete response", TD_VID(pVnode), __func__);
TSDB_CHECK_CODE(code, lino, _exit);
}
rpcFreeCont(pMsg->pCont);
pMsg->pCont = pCont;
pCont = NULL;
pMsg->contLen = size + sizeof(SMsgHead);

_exit:
if (res.uidList) {
taosArrayDestroy(res.uidList);
}
tDecoderClear(&dc);
rpcFreeCont(pCont);
return code;
}

int32_t vnodePreProcessDropTbMsg(SVnode* pVnode, SRpcMsg* pMsg) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
Expand Down Expand Up @@ -1294,11 +1225,9 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
continue;
}

vInfo("wjm process create tb req:%s, uid: %"PRId64, pCreateReq->name, pCreateReq->uid);
// do create table
if (metaCreateTable(pVnode->pMeta, ver, pCreateReq, &cRsp.pMeta) < 0) {
if (pCreateReq->flags & TD_CREATE_IF_NOT_EXISTS && terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
vInfo("wjm already exists-----------------");
cRsp.code = TSDB_CODE_SUCCESS;
} else {
cRsp.code = terrno;
Expand Down Expand Up @@ -1375,7 +1304,6 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
}

_exit:
vInfo("wjm process create table request exit");
tDeleteSVCreateTbBatchReq(&req);
taosArrayDestroyEx(rsp.pArray, tFreeSVCreateTbRsp);
taosArrayDestroy(tbUids);
Expand Down
7 changes: 0 additions & 7 deletions source/libs/executor/src/groupoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -1326,7 +1326,6 @@ int32_t appendCreateTableRow(void* pState, SExprSupp* pTableSup, SExprSupp* pTag
int32_t winCode = TSDB_CODE_SUCCESS;
code = pAPI->streamStateGetParName(pState, groupId, &pValue, true, &winCode);
QUERY_CHECK_CODE(code, lino, _end);
qInfo("wjm group id: %"PRId64 " winCode: %d, block type: %d", groupId, winCode, pSrcBlock->info.type);
if (winCode != TSDB_CODE_SUCCESS) {
SSDataBlock* pTmpBlock = NULL;
code = blockCopyOneRow(pSrcBlock, rowId, &pTmpBlock);
Expand All @@ -1335,28 +1334,22 @@ int32_t appendCreateTableRow(void* pState, SExprSupp* pTableSup, SExprSupp* pTag
memset(pTmpBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
pTmpBlock->info.id.groupId = groupId;
char* tbName = pSrcBlock->info.parTbName;
printSpecDataBlock(pSrcBlock, "wjm", "recv", "wjm");
printSpecDataBlock(pTmpBlock, "wjm", "recv", "wjm");
if (pTableSup->numOfExprs > 0) {
code = projectApplyFunctions(pTableSup->pExprInfo, pDestBlock, pTmpBlock, pTableSup->pCtx, pTableSup->numOfExprs,
NULL);
QUERY_CHECK_CODE(code, lino, _end);

SColumnInfoData* pTbCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
QUERY_CHECK_NULL(pTbCol, code, lino, _end, terrno);
printSpecDataBlock(pSrcBlock, "wjm", "recv", "wjm");
printSpecDataBlock(pTmpBlock, "wjm", "recv", "wjm");
memset(tbName, 0, TSDB_TABLE_NAME_LEN);
int32_t len = 0;
if (colDataIsNull_s(pTbCol, pDestBlock->info.rows - 1)) {
qInfo("wjm calculated tbnameis null");
len = 1;
tbName[0] = 0;
} else {
void* pData = colDataGetData(pTbCol, pDestBlock->info.rows - 1);
len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1);
memcpy(tbName, varDataVal(pData), len);
qInfo("wjm calculated tbname: %s", tbName);
code = pAPI->streamStatePutParName(pState, groupId, tbName);
QUERY_CHECK_CODE(code, lino, _end);
}
Expand Down
3 changes: 1 addition & 2 deletions source/libs/executor/src/scanoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -3542,11 +3542,10 @@ static int32_t deletePartName(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32
int64_t* gpIdCol = (int64_t*)pGpIdCol->pData;
void* pParName = NULL;
int32_t winCode = 0;
// TODO wjm test remove non stream child tables
code = pInfo->stateStore.streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, gpIdCol[i],
&pParName, false, &winCode);
if (TSDB_CODE_SUCCESS == code && winCode != 0) {
qInfo("delete stream part Name for:%"PRId64 " not found", gpIdCol[i]);
qDebug("delete stream part Name for:%"PRId64 " not found", gpIdCol[i]);
colDataSetNULL(pTbnameCol, i);
continue;
}
Expand Down
1 change: 0 additions & 1 deletion source/libs/stream/src/streamDispatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,6 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
SUseDbRsp* pDbInfo = &pTask->outputInfo.shuffleDispatcher.dbInfo;
hashValue =
taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix);
stInfo("wjm ctbname for dispatch: %s, pDataBlock.info.parTbName: %s", ctbName, pDataBlock->info.parTbName);
SBlockName bln = {0};
bln.hashValue = hashValue;
memcpy(bln.parTbName, pDataBlock->info.parTbName, strlen(pDataBlock->info.parTbName));
Expand Down
3 changes: 0 additions & 3 deletions source/libs/stream/src/streamState.c
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,6 @@ int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char
if (tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t)) == NULL) {
if (tSimpleHashGetSize(pState->parNameMap) < MAX_TABLE_NAME_NUM) {
code = tSimpleHashPut(pState->parNameMap, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN);
qInfo("wjm put group id into parnamemap: %"PRId64 " cur mapsize: %d", groupId, tSimpleHashGetSize(pState->parNameMap));
QUERY_CHECK_CODE(code, lino, _end);
}
code = streamStatePutParName_rocksdb(pState, groupId, tbname);
Expand All @@ -506,7 +505,6 @@ int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal
(*pWinCode) = streamStateGetParName_rocksdb(pState, groupId, pVal);
if ((*pWinCode) == TSDB_CODE_SUCCESS && tSimpleHashGetSize(pState->parNameMap) < MAX_TABLE_NAME_NUM) {
code = tSimpleHashPut(pState->parNameMap, &groupId, sizeof(int64_t), *pVal, TSDB_TABLE_NAME_LEN);
qInfo("wjm put group id into parnamemap: %"PRId64 " cur mapsize: %d", groupId, tSimpleHashGetSize(pState->parNameMap));
QUERY_CHECK_CODE(code, lino, _end);
}
goto _end;
Expand All @@ -528,7 +526,6 @@ int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal
}

int32_t streamStateDeleteParName(SStreamState* pState, int64_t groupId) {
qTrace("wjm delete par for group:%"PRId64 " parnameMapsize: %d", groupId, tSimpleHashGetSize(pState->parNameMap));
int32_t code = tSimpleHashRemove(pState->parNameMap, &groupId, sizeof(int64_t));
qTrace("catche %s at line %d res %d", __func__, __LINE__, code);
code = streamStateDeleteParName_rocksdb(pState, groupId);
Expand Down
6 changes: 4 additions & 2 deletions tests/system-test/2-query/tsma.py
Original file line number Diff line number Diff line change
Expand Up @@ -1237,10 +1237,11 @@ def run(self):
clust_dnode_nums = len(cluster_dnode_list)
if clust_dnode_nums > 1:
self.test_redistribute_vgroups()
self.test_td_32519()
tdSql.execute("drop tsma test.tsma5")
for _ in range(4):
self.test_td_32519()

def test_td_32519(self):
tdSql.execute("drop tsma test.tsma5")
self.create_recursive_tsma('tsma1', 'tsma_r', 'test', '1h', 'meters', ['avg(c1)', 'avg(c2)', 'count(ts)'])
tdSql.execute('INSERT INTO test.t1 VALUES("2024-10-24 11:45:00", 1,1,1,1,1,1,1, "a", "a")', queryTimes=1)
tdSql.execute('INSERT INTO test.t1 VALUES("2024-10-24 11:55:00", 2,1,1,1,1,1,1, "a", "a")', queryTimes=1)
Expand Down Expand Up @@ -1269,6 +1270,7 @@ def test_td_32519(self):
sql = 'select * from test.`163b7c69922cf6d83a98bfa44e52dade`'
self.wait_query(sql, 2, 20) ## tsma_r output ctb for t1
tdSql.checkData(0, 1, 1)
self.drop_tsma('tsma_r', 'test')

def test_create_tsma(self):
function_name = sys._getframe().f_code.co_name
Expand Down

0 comments on commit 2ddd071

Please sign in to comment.