Skip to content

Commit

Permalink
Merge branch 'onupdate-remodelled' into onupdate-resultlogging
Browse files Browse the repository at this point in the history
  • Loading branch information
solsson committed Feb 26, 2019
2 parents 90cbfc1 + ccdb280 commit d330aa5
Show file tree
Hide file tree
Showing 24 changed files with 699 additions and 436 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -201,13 +201,17 @@ private static class OnUpdateCompletionLogging implements OnUpdate.Completion {
OnUpdateCompletionLogging(UpdateRecord record, OnUpdateCompletionLogging previous) {
this.record = record;
if (previous == null) {
logger.info("This is the first on-update for topic {} partition {}", record.getTopic(), record.getPartition());
logger.info("This is the first on-update for topic {} partition {} at offset {}", record.getTopic(), record.getPartition(), record.getOffset());
} else {
logger.debug("Got onupdate completeion for topic {} partition {} offset {} previous offset {}", record.getTopic(), record.getPartition(), record.getOffset(), previous.record.getOffset());
// sanity checks here and the whole previous tracking can probably be removed once we have decent e2e coverage
this.previous = previous;
UpdateRecord p = previous.record;
if (!record.getTopic().equals(p.getTopic())) throw new IllegalArgumentException("Mismatch with previous, topics: " + record.getTopic() + " != " + p.getTopic());
if (record.getPartition() != p.getPartition()) throw new IllegalArgumentException("Mismatch with previous, topic " + record.getTopic() + " partitions: " + record.getPartition() + "!=" + p.getPartition());
if (record.getOffset() != p.getOffset() + 1) throw new IllegalArgumentException("Offset gap from previous, topic " + record.getTopic() + " partition " + record.getPartition() + ": from " + p.getOffset() + " to " + record.getOffset());
if (record.getOffset() == p.getOffset()) throw new IllegalArgumentException("Duplicate completion logging for topic " + record.getTopic() + " partition " + record.getPartition() + " offset " + p.getOffset());
// null keys will be ignored so there might be gaps, but we should be able to create these logging instances in offset order
if (record.getOffset() < p.getOffset()) throw new IllegalArgumentException("Completion tracking created in reverse offset order, topic " + record.getTopic() + " partition " + record.getPartition() + ": from " + p.getOffset() + " to " + record.getOffset());
}
onUpdatePending.inc();
}
Expand Down
84 changes: 55 additions & 29 deletions src/main/java/se/yolean/kafka/keyvalue/cli/ArgsToOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,27 @@
import net.sourceforge.argparse4j.inf.Namespace;
import se.yolean.kafka.keyvalue.CacheServiceOptions;
import se.yolean.kafka.keyvalue.OnUpdate;
import se.yolean.kafka.keyvalue.onupdate.OnUpdateFactory;
import se.yolean.kafka.keyvalue.onupdate.OnUpdateWithExternalPollTrigger;

public class ArgsToOptions implements CacheServiceOptions {

private OnUpdateFactory onUpdateFactory = null;
/**
* {@value #DEFAULT_ONUPDATE_TIMEOUT}
*/
public static final int DEFAULT_ONUPDATE_TIMEOUT = 5000;

/**
* {@value #DEFAULT_ONUPDATE_RETRIES}
*/
public static final int DEFAULT_ONUPDATE_RETRIES = 0;

private String topicName = null;
private Integer port = null;
private String applicationId;
private Properties streamsProperties = null;
private OnUpdate onUpdate = null;
private Integer startTimeoutSeconds = null;

public ArgsToOptions setOnUpdateFactory(OnUpdateFactory factory) {
this.onUpdateFactory = factory;
return this;
}
private OnUpdateWithExternalPollTrigger onupdate;

private ArgumentParser getParser() {
ArgumentParser parser = ArgumentParsers
Expand Down Expand Up @@ -99,6 +103,25 @@ private ArgumentParser getParser() {
.metavar("ONUPDATE")
.help("A URL to POST the key to upon updates (may be debounced)");

parser.addArgument("--onupdate-timeout")
.action(store())
.required(false)
.type(Integer.class)
.metavar("ONUPDATE_TIMEOUT")
.setDefault(DEFAULT_ONUPDATE_TIMEOUT)
.dest("onupdateTimeout")
.help("Milliseconds timeout for onupdate requests");

parser.addArgument("--onupdate-retries")
.action(store())
.required(false)
.type(Integer.class)
.metavar("ONUPDATE_RETRIES")
.setDefault(DEFAULT_ONUPDATE_RETRIES)
.dest("onupdateRetries")
.help("The number of retries per onupdate target per update before failure is signalled to the processor."
+ " Set to 0 for 1 try. TODO Default is 2, i.e. 3 tries.");

parser.addArgument("--starttimeout")
.action(store())
.required(false)
Expand All @@ -112,13 +135,16 @@ private ArgumentParser getParser() {
return parser;
}

public CacheServiceOptions fromCommandLineArguments(String[] args) {
public ArgsToOptions(String[] args) {

@SuppressWarnings("unused") // kept for forward compatibility
String hostName = null;
List<String> onupdate = null;
Properties props = new Properties();

List<String> onupdateUrls = null;
Integer onupdateTimeout = null;
Integer onupdateRetries = null;

ArgumentParser parser = getParser();

try {
Expand All @@ -130,7 +156,10 @@ public CacheServiceOptions fromCommandLineArguments(String[] args) {
applicationId = res.getString("applicationId");
List<String> streamsProps = res.getList("streamsConfig");
String streamsConfig = res.getString("streamsConfigFile");
onupdate = res.getList("onupdate");
onupdateUrls = res.getList("onupdate");
onupdateTimeout = res.getInt("onupdateTimeout");
onupdateRetries = res.getInt("onupdateRetries");

startTimeoutSeconds = res.getInt("starttimeout");

if (streamsProps == null && streamsConfig == null) {
Expand Down Expand Up @@ -160,28 +189,21 @@ public CacheServiceOptions fromCommandLineArguments(String[] args) {
if (args.length == 0) {
parser.printHelp();
System.exit(0);
} else {
parser.handleError(e);
System.exit(1);
}
parser.handleError(e);
System.exit(1);
} catch (IOException e) {
throw new RuntimeException("Options failed", e);
}

this.streamsProperties = props;

if (onupdate != null && !onupdate.isEmpty()) {
if (this.onUpdateFactory == null) {
throw new IllegalStateException("setOnUpdateFactory must be called first");
}
if (onupdate.size() == 1) {
this.onUpdate = this.onUpdateFactory.fromUrl(onupdate.get(0));
} else {
this.onUpdate = this.onUpdateFactory.fromManyUrls(onupdate);
}
}
this.onupdate = newOnUpdate(onupdateUrls, onupdateTimeout, onupdateRetries);
}

return this;
protected OnUpdateWithExternalPollTrigger newOnUpdate(List<String> onupdateUrls, Integer onupdateTimeout,
Integer onupdateRetries) {
return new OnUpdateWithExternalPollTrigger(onupdateUrls, onupdateTimeout, onupdateRetries);
}

@Override
Expand All @@ -199,11 +221,6 @@ public Properties getStreamsProperties() {
return streamsProperties;
}

@Override
public OnUpdate getOnUpdate() {
return onUpdate;
}

@Override
public String getApplicationId() {
return applicationId;
Expand All @@ -214,4 +231,13 @@ public Integer getStartTimeoutSecods() {
return startTimeoutSeconds;
}

@Override
public OnUpdate getOnUpdate() {
return this.onupdate;
}

OnUpdateWithExternalPollTrigger getOnUpdateImpl() {
return this.onupdate;
}

}
35 changes: 28 additions & 7 deletions src/main/java/se/yolean/kafka/keyvalue/cli/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,27 @@
import se.yolean.kafka.keyvalue.App;
import se.yolean.kafka.keyvalue.CacheServiceOptions;
import se.yolean.kafka.keyvalue.Readiness;
import se.yolean.kafka.keyvalue.onupdate.OnUpdateFactory;
import se.yolean.kafka.keyvalue.onupdate.OnUpdateWithExternalPollTrigger;

public class Main {

private static final Logger logger = LogManager.getLogger(Main.class);

private static final int POLL_INTERVAL = 1000;

private static long prevpollstart = 0;

public static void main(String[] args) {
OnUpdateFactory onUpdateFactory = OnUpdateFactory.getInstance();
ArgsToOptions options = new ArgsToOptions(args);

CacheServiceOptions options = new ArgsToOptions()
.setOnUpdateFactory(onUpdateFactory)
.fromCommandLineArguments(args);
while (!appstart(options)) {
logger.info("Retrying streams app start");
}

while (!appstart(options)) logger.info("Retrying streams app start");
OnUpdateWithExternalPollTrigger onupdate = options.getOnUpdateImpl();
while (true) {
pollOnupdate(onupdate);
}
}

/**
Expand Down Expand Up @@ -50,11 +56,26 @@ private static boolean appstart(CacheServiceOptions options) {

private static boolean poll(Readiness readiness) {
try {
Thread.sleep(1000);
Thread.sleep(POLL_INTERVAL);
} catch (InterruptedException e) {
logger.error("Interrupted when polling for app startup status");
}
return readiness.isAppReady();
}

/**
* For now there's only one thing we need to do after readiness: poll for request completion.
*/
private static void pollOnupdate(OnUpdateWithExternalPollTrigger onupdate) {
try {
// try to poll regularly, 1 second
Thread.sleep(Math.max(POLL_INTERVAL,
Math.min(1, POLL_INTERVAL - System.currentTimeMillis() + prevpollstart)));
} catch (InterruptedException e) {
logger.error("Interrupted when polling for onupdate progress");
}
prevpollstart = System.currentTimeMillis();
onupdate.checkCompletion();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package se.yolean.kafka.keyvalue.onupdate;

import java.util.concurrent.Future;

import javax.ws.rs.core.Response;

import se.yolean.kafka.keyvalue.UpdateRecord;

public interface HttpTargetRequestInvoker {

Future<Response> postUpdate(UpdateRecord update);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package se.yolean.kafka.keyvalue.onupdate;

import java.util.concurrent.Future;

import javax.ws.rs.client.AsyncInvoker;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.client.ClientProperties;

import se.yolean.kafka.keyvalue.UpdateRecord;

/**
* To configure timeouts we need to know which JAX-RS impl we're using:
*
* https://stackoverflow.com/questions/22672664/setting-request-timeout-for-jax-rs-2-0-client-api
*/
public class HttpTargetRequestInvokerJersey implements HttpTargetRequestInvoker {

private AsyncInvoker async;

public HttpTargetRequestInvokerJersey(
String onupdateTargetUrl,
int connectTimeoutMilliseconds,
int readTimeoutMilliseconds) {
ClientConfig configuration = new ClientConfig();
configuration.property(ClientProperties.CONNECT_TIMEOUT, connectTimeoutMilliseconds);
configuration.property(ClientProperties.READ_TIMEOUT, readTimeoutMilliseconds);
Client client = ClientBuilder.newClient(configuration);

WebTarget target = client.target(onupdateTargetUrl);
this.async = target.request().async();
}

@Override
public Future<Response> postUpdate(UpdateRecord update) {
return async.post(Entity.entity(update, MediaType.APPLICATION_JSON_TYPE));
}

}

This file was deleted.

This file was deleted.

Loading

0 comments on commit d330aa5

Please sign in to comment.