Skip to content

Commit

Permalink
Merge branch 'master' of github.com:das-developers/das2C
Browse files Browse the repository at this point in the history
  • Loading branch information
cpiker committed Mar 6, 2024
2 parents f9ed1d0 + 379ec0a commit a05202c
Show file tree
Hide file tree
Showing 8 changed files with 209 additions and 68 deletions.
81 changes: 68 additions & 13 deletions das2/builder.c
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,11 @@ int DasDim_copyInProps(DasDim* pThis, char cAxis, const DasDesc* pOther)
return nCopied;
}


/* ************************************************************************** */
/* Helpers */

/* NOTO: pPd can be NULL if we're getting an already formed dataset! */

size_t _DasDsBldr_addPair(DasDsBldr* pThis, PktDesc* pPd, DasDs* pCd)
{
struct ds_pd_set* pNewSet = NULL;
Expand All @@ -150,12 +151,19 @@ size_t _DasDsBldr_addPair(DasDsBldr* pThis, PktDesc* pPd, DasDs* pCd)
pThis->lPairs = pNewSet;
}

/* Save a copy of the packet descriptor, including it's planes. This is
* used to see if this packet descriptor has been seen before. */
PktDesc* pNewPd = new_PktDesc();
PktDesc_copyPlanes(pNewPd, pPd);
/* If we're matching up packet descriptors to datasets, save a copy of the
packet descriptor, including it's planes. This is used to see if this
packet descriptor has been seen before.
*/

pThis->lPairs[pThis->uValidPairs].pPd = pNewPd;
if(pPd != NULL){
PktDesc* pNewPd = new_PktDesc();
PktDesc_copyPlanes(pNewPd, pPd);
pThis->lPairs[pThis->uValidPairs].pPd = pNewPd;
}
else{
pThis->lPairs[pThis->uValidPairs].pPd = NULL;
}
pThis->lPairs[pThis->uValidPairs].pDs = pCd;
pThis->uValidPairs += 1;
return (pThis->uValidPairs - 1);
Expand Down Expand Up @@ -193,6 +201,8 @@ int _DasDsBldr_hasContainer(DasDsBldr* pThis, PktDesc* pPd)

for(u = 0; u < pThis->uValidPairs; ++u){
pPdTest = pThis->lPairs[u].pPd;
if(pPdTest == NULL) /* Happens when stream contains datasets */
continue;

/* Check number of planes */
if(pPd->uPlanes != pPdTest->uPlanes) continue;
Expand Down Expand Up @@ -272,6 +282,7 @@ char* _DasDsBldr_getExistingGroup(

for(size_t u = 0; u < pThis->uValidPairs; ++u){
pPdTest = pThis->lPairs[u].pPd;
if(pPdTest == NULL) continue; /* Happens when dataset descriptors in the stream */

/* Check number of planes */
if(pPd->uPlanes != pPdTest->uPlanes) continue;
Expand Down Expand Up @@ -1040,6 +1051,23 @@ DasErrCode DasDsBldr_onPktDesc(StreamDesc* pSd, PktDesc* pPd, void* vpUd)

/* ************************************************************************* */

DasErrCode DasDsBldr_onDataSet(StreamDesc* pSd, int iPktId, DasDs* pDs, void* vpUd)
{
DasDsBldr* pThis = (DasDsBldr*)vpUd;

/* Not much to do here, we already have a valid dataset, just add it to
the list, but don't associate a packet descriptor */
if( pThis->lDsMap[iPktId] != -1)
return das_error(DASERR_BLDR, "Packet reuse not supported for DasDs descriptors");

size_t uIdx = _DasDsBldr_addPair(pThis, NULL, pDs);
pThis->lDsMap[iPktId] = uIdx;

return DAS_OKAY;
}

/* ************************************************************************* */

DasErrCode DasDsBldr_onPktData(PktDesc* pPd, void* vpUd)
{
DasDsBldr* pThis = (DasDsBldr*)vpUd;
Expand All @@ -1065,6 +1093,17 @@ DasErrCode DasDsBldr_onPktData(PktDesc* pPd, void* vpUd)
return DAS_OKAY;
}

/* ************************************************************************* */

DasErrCode DasDsBldr_onDsData(StreamDesc* pSd, int iPktId, DasDs* pDs, void* vpUd)
{
/* DasIO automatically calls dasds_decode_data which calls
DasCodec_decode which appends data, so nothing to do here */

return DAS_OKAY;
}


/* ************************************************************************* */

DasErrCode DasDsBldr_onComment(OobComment* pSc, void* vpUd)
Expand All @@ -1091,6 +1130,18 @@ DasErrCode DasDsBldr_onClose(StreamDesc* pSd, void* vpUd)

for(int i = 0; i < pThis->uValidPairs; ++i)
DasDs_setMutable(pThis->lPairs[i].pDs, false);

/* very important, do not let the stream descriptor delete our datasets
take ownership of them. */
int nPktId = 0;
DasDesc* pDesc = NULL;
while((pDesc = DasStream_nextPktDesc(pSd, &nPktId)) != NULL){
if(DasDesc_type(pDesc) == DATASET){
DasErrCode nRet = DasStream_rmPktDesc(pSd, pDesc, 0);
if(nRet != DAS_OKAY)
return nRet;
}
}

return DAS_OKAY;
}
Expand All @@ -1107,11 +1158,13 @@ DasDsBldr* new_DasDsBldr(void)

pThis->base.userData = pThis;
pThis->base.streamDescHandler = DasDsBldr_onStreamDesc;
pThis->base.pktDescHandler = DasDsBldr_onPktDesc;
pThis->base.pktDataHandler = DasDsBldr_onPktData;
pThis->base.exceptionHandler = DasDsBldr_onException;
pThis->base.closeHandler = DasDsBldr_onClose;
pThis->base.commentHandler = DasDsBldr_onComment;
pThis->base.pktDescHandler = DasDsBldr_onPktDesc;
pThis->base.dsDescHandler = DasDsBldr_onDataSet;
pThis->base.pktDataHandler = DasDsBldr_onPktData;
pThis->base.dsDataHandler = DasDsBldr_onDsData;
pThis->base.exceptionHandler = DasDsBldr_onException;
pThis->base.closeHandler = DasDsBldr_onClose;
pThis->base.commentHandler = DasDsBldr_onComment;
pThis->_released = false;

for(int i = 0; i<MAX_PKTIDS; ++i) pThis->lDsMap[i] = -1;
Expand Down Expand Up @@ -1157,8 +1210,10 @@ void del_DasDsBldr(DasDsBldr* pThis){
free(pThis->pProps);
}

for(size_t u = 0; u < pThis->uValidPairs; ++u)
del_PktDesc(pThis->lPairs[u].pPd);
for(size_t u = 0; u < pThis->uValidPairs; ++u){
if(pThis->lPairs[u].pPd != NULL)
del_PktDesc(pThis->lPairs[u].pPd);
}

free(pThis->lPairs);
free(pThis);
Expand Down
6 changes: 4 additions & 2 deletions das2/io.c
Original file line number Diff line number Diff line change
Expand Up @@ -1203,7 +1203,9 @@ DasErrCode _DasIO_handleDesc(
break;
case DATASET:
if(pHndlr->dsDescHandler != NULL)
nRet = pHndlr->dsDescHandler(pSd, (DasDs*)pSd->descriptors[nPktId], pHndlr->userData);
nRet = pHndlr->dsDescHandler(
pSd, nPktId, (DasDs*)pSd->descriptors[nPktId], pHndlr->userData
);
break;
default:
nRet = das_error(DASERR_IO, "Unexpected descriptor type %d", pDesc->type);
Expand Down Expand Up @@ -1243,7 +1245,7 @@ DasErrCode _DasIO_handleData(
if(pHndlr->dsDataHandler == NULL)
bClearDs = true;
else
nRet = pHndlr->dsDataHandler(pSd, (DasDs*)pDesc, pHndlr->userData);
nRet = pHndlr->dsDataHandler(pSd, nPktId, (DasDs*)pDesc, pHndlr->userData);
}

if(nRet != DAS_OKAY) break;
Expand Down
6 changes: 4 additions & 2 deletions das2/processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,23 +71,25 @@ typedef DasErrCode (*PktDataHandler)(PktDesc* pd, void* ud);
* on the input stream.
*
* @param sd A pointer to the parsed Stream Descriptor
* @param pi The packet ID associated with this dataset
* @param dd A poirter to a parsed DasDs (dataset) definition
* @param ud A pointer to a user data structure, may be NULL
*
* @param
*/
typedef DasErrCode (*DsDescHandler)(StreamDesc* sd, DasDs* dd, void* ud);
typedef DasErrCode (*DsDescHandler)(StreamDesc* sd, int pi, DasDs* dd, void* ud);

/** Callback function invoked when a new data packets for a dataset are
* encountered on the stream.
*
* @param sd A pointer to the parsed Stream Descriptor
* @param pi The packet ID associated with this dataset
* @param dd A poirter to a parsed DasDs (dataset) definition
* @param pi A pointer to the max index of the dataset before the
* new data were added
* @param ud A pointer to a user data structure, may be NULL
*/
typedef DasErrCode (*DsDataHandler)(StreamDesc* sd, DasDs* dd, void* ud);
typedef DasErrCode (*DsDataHandler)(StreamDesc* sd, int pi, DasDs* dd, void* ud);

/** Callback functions that are invoked on Stream Close
* callback function that is called at the end of the stream
Expand Down
Loading

0 comments on commit a05202c

Please sign in to comment.