Skip to content

Commit

Permalink
Added CDF compression & CDF unittest
Browse files Browse the repository at this point in the history
  • Loading branch information
cpiker committed Feb 25, 2024
1 parent e4224a8 commit 72e2d80
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 47 deletions.
12 changes: 7 additions & 5 deletions buildfiles/Linux.mak
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,15 @@ test: $(BD) $(BD)/$(TARG).a $(BUILD_TEST_PROGS) $(BULID_UTIL_PROGS)
@$(BD)/LoadStream
@echo "INFO: Running unit test for credentials manager, $(BD)/TestCredMngr..."
@$(BD)/TestCredMngr $(BD)
@echo "INFO: All test programs completed without errors"


test3:$(BD) $(BD)/das3_cdf $(BD)/$(TARG).a
@echo "INFO: Running unit test for basic das v3.0 stream parsing, $(BD)/TestV3Read..."
$(BD)/TestV3Read
$(BD)/das3_cdf -i test/ex12_sounder_xyz.d3t -o $(BD) -r
@echo "INFO: All test programs completed without errors"

test_cdf:$(BD) $(BD)/das3_cdf $(BD)/$(TARG).a
@echo "INFO: Testing CDF creation"
$(BD)/das3_cdf -l warning -i test/ex12_sounder_xyz.d3t -o $(BD) -r
cmp $(BD)/ex12_sounder_xyz.cdf test/ex12_sounder_xyz.cdf
@echo "INFO: Good, CDF matches expected output."

test_spice:$(BD) $(BD)/$(TARG).a $(BUILD_TEST_PROGS) $(BULID_UTIL_PROGS)
@echo "INFO: Running unit test for spice error redirect, $(BD)/TestSpice..."
Expand Down
Binary file added test/ex12_sounder_xyz.cdf
Binary file not shown.
127 changes: 85 additions & 42 deletions utilities/das3_cdf.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@
#include <stdbool.h>
#include <assert.h>
#include <stdlib.h>

#ifdef _WIN32
#define strcasecmp _stricmp
#define strncasecmp _strnicmp
#else
#include <strings.h>
#endif

#include <cdf.h>

Expand Down Expand Up @@ -67,7 +72,7 @@

/* The default memory threshold, don't write data to disk until a dataset is
bigger than this (except onClose) */
size_t g_nMemBufThreshold = 16777216; /* 16 MBytes */
/* Default flush threshold in bytes (16 MBytes, keep in sync with THRESH).
   Deliberately has no trailing ';' so it can be used inside expressions. */
#define DEF_FLUSH_BYTES 16777216

#define THRESH "16 MB"

Expand Down Expand Up @@ -191,25 +196,37 @@ void prnHelp()
" Set the location where server authentication tokens (if any)\n"
" are saved. Defaults to %s%s%s\n"
"\n"
" -U MEGS,--mem-use=MEGS\n"
" -u,-uncompressed\n"
" Disables zlib compression. All variables are written uncompressed\n"
" This is needed for any CDF files submitted to the Planetary Data\n"
" system. Per ISTP rules, Epoch variables are not compressed.\n"
"\n"
" -m MEGS,--memory=MEGS\n"
" To avoid constant writes, " PROG " buffers datasets in memory\n"
" until they are " THRESH " or larger and then they are written\n"
" to disk. Use this parameter to change the threshold. Using\n"
" a large value can increase performance for large datasets.\n"
" a large value can increase performance for large datasets. The\n"
" special values 'inf', 'infinite' or '∞' can be used to only write\n"
" record data after the stream completes."
"\n", HOME_VAR_STR, DAS_DSEPS, DEF_AUTH_FILE);


printf(
"EXAMPLES\n"
" 1. Convert a local das stream file to a CDF file:\n"
" 1. Convert a local das stream file to a CDF file.\n"
"\n"
" cat my_data.d3b | " PROG " -o my_data.cdf\n"
"\n"
" 2. Read from a remote das server and write data to the current directory,\n"
" auto-generating the CDF file name:\n"
" auto-generating the CDF file name.\n"
"\n"
" " PROG " -i https://college.edu/mission/inst?beg=2014&end=2015 -o ./\n"
"\n");
"\n"
" 3. Create a PDS archive file. Compression is disabled and records are\n"
" buffered in RAM before writing a single continuous block per variable.\n"
"\n"
" cat my_pds_data.d3b " PROG " -o my_pds_data.cdf -u -m infinite\n"
);

printf(
"AUTHOR\n"
Expand Down Expand Up @@ -280,7 +297,9 @@ static bool _getArgVal(
}

typedef struct program_optitons{
bool bRmFirst;
bool bRmFirst; /* remove output before writing */
bool bUncompressed; /* don't compress data */
size_t uMemThreshold;
char aTpltFile[256]; /* Template CDF */
char aSource[1024]; /* Input source, http://, file:// etc. */
char aOutFile[256]; /* Non-filter: output */
Expand All @@ -289,13 +308,14 @@ typedef struct program_optitons{
char aCredFile[256];
} popts_t;


#define FIELD_SZ(type, field) (sizeof(((type*)NULL)->field))

int parseArgs(int argc, char** argv, popts_t* pOpts)
{
memset(pOpts, 0, sizeof(popts_t));
pOpts->bRmFirst = false;
pOpts->bUncompressed = false;
pOpts->uMemThreshold = DEF_FLUSH_BYTES;

char sMemThresh[32] = {'\0'};

Expand Down Expand Up @@ -325,8 +345,12 @@ int parseArgs(int argc, char** argv, popts_t* pOpts)
pOpts->bRmFirst = true;
continue;
}
if(_isArg(argv[i], "-u", "--uncompressed", NULL)){
pOpts->bUncompressed = true;
continue;
}
if(_getArgVal(
sMemThresh, 32, argv, argc, &i, "-U", "--mem-use="
sMemThresh, 32, argv, argc, &i, "-m", "--memory="
))
continue;
if(_getArgVal(
Expand Down Expand Up @@ -360,11 +384,14 @@ int parseArgs(int argc, char** argv, popts_t* pOpts)

float fMemUse;
if(sMemThresh[0] != '\0'){
if((sscanf(sMemThresh, "%f", &fMemUse) != 1)||(fMemUse < 1)){
return das_error(PERR, "Invalid memory usage argument, '%s' MB", sMemThresh);
if((strncmp(sMemThresh, "inf", 3)==0)||(strcmp(sMemThresh, "∞") == 0)){
pOpts->uMemThreshold = (sizeof(size_t) == 4 ? 0xFFFFFFFF : 0xFFFFFFFFFFFFFFuLL);
}
else{
g_nMemBufThreshold = (size_t)fMemUse;
if((sscanf(sMemThresh, "%f", &fMemUse) != 1)||(fMemUse < 1))
return das_error(PERR, "Invalid memory usage argument, '%s' MB", sMemThresh);
else
pOpts->uMemThreshold = ((size_t)fMemUse) * 1048576ull ;
}
}

Expand All @@ -374,13 +401,13 @@ int parseArgs(int argc, char** argv, popts_t* pOpts)
/* ************************************************************************* */

/* Per-run state handed to the stream handlers as the user-data pointer.
 *
 * uFlushSz is declared exactly once, as size_t: it is assigned from the
 * size_t option popts_t.uMemThreshold, which may hold values wider than
 * 32 bits when the user asks for "infinite" buffering.
 */
struct context {
	bool bCompress;    /* True unless the uncompressed option was given */
	size_t uFlushSz;   /* How big to let internal memory grow before a CDF flush */
	CDFid nCdfId;      /* ID of the CDF file being written */
	char* sTpltFile;   /* An empty template CDF to put data in */
	char* sWriteTo;    /* Copied from main()'s sWriteTo buffer */
	/* DasTime dtBeg; */      /* Start point for initial query, if known */
	/* double rInterval; */   /* Size of original query, if known */
};

/* sending CDF message to the log ****************************************** */
Expand Down Expand Up @@ -606,7 +633,7 @@ void* DasProp_cdfEntValues(const DasProp* pProp, long iEntry){
return NULL;
}

DasErrCode writeGlobalProp(CDFid iCdf, const DasProp* pProp)
DasErrCode writeGlobalProp(struct context* pCtx, const DasProp* pProp)
{
CDFstatus iStatus = CDF_OK; /* Also used by _OK macro */

Expand All @@ -620,13 +647,13 @@ DasErrCode writeGlobalProp(CDFid iCdf, const DasProp* pProp)

/* Get attribute number or make a new one (why can't CDFlib use "const",
is it really so hard?) */
if((iAttr = CDFgetAttrNum(iCdf, (char*)sName)) <= 0){
if(!_OK(CDFcreateAttr(iCdf, sName, GLOBAL_SCOPE, &iAttr)))
if((iAttr = CDFgetAttrNum(pCtx->nCdfId, (char*)sName)) <= 0){
if(!_OK(CDFcreateAttr(pCtx->nCdfId, sName, GLOBAL_SCOPE, &iAttr)))
return PERR;
}

iStatus = CDFputAttrgEntry(
iCdf,
pCtx->nCdfId,
iAttr,
iEntry,
DasProp_cdfType(pProp),
Expand All @@ -640,7 +667,7 @@ DasErrCode writeGlobalProp(CDFid iCdf, const DasProp* pProp)
return DAS_OKAY;
}

DasErrCode writeVarProp(CDFid iCdf, long iVarNum, const DasProp* pProp)
DasErrCode writeVarProp(struct context* pCtx, long iVarNum, const DasProp* pProp)
{
CDFstatus iStatus; /* Used by _OK macro */

Expand All @@ -649,13 +676,13 @@ DasErrCode writeVarProp(CDFid iCdf, long iVarNum, const DasProp* pProp)

const char* sName = DasProp_cdfName(pProp);

if((iAttr = CDFattrId(iCdf, sName)) < 0){
if(! _OK(CDFcreateAttr(iCdf,sName,VARIABLE_SCOPE,&iAttr)))
if((iAttr = CDFattrId(pCtx->nCdfId, sName)) < 0){
if(! _OK(CDFcreateAttr(pCtx->nCdfId,sName,VARIABLE_SCOPE,&iAttr)))
return PERR;
}

if(!_OK(CDFputAttrzEntry(
iCdf,
pCtx->nCdfId,
iAttr,
iVarNum,
DasProp_cdfType(pProp),
Expand All @@ -668,7 +695,7 @@ DasErrCode writeVarProp(CDFid iCdf, long iVarNum, const DasProp* pProp)
}

DasErrCode writeVarStrAttr(
CDFid iCdf, long iVarNum, const char* sName, const char* sValue
struct context* pCtx, long iVarNum, const char* sName, const char* sValue
){
CDFstatus iStatus; /* Used by _OK macro */

Expand All @@ -680,13 +707,13 @@ DasErrCode writeVarStrAttr(
/* If the attribute doesn't exist, we'll need to create it first */
long iAttr;

if((iAttr = CDFattrId(iCdf, sName)) < 0){
if(! _OK(CDFcreateAttr(iCdf, sName, VARIABLE_SCOPE, &iAttr )))
if((iAttr = CDFattrId(pCtx->nCdfId, sName)) < 0){
if(! _OK(CDFcreateAttr(pCtx->nCdfId, sName, VARIABLE_SCOPE, &iAttr )))
return PERR;
}

if(! _OK(CDFputAttrzEntry(
iCdf,
pCtx->nCdfId,
iAttr,
iVarNum,
CDF_CHAR,
Expand Down Expand Up @@ -749,7 +776,7 @@ DasErrCode onStream(StreamDesc* pSd, void* pUser){
if(strcmp(DasProp_name(pProp), "CDF_NAME") == 0)
continue;

if(writeGlobalProp(pCtx->nCdfId, pProp) != DAS_OKAY)
if(writeGlobalProp(pCtx, pProp) != DAS_OKAY)
return PERR;
}

Expand Down Expand Up @@ -1157,7 +1184,7 @@ long DasVar_cdfNonRecDims(
}

DasErrCode makeCdfVar(
CDFid nCdfId, DasDim* pDim, DasVar* pVar, int nDsRank, ptrdiff_t* pDsShape,
struct context* pCtx, DasDim* pDim, DasVar* pVar, int nDsRank, ptrdiff_t* pDsShape,
char* sNameBuf
){
ptrdiff_t aMin[DASIDX_MAX] = {0};
Expand Down Expand Up @@ -1219,7 +1246,7 @@ DasErrCode makeCdfVar(
DasVar_cdfName(pDim, pVar, sNameBuf, DAS_MAX_ID_BUFSZ - 1);

CDFstatus iStatus = CDFcreatezVar(
nCdfId, /* CDF File ID */
pCtx->nCdfId, /* CDF File ID */
sNameBuf, /* Varible's name */
DasVar_cdfType(pVar), /* CDF Data type of variable */
(nIntrRank > 0) ? (long) aIntr[0] : 1L, /* Character length, if needed */
Expand All @@ -1232,6 +1259,20 @@ DasErrCode makeCdfVar(
if(!_cdfOkayish(iStatus))
return PERR;

/* If the data type is not TT2000 and the name doesn't start with 'Epoch',
go ahead and compress it if we're able */
if( (pCtx->bCompress) && (DasVar_cdfType(pVar) != CDF_TIME_TT2000)
&& ( strncasecmp(sNameBuf, "epoch", 5) != 0)
){
long cType = GZIP_COMPRESSION;
long cParams[CDF_MAX_PARMS];
cParams[0] = 6L;
if(!_OK(CDFsetzVarCompression(
pCtx->nCdfId, DasVar_cdfId(pVar), cType, cParams
)))
return PERR;
}

/* If this is a record-varying variable and it has an array, mark that
array as one we'll clear after each batch of data is written */
if(nRecVary == VARY){
Expand Down Expand Up @@ -1296,7 +1337,7 @@ DasErrCode makeCdfVar(
}

iStatus = CDFhyperPutzVarData(
nCdfId, /* CDF File ID */
pCtx->nCdfId, /* CDF File ID */
DasVar_cdfId(pVar), /* Shamelessly use point as an long int storage */
0, /* record start */
1, /* number for records to write */
Expand All @@ -1316,7 +1357,7 @@ DasErrCode makeCdfVar(
}

DasErrCode writeVarProps(
CDFid nCdfId, DasDim* pDim, DasVar* pVar, VarInfo* pCoords, size_t uCoords
struct context* pCtx, DasDim* pDim, DasVar* pVar, VarInfo* pCoords, size_t uCoords
){
char sAttrName[64] = {'\0'};

Expand Down Expand Up @@ -1355,7 +1396,7 @@ DasErrCode writeVarProps(
if((pCoords + u)->iDep == iIdx){
snprintf(sAttrName, 15, "DEPEND_%d", iIdx);
writeVarStrAttr(
nCdfId,
pCtx,
DasVar_cdfId(pVar),
sAttrName,
(pCoords+u)->sCdfName /* CDF name of the coordinate */
Expand Down Expand Up @@ -1396,12 +1437,12 @@ DasErrCode writeVarProps(
}
}

writeVarStrAttr(nCdfId, DasVar_cdfId(pVar), "UNITS", sUnits);
writeVarStrAttr(pCtx, DasVar_cdfId(pVar), "UNITS", sUnits);

if(pDim->dtype == DASDIM_COORD)
writeVarStrAttr(nCdfId, DasVar_cdfId(pVar), "VAR_TYPE", "support_data");
writeVarStrAttr(pCtx, DasVar_cdfId(pVar), "VAR_TYPE", "support_data");
else
writeVarStrAttr(nCdfId, DasVar_cdfId(pVar), "VAR_TYPE", "data");
writeVarStrAttr(pCtx, DasVar_cdfId(pVar), "VAR_TYPE", "data");

return DAS_OKAY;
}
Expand Down Expand Up @@ -1453,7 +1494,7 @@ DasErrCode onDataSet(StreamDesc* pSd, DasDs* pDs, void* pUser)
if(strcmp(DasProp_name(pProp), "CDF_NAME") == 0)
continue;

if(writeGlobalProp(pCtx->nCdfId, pProp) != DAS_OKAY)
if(writeGlobalProp(pCtx, pProp) != DAS_OKAY)
return PERR;
}

Expand All @@ -1469,11 +1510,11 @@ DasErrCode onDataSet(StreamDesc* pSd, DasDs* pDs, void* pUser)
VarInfo* pVi = (pCdfCoords + u);

if((nRet = makeCdfVar(
pCtx->nCdfId, pVi->pDim, pVi->pVar, nDsRank, aDsShape, pVi->sCdfName
pCtx, pVi->pDim, pVi->pVar, nDsRank, aDsShape, pVi->sCdfName
)) != DAS_OKAY)
return nRet;

nRet = writeVarProps(pCtx->nCdfId, pVi->pDim, pVi->pVar, pCdfCoords, uCoords);
nRet = writeVarProps(pCtx, pVi->pDim, pVi->pVar, pCdfCoords, uCoords);
if(nRet != DAS_OKAY)
return nRet;
}
Expand All @@ -1489,11 +1530,11 @@ DasErrCode onDataSet(StreamDesc* pSd, DasDs* pDs, void* pUser)
for(size_t v = 0; v < uVars; ++v){
DasVar* pVar = (DasVar*) DasDim_getVarByIdx(pDim, v);

nRet = makeCdfVar(pCtx->nCdfId, pDim, pVar, nDsRank, aDsShape, sNameBuf);
nRet = makeCdfVar(pCtx, pDim, pVar, nDsRank, aDsShape, sNameBuf);
if(nRet != DAS_OKAY)
return nRet;

nRet = writeVarProps(pCtx->nCdfId, pDim, pVar, pCdfCoords, uCoords);
nRet = writeVarProps(pCtx, pDim, pVar, pCdfCoords, uCoords);
if(nRet != DAS_OKAY)
return nRet;
}
Expand Down Expand Up @@ -1684,7 +1725,7 @@ DasErrCode onData(StreamDesc* pSd, DasDs* pDs, void* pUser)
daslog_debug_v("Dataset memory indexed: %zu bytes", DasDs_memIndexed(pDs));
}

if(DasDs_memUsed(pDs) > g_nMemBufThreshold)
if(DasDs_memUsed(pDs) > pCtx->uFlushSz)
return writeAndClearData(pDs, pCtx);

return DAS_OKAY;
Expand Down Expand Up @@ -1808,6 +1849,8 @@ int main(int argc, char** argv)
memset(&ctx, 0, sizeof(struct context));
ctx.sWriteTo = sWriteTo;
ctx.sTpltFile = opts.aTpltFile;
ctx.bCompress = !opts.bUncompressed;
ctx.uFlushSz = opts.uMemThreshold;

/* Figure out where we're gonna write before potentially contacting servers */
bool bReStream = false;
Expand Down

0 comments on commit 72e2d80

Please sign in to comment.