diff --git a/das2/builder.c b/das2/builder.c index 7534a07..16835bd 100644 --- a/das2/builder.c +++ b/das2/builder.c @@ -133,10 +133,11 @@ int DasDim_copyInProps(DasDim* pThis, char cAxis, const DasDesc* pOther) return nCopied; } - /* ************************************************************************** */ /* Helpers */ +/* NOTO: pPd can be NULL if we're getting an already formed dataset! */ + size_t _DasDsBldr_addPair(DasDsBldr* pThis, PktDesc* pPd, DasDs* pCd) { struct ds_pd_set* pNewSet = NULL; @@ -150,12 +151,19 @@ size_t _DasDsBldr_addPair(DasDsBldr* pThis, PktDesc* pPd, DasDs* pCd) pThis->lPairs = pNewSet; } - /* Save a copy of the packet descriptor, including it's planes. This is - * used to see if this packet descriptor has been seen before. */ - PktDesc* pNewPd = new_PktDesc(); - PktDesc_copyPlanes(pNewPd, pPd); + /* If we're matching up packet descriptors to datasets, save a copy of the + packet descriptor, including it's planes. This is used to see if this + packet descriptor has been seen before. + */ - pThis->lPairs[pThis->uValidPairs].pPd = pNewPd; + if(pPd != NULL){ + PktDesc* pNewPd = new_PktDesc(); + PktDesc_copyPlanes(pNewPd, pPd); + pThis->lPairs[pThis->uValidPairs].pPd = pNewPd; + } + else{ + pThis->lPairs[pThis->uValidPairs].pPd = NULL; + } pThis->lPairs[pThis->uValidPairs].pDs = pCd; pThis->uValidPairs += 1; return (pThis->uValidPairs - 1); @@ -193,6 +201,8 @@ int _DasDsBldr_hasContainer(DasDsBldr* pThis, PktDesc* pPd) for(u = 0; u < pThis->uValidPairs; ++u){ pPdTest = pThis->lPairs[u].pPd; + if(pPdTest == NULL) /* Happens when stream contains datasets */ + continue; /* Check number of planes */ if(pPd->uPlanes != pPdTest->uPlanes) continue; @@ -272,6 +282,7 @@ char* _DasDsBldr_getExistingGroup( for(size_t u = 0; u < pThis->uValidPairs; ++u){ pPdTest = pThis->lPairs[u].pPd; + if(pPdTest == NULL) continue; /* Happens when dataset descriptors in the stream */ /* Check number of planes */ if(pPd->uPlanes != pPdTest->uPlanes) continue; @@ -1040,6 +1051,23 @@ DasErrCode DasDsBldr_onPktDesc(StreamDesc* pSd, PktDesc* pPd, void* vpUd) /* ************************************************************************* */ +DasErrCode DasDsBldr_onDataSet(StreamDesc* pSd, int iPktId, DasDs* pDs, void* vpUd) +{ + DasDsBldr* pThis = (DasDsBldr*)vpUd; + + /* Not much to do here, we already have a valid dataset, just add it to + the list, but don't associate a packet descriptor */ + if( pThis->lDsMap[iPktId] != -1) + return das_error(DASERR_BLDR, "Packet reuse not supported for DasDs descriptors"); + + size_t uIdx = _DasDsBldr_addPair(pThis, NULL, pDs); + pThis->lDsMap[iPktId] = uIdx; + + return DAS_OKAY; +} + +/* ************************************************************************* */ + DasErrCode DasDsBldr_onPktData(PktDesc* pPd, void* vpUd) { DasDsBldr* pThis = (DasDsBldr*)vpUd; @@ -1065,6 +1093,17 @@ DasErrCode DasDsBldr_onPktData(PktDesc* pPd, void* vpUd) return DAS_OKAY; } +/* ************************************************************************* */ + +DasErrCode DasDsBldr_onDsData(StreamDesc* pSd, int iPktId, DasDs* pDs, void* vpUd) +{ + /* DasIO automatically calls dasds_decode_data which calls + DasCodec_decode which appends data, so nothing to do here */ + + return DAS_OKAY; +} + + /* ************************************************************************* */ DasErrCode DasDsBldr_onComment(OobComment* pSc, void* vpUd) @@ -1091,6 +1130,18 @@ DasErrCode DasDsBldr_onClose(StreamDesc* pSd, void* vpUd) for(int i = 0; i < pThis->uValidPairs; ++i) DasDs_setMutable(pThis->lPairs[i].pDs, false); + + /* very important, do not let the stream descriptor delete our datasets + take ownership of them. */ + int nPktId = 0; + DasDesc* pDesc = NULL; + while((pDesc = DasStream_nextPktDesc(pSd, &nPktId)) != NULL){ + if(DasDesc_type(pDesc) == DATASET){ + DasErrCode nRet = DasStream_rmPktDesc(pSd, pDesc, 0); + if(nRet != DAS_OKAY) + return nRet; + } + } return DAS_OKAY; } @@ -1107,11 +1158,13 @@ DasDsBldr* new_DasDsBldr(void) pThis->base.userData = pThis; pThis->base.streamDescHandler = DasDsBldr_onStreamDesc; - pThis->base.pktDescHandler = DasDsBldr_onPktDesc; - pThis->base.pktDataHandler = DasDsBldr_onPktData; - pThis->base.exceptionHandler = DasDsBldr_onException; - pThis->base.closeHandler = DasDsBldr_onClose; - pThis->base.commentHandler = DasDsBldr_onComment; + pThis->base.pktDescHandler = DasDsBldr_onPktDesc; + pThis->base.dsDescHandler = DasDsBldr_onDataSet; + pThis->base.pktDataHandler = DasDsBldr_onPktData; + pThis->base.dsDataHandler = DasDsBldr_onDsData; + pThis->base.exceptionHandler = DasDsBldr_onException; + pThis->base.closeHandler = DasDsBldr_onClose; + pThis->base.commentHandler = DasDsBldr_onComment; pThis->_released = false; for(int i = 0; ilDsMap[i] = -1; @@ -1157,8 +1210,10 @@ void del_DasDsBldr(DasDsBldr* pThis){ free(pThis->pProps); } - for(size_t u = 0; u < pThis->uValidPairs; ++u) - del_PktDesc(pThis->lPairs[u].pPd); + for(size_t u = 0; u < pThis->uValidPairs; ++u){ + if(pThis->lPairs[u].pPd != NULL) + del_PktDesc(pThis->lPairs[u].pPd); + } free(pThis->lPairs); free(pThis); diff --git a/das2/io.c b/das2/io.c index c4787c5..97df528 100644 --- a/das2/io.c +++ b/das2/io.c @@ -1203,7 +1203,9 @@ DasErrCode _DasIO_handleDesc( break; case DATASET: if(pHndlr->dsDescHandler != NULL) - nRet = pHndlr->dsDescHandler(pSd, (DasDs*)pSd->descriptors[nPktId], pHndlr->userData); + nRet = pHndlr->dsDescHandler( + pSd, nPktId, (DasDs*)pSd->descriptors[nPktId], pHndlr->userData + ); break; default: nRet = das_error(DASERR_IO, "Unexpected descriptor type %d", pDesc->type); @@ -1243,7 +1245,7 @@ DasErrCode _DasIO_handleData( if(pHndlr->dsDataHandler == NULL) bClearDs = true; else - nRet = pHndlr->dsDataHandler(pSd, (DasDs*)pDesc, pHndlr->userData); + nRet = pHndlr->dsDataHandler(pSd, nPktId, (DasDs*)pDesc, pHndlr->userData); } if(nRet != DAS_OKAY) break; diff --git a/das2/processor.h b/das2/processor.h index d99f4cd..cb5a550 100644 --- a/das2/processor.h +++ b/das2/processor.h @@ -71,23 +71,25 @@ typedef DasErrCode (*PktDataHandler)(PktDesc* pd, void* ud); * on the input stream. * * @param sd A pointer to the parsed Stream Descriptor + * @param pi The packet ID associated with this dataset * @param dd A poirter to a parsed DasDs (dataset) definition * @param ud A pointer to a user data structure, may be NULL * * @param */ -typedef DasErrCode (*DsDescHandler)(StreamDesc* sd, DasDs* dd, void* ud); +typedef DasErrCode (*DsDescHandler)(StreamDesc* sd, int pi, DasDs* dd, void* ud); /** Callback function invoked when a new data packets for a dataset are * encountered on the stream. * * @param sd A pointer to the parsed Stream Descriptor + * @param pi The packet ID associated with this dataset * @param dd A poirter to a parsed DasDs (dataset) definition * @param pi A pointer to the max index of the dataset before the * new data were added * @param ud A pointer to a user data structure, may be NULL */ -typedef DasErrCode (*DsDataHandler)(StreamDesc* sd, DasDs* dd, void* ud); +typedef DasErrCode (*DsDataHandler)(StreamDesc* sd, int pi, DasDs* dd, void* ud); /** Callback functions that are invoked on Stream Close * callback function that is called at the end of the stream diff --git a/das2/stream.c b/das2/stream.c index 8cfd050..5486bea 100644 --- a/das2/stream.c +++ b/das2/stream.c @@ -44,11 +44,11 @@ /* ************************************************************************** */ /* Construction */ -StreamDesc* new_StreamDesc() +DasStream* new_DasStream() { - StreamDesc* pThis; + DasStream* pThis; - pThis = (StreamDesc*) calloc(1, sizeof( StreamDesc ) ); + pThis = (DasStream*) calloc(1, sizeof( DasStream ) ); DasDesc_init((DasDesc*)pThis, STREAM); pThis->bDescriptorSent = false; @@ -60,11 +60,11 @@ StreamDesc* new_StreamDesc() } /* Deep copy a stream descriptor */ -StreamDesc* DasStream_copy(const StreamDesc* pThis) +DasStream* DasStream_copy(const DasStream* pThis) { /* Since this is a deep copy do it by explicit assignment, less likely to * make a mistake that way */ - StreamDesc* pOut = new_StreamDesc(); + DasStream* pOut = new_DasStream(); strncpy(pOut->compression, pThis->compression, 47); pOut->pUser = pThis->pUser; /* Should this be copied ? */ @@ -73,7 +73,7 @@ StreamDesc* DasStream_copy(const StreamDesc* pThis) return pOut; } -void del_StreamDesc(StreamDesc* pThis){ +void del_DasStream(DasStream* pThis){ DasDesc_freeProps(&(pThis->base)); for(size_t u = 1; u < MAX_PKTIDS; u++){ DasDesc* pDesc = pThis->descriptors[u]; @@ -91,7 +91,7 @@ void del_StreamDesc(StreamDesc* pThis){ } /* ************************************************************************** */ -char* DasStream_info(const StreamDesc* pThis, char* sBuf, int nLen) +char* DasStream_info(const DasStream* pThis, char* sBuf, int nLen) { if(nLen < 30) return sBuf; @@ -130,7 +130,7 @@ char* DasStream_info(const StreamDesc* pThis, char* sBuf, int nLen) /* ************************************************************************** */ /* Adding/Removing detail */ -void DasStream_addStdProps(StreamDesc* pThis) +void DasStream_addStdProps(DasStream* pThis) { time_t tCur; struct tm* pCur = NULL; @@ -154,7 +154,7 @@ void DasStream_addStdProps(StreamDesc* pThis) */ } -void DasStream_setMonotonic(StreamDesc* pThis, bool isMonotonic ) +void DasStream_setMonotonic(DasStream* pThis, bool isMonotonic ) { if(isMonotonic) DasDesc_setBool( (DasDesc*)pThis, "monotonicXTags", true ); @@ -162,14 +162,14 @@ void DasStream_setMonotonic(StreamDesc* pThis, bool isMonotonic ) DasDesc_setBool( (DasDesc*)pThis, "monotonicXTags", false ); } -size_t DasStream_getNPktDesc(const StreamDesc* pThis) +size_t DasStream_getNPktDesc(const DasStream* pThis) { size_t nRet = 0; for(size_t u = 1; u < MAX_PKTIDS; u++) if(pThis->descriptors[u]) nRet++; return nRet; } -int DasStream_nextPktId(StreamDesc* pThis) +int DasStream_nextPktId(DasStream* pThis) { for (int i = 1; i < MAX_PKTIDS; ++i){ /* 00 is reserved for stream descriptor */ if(pThis->descriptors[i] == NULL) @@ -179,7 +179,7 @@ int DasStream_nextPktId(StreamDesc* pThis) } PktDesc* DasStream_createPktDesc( - StreamDesc* pThis, DasEncoding* pXEncoder, das_units xUnits + DasStream* pThis, DasEncoding* pXEncoder, das_units xUnits ){ PktDesc* pPkt; @@ -194,7 +194,7 @@ PktDesc* DasStream_createPktDesc( return pPkt; } -DasErrCode DasStream_freeSubDesc(StreamDesc* pThis, int nPktId) +DasErrCode DasStream_freeSubDesc(DasStream* pThis, int nPktId) { if(!DasStream_isValidId(pThis, nPktId)) return das_error(DASERR_STREAM, "%s: stream contains no descriptor for packets " @@ -213,7 +213,7 @@ DasErrCode DasStream_freeSubDesc(StreamDesc* pThis, int nPktId) return DAS_OKAY; } -PktDesc* DasStream_getPktDesc(const StreamDesc* pThis, int nPacketId) +PktDesc* DasStream_getPktDesc(const DasStream* pThis, int nPacketId) { if(nPacketId < 1 || nPacketId > 99){ das_error(DASERR_STREAM, @@ -225,7 +225,7 @@ PktDesc* DasStream_getPktDesc(const StreamDesc* pThis, int nPacketId) return (pDesc == NULL)||(pDesc->type != PACKET) ? NULL : (PktDesc*)pDesc; } -DasDesc* DasStream_nextPktDesc(const StreamDesc* pThis, int* pPrevPktId) +DasDesc* DasStream_nextPktDesc(const DasStream* pThis, int* pPrevPktId) { int nBeg = *pPrevPktId + 1; if(nBeg < 1){ @@ -242,7 +242,7 @@ DasDesc* DasStream_nextPktDesc(const StreamDesc* pThis, int* pPrevPktId) } -void DasStream_addCmdLineProp(StreamDesc* pThis, int argc, char * argv[] ) +void DasStream_addCmdLineProp(DasStream* pThis, int argc, char * argv[] ) { /* Save up to 1023 bytes of command line info */ char sCmd[1024] = {'\0'}; @@ -270,7 +270,7 @@ void DasStream_addCmdLineProp(StreamDesc* pThis, int argc, char * argv[] ) /* ************************************************************************* */ /* Copying stream objects */ -PktDesc* DasStream_clonePktDesc(StreamDesc* pThis, const PktDesc* pPdIn) +PktDesc* DasStream_clonePktDesc(DasStream* pThis, const PktDesc* pPdIn) { PktDesc* pPdOut; @@ -289,7 +289,7 @@ PktDesc* DasStream_clonePktDesc(StreamDesc* pThis, const PktDesc* pPdIn) return pPdOut; } -bool DasStream_isValidId(const StreamDesc* pThis, int nPktId) +bool DasStream_isValidId(const DasStream* pThis, int nPktId) { if(nPktId > 0 && nPktId < MAX_PKTIDS){ if(pThis->descriptors[nPktId] != NULL) return true; @@ -298,7 +298,7 @@ bool DasStream_isValidId(const StreamDesc* pThis, int nPktId) } PktDesc* DasStream_clonePktDescById( - StreamDesc* pThis, const StreamDesc* pOther, int nPacketId + DasStream* pThis, const DasStream* pOther, int nPacketId ){ PktDesc* pIn, *pOut; @@ -325,18 +325,20 @@ PktDesc* DasStream_clonePktDescById( return pOut; } -DasErrCode DasStream_addPktDesc(StreamDesc* pThis, DasDesc* pDesc, int nPktId) +DasErrCode DasStream_addPktDesc(DasStream* pThis, DasDesc* pDesc, int nPktId) { /* Only accept either das2 packet descriptors or das3 datasets */ if((pDesc->type != PACKET)&&(pDesc->type != DATASET)) return das_error(DASERR_STREAM, "Unexpected packet desciptor type"); - if((pDesc->parent != NULL)&&(pDesc->parent != (DasDesc*)pThis)) + if((pDesc->parent != NULL)&&(pDesc->parent != (DasDesc*)pThis)){ + /* Hint to random developer: If you are here because you wanted to copy * another stream's packet descriptor onto this stream use one of * DasStream_clonePktDesc() or DasStream_clonePktDescById() instead. */ return das_error(DASERR_STREAM, "Packet Descriptor already belongs to different " "stream"); + } /* Check uniqueness */ if(pDesc->parent == (DasDesc*)pThis){ @@ -350,10 +352,10 @@ DasErrCode DasStream_addPktDesc(StreamDesc* pThis, DasDesc* pDesc, int nPktId) } if(nPktId < 1 || nPktId > 99) - return das_error(DASERR_STREAM, "Illegal packet id in addPktDesc: %02d", nPktId); + return das_error(DASERR_STREAM, "Illegal packet id: %02d", nPktId); if(pThis->descriptors[nPktId] != NULL) - return das_error(DASERR_STREAM, "StreamDesc already has a packet descriptor with ID" + return das_error(DASERR_STREAM, "DasStream already has a packet descriptor with ID" " %02d", nPktId); pThis->descriptors[nPktId] = pDesc; @@ -366,11 +368,48 @@ DasErrCode DasStream_addPktDesc(StreamDesc* pThis, DasDesc* pDesc, int nPktId) return 0; } +DasErrCode DasStream_rmPktDesc(DasStream* pThis, DasDesc* pDesc, int nPktId) +{ + /* This is essentiall 2 functions, but we don't have function overloading in C */ + + if(pDesc != NULL){ + + if((pDesc->type != PACKET)&&(pDesc->type != DATASET)) + return das_error(DASERR_STREAM, "Unexpected packet desciptor type"); + + if((pDesc->parent != (DasDesc*)pThis)) + return das_error(DASERR_STREAM, "Descriptor dosen't belong to this stream"); + + for(int i = 0; i < MAX_PKTIDS; ++i){ + if(pThis->descriptors[i] == pDesc){ + pThis->descriptors[i] = NULL; /* Detach both ways */ + pDesc->parent = NULL; + return DAS_OKAY; + } + } + + return das_error(DASERR_STREAM, "Descriptor is not part of this stream"); + } + + if(nPktId < 1 || nPktId > 99) + return das_error(DASERR_STREAM, "Illegal packet id: %02d", nPktId); + + if(pThis->descriptors[nPktId] == NULL) + return das_error(DASERR_STREAM, "Stream has not descriptor for packet id: %02d", nPktId); + + pDesc = pThis->descriptors[nPktId]; + pDesc->parent = NULL; + pThis->descriptors[nPktId] = NULL; + + return DAS_OKAY; +} + + /* ************************************************************************* */ /* Frame wrappers */ DasFrame* DasStream_createFrame( - StreamDesc* pThis, ubyte id, const char* sName, const char* sType + DasStream* pThis, ubyte id, const char* sName, const char* sType ){ // Find a slot for it. size_t uIdx = 0; @@ -401,7 +440,7 @@ DasFrame* DasStream_createFrame( return pFrame; } -int8_t DasStream_getFrameId(const StreamDesc* pThis, const char* sFrame){ +int8_t DasStream_getFrameId(const DasStream* pThis, const char* sFrame){ for(int i = 0; i < MAX_FRAMES; ++i){ if(pThis->frames[i] == NULL) @@ -412,7 +451,7 @@ int8_t DasStream_getFrameId(const StreamDesc* pThis, const char* sFrame){ return -1 * DASERR_STREAM; } -int DasStream_newFrameId(const StreamDesc* pThis){ +int DasStream_newFrameId(const DasStream* pThis){ /* Since MAX_FRAMES is small and the size of the frame ID field is small, we can get away with a double loop */ @@ -430,7 +469,7 @@ int DasStream_newFrameId(const StreamDesc* pThis){ return -1 * DASERR_STREAM; } -int8_t DasStream_getNumFrames(const StreamDesc* pThis) +int8_t DasStream_getNumFrames(const DasStream* pThis) { /* Return the ID of the last defined frame */ int8_t iLastGood = -1; @@ -441,13 +480,13 @@ int8_t DasStream_getNumFrames(const StreamDesc* pThis) return iLastGood + 1; } -const DasFrame* DasStream_getFrame(const StreamDesc* pThis, int idx) +const DasFrame* DasStream_getFrame(const DasStream* pThis, int idx) { return (idx < 0 || idx > MAX_FRAMES) ? NULL : pThis->frames[idx]; } const DasFrame* DasStream_getFrameByName( - const StreamDesc* pThis, const char* sFrame + const DasStream* pThis, const char* sFrame ){ for(size_t u = 0; (u < MAX_FRAMES) && (pThis->frames[u] != NULL); ++u){ if(strcmp(sFrame, DasFrame_getName(pThis->frames[u])) == 0) @@ -456,7 +495,7 @@ const DasFrame* DasStream_getFrameByName( return NULL; } -const DasFrame* DasStream_getFrameById(const StreamDesc* pThis, ubyte id) +const DasFrame* DasStream_getFrameById(const DasStream* pThis, ubyte id) { for(size_t u = 0; (u < MAX_FRAMES) && (pThis->frames[u] != NULL); ++u){ if(pThis->frames[u]->id == id) @@ -474,7 +513,7 @@ const DasFrame* DasStream_getFrameById(const StreamDesc* pThis, ubyte id) #define _TYPE_BUF_SZ 23 typedef struct parse_stream_desc{ - StreamDesc* pStream; + DasStream* pStream; DasFrame* pFrame; // Only non-null when in a tag DasErrCode nRet; bool bInProp; @@ -494,7 +533,7 @@ void parseDasStream_start(void* data, const char* el, const char** attr) if(pPsd->nRet != DAS_OKAY) /* Processing halt */ return; - StreamDesc* pSd = pPsd->pStream; + DasStream* pSd = pPsd->pStream; char sType[64] = {'\0'}; char sName[64] = {'\0'}; ubyte nFrameId = 0; @@ -661,7 +700,7 @@ void parseDasStream_chardata(void* data, const char* sChars, int len) DasAry_append(pAry, (ubyte*) sChars, len); } -/* Formerly nested function "end" in parseStreamDescriptor */ +/* Formerly nested function "end" in parseDasStreamriptor */ void parseDasStream_end(void* data, const char* el) { parse_stream_desc_t* pPsd = (parse_stream_desc_t*)data; @@ -707,11 +746,11 @@ void parseDasStream_end(void* data, const char* el) DasAry_clear(pAry); } -StreamDesc* new_DasStream_str(DasBuf* pBuf, int nModel) +DasStream* new_DasStream_str(DasBuf* pBuf, int nModel) { - StreamDesc* pThis = new_StreamDesc(); + DasStream* pThis = new_DasStream(); - /*StreamDesc* pDesc; + /*DasStream* pDesc; DasErrCode nRet; char sPropUnits[_UNIT_BUF_SZ+1]; char sPropName[_NAME_BUF_SZ+1]; @@ -746,12 +785,12 @@ StreamDesc* new_DasStream_str(DasBuf* pBuf, int nModel) das_error(DASERR_STREAM, "Parse error at line %d:\n%s\n", XML_GetCurrentLineNumber(p), XML_ErrorString(XML_GetErrorCode(p)) ); - del_StreamDesc(pThis); // Don't leak on fail + del_DasStream(pThis); // Don't leak on fail DasAry_deInit(&(psd.aPropVal)); return NULL; } if(psd.nRet != 0){ - del_StreamDesc(pThis); // Don't leak on fail + del_DasStream(pThis); // Don't leak on fail DasAry_deInit(&(psd.aPropVal)); return NULL; } @@ -759,7 +798,7 @@ StreamDesc* new_DasStream_str(DasBuf* pBuf, int nModel) return pThis; } -DasErrCode DasStream_encode(StreamDesc* pThis, DasBuf* pBuf) +DasErrCode DasStream_encode(DasStream* pThis, DasBuf* pBuf) { DasErrCode nRet = 0; if((nRet = DasBuf_printf(pBuf, "