diff --git a/src/libnffile/nfxV3.h b/src/libnffile/nfxV3.h index ede76a56..9ceee663 100755 --- a/src/libnffile/nfxV3.h +++ b/src/libnffile/nfxV3.h @@ -591,6 +591,8 @@ typedef struct EXinmonMeta_s { #define EXinmonMetaID 34 uint16_t frameSize; uint16_t linkType; + // uint16_t etherType; + // uint16_t fill; } EXinmonMeta_t; #define OFFframeSize offsetof(EXinmonMeta_t, frameSize) #define SIZEframeSize MemberSize(EXinmonMeta_t, frameSize) @@ -660,8 +662,6 @@ typedef struct EXdot1q_s { typedef struct EXphysicalInterface_s { #define EXphysicalInterfaceID 39 - uint16_t etherType; - uint16_t fill; uint32_t ingress; uint32_t egress; #define OFFphysIngress offsetof(EXphysicalInterface_t, ingress) diff --git a/src/nfanon/nfanon.c b/src/nfanon/nfanon.c index c17fb3b1..60948bda 100755 --- a/src/nfanon/nfanon.c +++ b/src/nfanon/nfanon.c @@ -62,7 +62,7 @@ typedef struct worker_param_s { int self; int numWorkers; - dataBlock_t *dataBlock; + dataBlock_t **dataBlock; // sync barrier pthread_barrier_t *barrier; @@ -259,6 +259,13 @@ static void process_data(char *wfile, int verbose, worker_param_t **workerList, pthread_controller_wait(barrier); dataBlock_t *dataBlock = NULL; + // map datablock for workers - all workers + // process thesame block but different records + for (int i = 0; i < numWorkers; i++) { + // set new datablock for all workers + workerList[i]->dataBlock = &dataBlock; + } + int blk_count = 0; int done = 0; while (!done) { @@ -322,10 +329,6 @@ static void process_data(char *wfile, int verbose, worker_param_t **workerList, } dbg_printf("Next block: %d, Records: %u\n", blk_count, dataBlock->NumRecords); - for (int i = 0; i < numWorkers; i++) { - // set new datablock for all workers - workerList[i]->dataBlock = dataBlock; - } // release workers from barrier pthread_barrier_release(barrier); @@ -338,10 +341,7 @@ static void process_data(char *wfile, int verbose, worker_param_t **workerList, } // while // done! - signal all workers to terminate - for (int i = 0; i < numWorkers; i++) { - // set NULL datablock to trigger workers to exit. - workerList[i]->dataBlock = NULL; - } + dataBlock = NULL; pthread_barrier_release(barrier); FreeDataBlock(dataBlock); @@ -361,8 +361,8 @@ __attribute__((noreturn)) static void *worker(void *arg) { // wait in barrier after launch pthread_barrier_wait(worker_param->barrier); - while (worker_param->dataBlock) { - dataBlock_t *dataBlock = worker_param->dataBlock; + while (*(worker_param->dataBlock)) { + dataBlock_t *dataBlock = *(worker_param->dataBlock); dbg_printf("Worker %i working on %p\n", self, dataBlock); uint32_t recordCount = 0; @@ -372,7 +372,7 @@ __attribute__((noreturn)) static void *worker(void *arg) { for (int i = 0; i < dataBlock->NumRecords; i++) { if ((sumSize + record_ptr->size) > dataBlock->size || (record_ptr->size < sizeof(record_header_t))) { LogError("Corrupt data file. Inconsistent block size in %s line %d\n", __FILE__, __LINE__); - exit(255); + goto SKIP; } sumSize += record_ptr->size; @@ -405,6 +405,7 @@ __attribute__((noreturn)) static void *worker(void *arg) { dbg_printf("Worker %i: datablock completed. Records processed: %u\n", self, recordCount); + SKIP: // Done // wait in barrier for next data record pthread_barrier_wait(worker_param->barrier); @@ -464,7 +465,7 @@ int main(int argc, char **argv) { int numWorkers = MAXANONWORKERS; int verbose = 1; int c; - while ((c = getopt(argc, argv, "hK:L:qr:w:")) != EOF) { + while ((c = getopt(argc, argv, "hK:L:qr:t:w:")) != EOF) { switch (c) { case 'h': usage(argv[0]);