Skip to content

Commit

Permalink
Cleanup nfanon
Browse files Browse the repository at this point in the history
  • Loading branch information
phaag committed Mar 31, 2024
1 parent 7169029 commit 9c9b71a
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 15 deletions.
4 changes: 2 additions & 2 deletions src/libnffile/nfxV3.h
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,8 @@ typedef struct EXinmonMeta_s {
#define EXinmonMetaID 34
uint16_t frameSize;
uint16_t linkType;
// uint16_t etherType;
// uint16_t fill;
} EXinmonMeta_t;
#define OFFframeSize offsetof(EXinmonMeta_t, frameSize)
#define SIZEframeSize MemberSize(EXinmonMeta_t, frameSize)
Expand Down Expand Up @@ -660,8 +662,6 @@ typedef struct EXdot1q_s {

typedef struct EXphysicalInterface_s {
#define EXphysicalInterfaceID 39
uint16_t etherType;
uint16_t fill;
uint32_t ingress;
uint32_t egress;
#define OFFphysIngress offsetof(EXphysicalInterface_t, ingress)
Expand Down
27 changes: 14 additions & 13 deletions src/nfanon/nfanon.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
typedef struct worker_param_s {
int self;
int numWorkers;
dataBlock_t *dataBlock;
dataBlock_t **dataBlock;

// sync barrier
pthread_barrier_t *barrier;
Expand Down Expand Up @@ -259,6 +259,13 @@ static void process_data(char *wfile, int verbose, worker_param_t **workerList,
pthread_controller_wait(barrier);

dataBlock_t *dataBlock = NULL;
// map datablock for workers - all workers
// process thesame block but different records
for (int i = 0; i < numWorkers; i++) {
// set new datablock for all workers
workerList[i]->dataBlock = &dataBlock;
}

int blk_count = 0;
int done = 0;
while (!done) {
Expand Down Expand Up @@ -322,10 +329,6 @@ static void process_data(char *wfile, int verbose, worker_param_t **workerList,
}

dbg_printf("Next block: %d, Records: %u\n", blk_count, dataBlock->NumRecords);
for (int i = 0; i < numWorkers; i++) {
// set new datablock for all workers
workerList[i]->dataBlock = dataBlock;
}
// release workers from barrier
pthread_barrier_release(barrier);

Expand All @@ -338,10 +341,7 @@ static void process_data(char *wfile, int verbose, worker_param_t **workerList,
} // while

// done! - signal all workers to terminate
for (int i = 0; i < numWorkers; i++) {
// set NULL datablock to trigger workers to exit.
workerList[i]->dataBlock = NULL;
}
dataBlock = NULL;
pthread_barrier_release(barrier);

FreeDataBlock(dataBlock);
Expand All @@ -361,8 +361,8 @@ __attribute__((noreturn)) static void *worker(void *arg) {
// wait in barrier after launch
pthread_barrier_wait(worker_param->barrier);

while (worker_param->dataBlock) {
dataBlock_t *dataBlock = worker_param->dataBlock;
while (*(worker_param->dataBlock)) {
dataBlock_t *dataBlock = *(worker_param->dataBlock);
dbg_printf("Worker %i working on %p\n", self, dataBlock);

uint32_t recordCount = 0;
Expand All @@ -372,7 +372,7 @@ __attribute__((noreturn)) static void *worker(void *arg) {
for (int i = 0; i < dataBlock->NumRecords; i++) {
if ((sumSize + record_ptr->size) > dataBlock->size || (record_ptr->size < sizeof(record_header_t))) {
LogError("Corrupt data file. Inconsistent block size in %s line %d\n", __FILE__, __LINE__);
exit(255);
goto SKIP;
}
sumSize += record_ptr->size;

Expand Down Expand Up @@ -405,6 +405,7 @@ __attribute__((noreturn)) static void *worker(void *arg) {

dbg_printf("Worker %i: datablock completed. Records processed: %u\n", self, recordCount);

SKIP:
// Done
// wait in barrier for next data record
pthread_barrier_wait(worker_param->barrier);
Expand Down Expand Up @@ -464,7 +465,7 @@ int main(int argc, char **argv) {
int numWorkers = MAXANONWORKERS;
int verbose = 1;
int c;
while ((c = getopt(argc, argv, "hK:L:qr:w:")) != EOF) {
while ((c = getopt(argc, argv, "hK:L:qr:t:w:")) != EOF) {
switch (c) {
case 'h':
usage(argv[0]);
Expand Down

0 comments on commit 9c9b71a

Please sign in to comment.