Skip to content

Commit

Permalink
Merge branch 'main' into future
Browse files — browse the repository at this point in the history
  • Loading branch information
angelhof authored Nov 19, 2024
2 parents 570ffdc + e09e8d6 commit 00a5017
Showing 1 changed file with 15 additions and 11 deletions.
26 changes: 15 additions & 11 deletions runtime/r_split.c
Original file line number | Diff line number | Diff line change
Expand Up @@ -39,13 +39,13 @@ int find_new_line_pivot(char *buffer, int start_pos, int end_pos, bool backward)
if (backward) {
for (int i = end_pos; i >= start_pos; i--) {
if (buffer[i] == '\n')
return i;
return i;
}
}
else {
for (int i = start_pos; i <= end_pos; i++) {
if (buffer[i] == '\n')
return i;
return i;
}
}
return -1;
Expand Down Expand Up @@ -73,7 +73,7 @@ void SplitByLines(FILE *inputFile, int batchSize, FILE *outputFiles[], unsigned
if (len < full_payload) {
init_batch_size = len/numOutputFiles;
}

for (current_file_id = 0; current_file_id < numOutputFiles; current_file_id++) {
outputFile = outputFiles[current_file_id];
int next_start = 0;
Expand All @@ -89,7 +89,7 @@ void SplitByLines(FILE *inputFile, int batchSize, FILE *outputFiles[], unsigned
next_start = pivot + 1;
blockSize = next_start - start_pos;
}

} else {
// Process output for last node
blockSize = len - start_pos;
Expand All @@ -98,22 +98,22 @@ void SplitByLines(FILE *inputFile, int batchSize, FILE *outputFiles[], unsigned
} else {
is_last = false;
}

}

if (add_header)
writeHeader(outputFile, id, blockSize, is_last);

safeWriteWithFlush(init_buffer + start_pos, 1, blockSize, outputFile);

start_pos = next_start;

if (is_last) {
id += 1;
}
}

// This will wrap around if the last chunk was complete
// This will wrap around if the last chunk was complete
// otherwise keep pointer on the last file
if (is_last) {
current_file_id = 0;
Expand Down Expand Up @@ -146,7 +146,7 @@ void SplitByLines(FILE *inputFile, int batchSize, FILE *outputFiles[], unsigned
headSize = len;
blockSize = prevRestSize + headSize;
if (add_header) {
if (feof(inputFile))
if (feof(inputFile))
writeHeader(outputFile, id, blockSize, 1);
else
writeHeader(outputFile, id, blockSize, 0);
Expand Down Expand Up @@ -283,8 +283,12 @@ int main(int argc, char *argv[])
// flags: -b to use bytes (batch_size will be exact number of bytes instead of approximating to the closest line)
if (argc < 4)
{
// TODO print usage string
fprintf(stderr, "missing input!\n");
// TODO: document -r flag
fprintf(stderr,
"\n"
"Usage: %s [-b] [-r] input_file batch_size output_file_1 output_file_2 [output_file_3 ...]\n\n"
" -b: use bytes (batch_size will be exact number of bytes instead of approximating to the closest line)\n\n",
argv[0]);
exit(1);
}
bool useBytes = 0, offset = 0, raw = 0;
Expand Down

0 comments on commit 00a5017

Please sign in to comment.