Skip to content

Commit

Permalink
HPCC4J-644 FileUtilityTest atomic record count false sharing (#756)
Browse files Browse the repository at this point in the history
Signed-off-by: James McMullan [email protected]
  • Loading branch information
jpmcmu authored Sep 17, 2024
1 parent 874dba1 commit f5ac34a
Showing 1 changed file with 19 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -843,11 +843,13 @@ public void run()
readContext.originalRD = recordDef;
HpccRemoteFileReader<HPCCRecord> fileReader = new HpccRemoteFileReader<HPCCRecord>(readContext, filePart, new HPCCRecordBuilder(recordDef));

long recCount = 0;
while (fileReader.hasNext())
{
HPCCRecord record = fileReader.next();
context.getCurrentOperation().recordsRead.incrementAndGet();
recCount++;
}
context.getCurrentOperation().recordsRead.addAndGet(recCount);

fileReader.close();
context.getCurrentOperation().bytesRead.addAndGet(fileReader.getStreamPosition());
Expand Down Expand Up @@ -893,13 +895,15 @@ public void run()
{
try
{
long recCount = 0;
while (fileReader.hasNext())
{
splitTable.addRecordPosition(fileReader.getStreamPosition());
HPCCRecord record = fileReader.next();
fileWriter.writeRecord(record);
context.getCurrentOperation().recordsRead.incrementAndGet();
recCount++;
}
context.getCurrentOperation().recordsRead.addAndGet(recCount);

splitTable.finish(fileReader.getStreamPosition());

Expand Down Expand Up @@ -1019,14 +1023,18 @@ public void run()
{
for (int k = 0; k < fileReaders.length; k++)
{
long recordsRead = 0;
long recordsWritten = 0;
HpccRemoteFileReader<HPCCRecord> fileReader = fileReaders[k];
while (fileReader.hasNext())
{
HPCCRecord record = fileReader.next();
fileWriter.writeRecord(record);
context.getCurrentOperation().recordsWritten.incrementAndGet();
context.getCurrentOperation().recordsRead.incrementAndGet();
recordsRead++;
recordsWritten++;
}
context.getCurrentOperation().recordsWritten.addAndGet(recordsWritten);
context.getCurrentOperation().recordsRead.addAndGet(recordsRead);

fileReader.close();
context.getCurrentOperation().bytesRead.addAndGet(fileReader.getStreamPosition());
Expand Down Expand Up @@ -1178,14 +1186,19 @@ public void run()
splitEnd = endingSplit.splitEnd;
}

long recordsRead = 0;
long recordsWritten = 0;
while (fileReader.hasNext() && fileReader.getStreamPosAfterLastRecord() < splitEnd)
{
HPCCRecord record = (HPCCRecord) fileReader.getNext();
fileWriter.writeRecord(record);
context.getCurrentOperation().recordsWritten.incrementAndGet();
context.getCurrentOperation().recordsRead.incrementAndGet();
recordsRead++;
recordsWritten++;
}

context.getCurrentOperation().recordsWritten.addAndGet(recordsWritten);
context.getCurrentOperation().recordsRead.addAndGet(recordsRead);

context.getCurrentOperation().bytesRead.addAndGet(fileReader.getStreamPosAfterLastRecord());
inputStreams[j].close();
}
Expand Down

0 comments on commit f5ac34a

Please sign in to comment.