Skip to content
This repository has been archived by the owner on Jan 15, 2022. It is now read-only.

ability to specify minModificationTimeMillis as arg #107

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,12 @@ private static CommandLine parseArgs(String[] args) throws ParseException {
o.setRequired(false);
options.addOption(o);

// Accept a minModificationTimeMillis. Don't process files before this time.
o = new Option("m", "minModificationTimeMillis", true,
"The minimum modification time of the file to be processed");
o.setArgName("minModificationTimeMillis");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is millis the most user-friendly, least error prone way to get this argument ?
Wouldn;t it be better to get something in a more huma-readable form ?
Perhaps yyyymmddhhMMss or something like that ?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually think having a timestamp (long number) in epoch milliseconds is cleaner than a yyyymmdd etc format.

o.setRequired(false);
options.addOption(o);
// Debugging
options.addOption("d", "debug", false, "switch on DEBUG log level");

Expand Down Expand Up @@ -285,6 +291,22 @@ public int run(String[] args) throws Exception {
throw new ProcessingException("Caught NumberFormatException during conversion "
+ " of maxFileSize to long", nfe);
}

// Grab the minModificationTimeMillis argument
long minModificationTimeMillis = 0;

if (commandLine.getOptionValue("m") != null) {
try {
minModificationTimeMillis = Long.parseLong(commandLine.getOptionValue("m"));
LOG.info("Using specified start time for filtering history files: " + minModificationTimeMillis);
} catch (NumberFormatException nfe) {
throw new IllegalArgumentException(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need any more tight checks on this?
Do we allow extremely small dates (like earlier than the Hadoop project existed) or some date in the future ?
Those would probably indicate a mis-understanding of the argument and likely be an error.
How about negative numbers, those would clearly not be a valid date, but would pass for < or = comparisons silently in perhaps unexpected ways.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I am not wrong, I think he is choosing maximum of the two so negative or very small numbers should have no effect.

"minModificationTimeMillis has to be an epoch time (long). Can't be: "
+ commandLine.getOptionValue("m"), nfe);
}
}

LOG.info("minModificationTimeMillis: " + minModificationTimeMillis);

ProcessRecordService processRecordService = new ProcessRecordService(
hbaseConf);
Expand All @@ -298,20 +320,30 @@ public int run(String[] args) throws Exception {
if (!forceAllFiles) {
lastProcessRecord = processRecordService
.getLastSuccessfulProcessRecord(cluster);
} else {
//discard minModificationTimeMillis arguemnt given if all files
//are to be forced.
minModificationTimeMillis = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should do a check at the end of argument passing and confirm that the options are internally consistent.
If not, then we should throw an illegal argument exception, not simply swallow/ignore one argument

}

long minModificationTimeMillis = 0;
// Start of this time period is the end of the last period.
if (lastProcessRecord != null) {
// Start of this time period is the end of the last period.
minModificationTimeMillis = lastProcessRecord
.getMaxModificationTimeMillis();
LOG.info("lastProcessRecord time: " + lastProcessRecord.getMaxModificationTimeMillis());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A minor improvement suggestion -maybe have a variable that stores the lastProcessRecord.getMaxModificationTimeMillis() since it is being called thrice in this code block?

// Choose the maximum of the two.
if (minModificationTimeMillis < lastProcessRecord
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is your intent to have the ability to create a gap in what hRaven picks up ?
We would essentially skip files that came in between the high-water mark from previous run and the new argument.
The fact that there is a gap will not be recorded anywhere and those files will just "silently" never be loaded.
Once the age off the history directory structure by the RM, these will be gone.
Is that your intention ?
Could you please explain your use-cases here ?

.getMaxModificationTimeMillis()) {
minModificationTimeMillis = lastProcessRecord
.getMaxModificationTimeMillis();
LOG.info("lastProcessRecord is greater than minModificationTimeMillis. Using that as minimum time: "
+ minModificationTimeMillis);
}
}

// Do a sanity check. The end time of the last scan better not be later
// than when we started processing.
if (minModificationTimeMillis > processingStartMillis) {
throw new RuntimeException(
"The last processing record has maxModificationMillis later than now: "
"Job start time is lesser than the minimum modification time of files to read. Failing."
+ lastProcessRecord);
}

Expand Down