diff --git a/src/main/java/se/yolean/kafka/keyvalue/KeyvalueUpdateProcessor.java b/src/main/java/se/yolean/kafka/keyvalue/KeyvalueUpdateProcessor.java index 9049877..3974c8d 100644 --- a/src/main/java/se/yolean/kafka/keyvalue/KeyvalueUpdateProcessor.java +++ b/src/main/java/se/yolean/kafka/keyvalue/KeyvalueUpdateProcessor.java @@ -201,13 +201,17 @@ private static class OnUpdateCompletionLogging implements OnUpdate.Completion { OnUpdateCompletionLogging(UpdateRecord record, OnUpdateCompletionLogging previous) { this.record = record; if (previous == null) { - logger.info("This is the first on-update for topic {} partition {}", record.getTopic(), record.getPartition()); + logger.info("This is the first on-update for topic {} partition {} at offset {}", record.getTopic(), record.getPartition(), record.getOffset()); } else { + logger.debug("Got onupdate completeion for topic {} partition {} offset {} previous offset {}", record.getTopic(), record.getPartition(), record.getOffset(), previous.record.getOffset()); + // sanity checks here and the whole previous tracking can probably be removed once we have decent e2e coverage this.previous = previous; UpdateRecord p = previous.record; if (!record.getTopic().equals(p.getTopic())) throw new IllegalArgumentException("Mismatch with previous, topics: " + record.getTopic() + " != " + p.getTopic()); if (record.getPartition() != p.getPartition()) throw new IllegalArgumentException("Mismatch with previous, topic " + record.getTopic() + " partitions: " + record.getPartition() + "!=" + p.getPartition()); - if (record.getOffset() != p.getOffset() + 1) throw new IllegalArgumentException("Offset gap from previous, topic " + record.getTopic() + " partition " + record.getPartition() + ": from " + p.getOffset() + " to " + record.getOffset()); + if (record.getOffset() == p.getOffset()) throw new IllegalArgumentException("Duplicate completion logging for topic " + record.getTopic() + " partition " + record.getPartition() + " offset " + p.getOffset()); + // null keys will be ignored so there might be gaps, but we should be able to create these logging instances in offset order + if (record.getOffset() < p.getOffset()) throw new IllegalArgumentException("Completion tracking created in reverse offset order, topic " + record.getTopic() + " partition " + record.getPartition() + ": from " + p.getOffset() + " to " + record.getOffset()); } onUpdatePending.inc(); } diff --git a/src/main/java/se/yolean/kafka/keyvalue/cli/ArgsToOptions.java b/src/main/java/se/yolean/kafka/keyvalue/cli/ArgsToOptions.java index 6a9d967..a0fe148 100644 --- a/src/main/java/se/yolean/kafka/keyvalue/cli/ArgsToOptions.java +++ b/src/main/java/se/yolean/kafka/keyvalue/cli/ArgsToOptions.java @@ -17,23 +17,27 @@ import net.sourceforge.argparse4j.inf.Namespace; import se.yolean.kafka.keyvalue.CacheServiceOptions; import se.yolean.kafka.keyvalue.OnUpdate; -import se.yolean.kafka.keyvalue.onupdate.OnUpdateFactory; +import se.yolean.kafka.keyvalue.onupdate.OnUpdateWithExternalPollTrigger; public class ArgsToOptions implements CacheServiceOptions { - private OnUpdateFactory onUpdateFactory = null; + /** + * {@value #DEFAULT_ONUPDATE_TIMEOUT} + */ + public static final int DEFAULT_ONUPDATE_TIMEOUT = 5000; + + /** + * {@value #DEFAULT_ONUPDATE_RETRIES} + */ + public static final int DEFAULT_ONUPDATE_RETRIES = 0; private String topicName = null; private Integer port = null; private String applicationId; private Properties streamsProperties = null; - private OnUpdate onUpdate = null; private Integer startTimeoutSeconds = null; - public ArgsToOptions setOnUpdateFactory(OnUpdateFactory factory) { - this.onUpdateFactory = factory; - return this; - } + private OnUpdateWithExternalPollTrigger onupdate; private ArgumentParser getParser() { ArgumentParser parser = ArgumentParsers @@ -99,6 +103,25 @@ private ArgumentParser getParser() { .metavar("ONUPDATE") .help("A URL to POST the key to upon updates (may be debounced)"); + parser.addArgument("--onupdate-timeout") + .action(store()) + .required(false) + .type(Integer.class) + .metavar("ONUPDATE_TIMEOUT") + .setDefault(DEFAULT_ONUPDATE_TIMEOUT) + .dest("onupdateTimeout") + .help("Milliseconds timeout for onupdate requests"); + + parser.addArgument("--onupdate-retries") + .action(store()) + .required(false) + .type(Integer.class) + .metavar("ONUPDATE_RETRIES") + .setDefault(DEFAULT_ONUPDATE_RETRIES) + .dest("onupdateRetries") + .help("The number of retries per onupdate target per update before failure is signalled to the processor." + + " Set to 0 for 1 try. TODO Default is 2, i.e. 3 tries."); + parser.addArgument("--starttimeout") .action(store()) .required(false) @@ -112,13 +135,16 @@ private ArgumentParser getParser() { return parser; } - public CacheServiceOptions fromCommandLineArguments(String[] args) { + public ArgsToOptions(String[] args) { @SuppressWarnings("unused") // kept for forward compatibility String hostName = null; - List onupdate = null; Properties props = new Properties(); + List onupdateUrls = null; + Integer onupdateTimeout = null; + Integer onupdateRetries = null; + ArgumentParser parser = getParser(); try { @@ -130,7 +156,10 @@ public CacheServiceOptions fromCommandLineArguments(String[] args) { applicationId = res.getString("applicationId"); List streamsProps = res.getList("streamsConfig"); String streamsConfig = res.getString("streamsConfigFile"); - onupdate = res.getList("onupdate"); + onupdateUrls = res.getList("onupdate"); + onupdateTimeout = res.getInt("onupdateTimeout"); + onupdateRetries = res.getInt("onupdateRetries"); + startTimeoutSeconds = res.getInt("starttimeout"); if (streamsProps == null && streamsConfig == null) { @@ -160,28 +189,21 @@ public CacheServiceOptions fromCommandLineArguments(String[] args) { if (args.length == 0) { parser.printHelp(); System.exit(0); - } else { - parser.handleError(e); - System.exit(1); } + parser.handleError(e); + System.exit(1); } catch (IOException e) { throw new RuntimeException("Options failed", e); } this.streamsProperties = props; - if (onupdate != null && !onupdate.isEmpty()) { - if (this.onUpdateFactory == null) { - throw new IllegalStateException("setOnUpdateFactory must be called first"); - } - if (onupdate.size() == 1) { - this.onUpdate = this.onUpdateFactory.fromUrl(onupdate.get(0)); - } else { - this.onUpdate = this.onUpdateFactory.fromManyUrls(onupdate); - } - } + this.onupdate = newOnUpdate(onupdateUrls, onupdateTimeout, onupdateRetries); + } - return this; + protected OnUpdateWithExternalPollTrigger newOnUpdate(List onupdateUrls, Integer onupdateTimeout, + Integer onupdateRetries) { + return new OnUpdateWithExternalPollTrigger(onupdateUrls, onupdateTimeout, onupdateRetries); } @Override @@ -199,11 +221,6 @@ public Properties getStreamsProperties() { return streamsProperties; } - @Override - public OnUpdate getOnUpdate() { - return onUpdate; - } - @Override public String getApplicationId() { return applicationId; @@ -214,4 +231,13 @@ public Integer getStartTimeoutSecods() { return startTimeoutSeconds; } + @Override + public OnUpdate getOnUpdate() { + return this.onupdate; + } + + OnUpdateWithExternalPollTrigger getOnUpdateImpl() { + return this.onupdate; + } + } diff --git a/src/main/java/se/yolean/kafka/keyvalue/cli/Main.java b/src/main/java/se/yolean/kafka/keyvalue/cli/Main.java index 707a52b..8c2effb 100644 --- a/src/main/java/se/yolean/kafka/keyvalue/cli/Main.java +++ b/src/main/java/se/yolean/kafka/keyvalue/cli/Main.java @@ -6,21 +6,27 @@ import se.yolean.kafka.keyvalue.App; import se.yolean.kafka.keyvalue.CacheServiceOptions; import se.yolean.kafka.keyvalue.Readiness; -import se.yolean.kafka.keyvalue.onupdate.OnUpdateFactory; +import se.yolean.kafka.keyvalue.onupdate.OnUpdateWithExternalPollTrigger; public class Main { private static final Logger logger = LogManager.getLogger(Main.class); + private static final int POLL_INTERVAL = 1000; + + private static long prevpollstart = 0; public static void main(String[] args) { - OnUpdateFactory onUpdateFactory = OnUpdateFactory.getInstance(); + ArgsToOptions options = new ArgsToOptions(args); - CacheServiceOptions options = new ArgsToOptions() - .setOnUpdateFactory(onUpdateFactory) - .fromCommandLineArguments(args); + while (!appstart(options)) { + logger.info("Retrying streams app start"); + } - while (!appstart(options)) logger.info("Retrying streams app start"); + OnUpdateWithExternalPollTrigger onupdate = options.getOnUpdateImpl(); + while (true) { + pollOnupdate(onupdate); + } } /** @@ -50,11 +56,26 @@ private static boolean appstart(CacheServiceOptions options) { private static boolean poll(Readiness readiness) { try { - Thread.sleep(1000); + Thread.sleep(POLL_INTERVAL); } catch (InterruptedException e) { logger.error("Interrupted when polling for app startup status"); } return readiness.isAppReady(); } + /** + * For now there's only one thing we need to do after readiness: poll for request completion. + */ + private static void pollOnupdate(OnUpdateWithExternalPollTrigger onupdate) { + try { + // try to poll regularly, 1 second + Thread.sleep(Math.max(POLL_INTERVAL, + Math.min(1, POLL_INTERVAL - System.currentTimeMillis() + prevpollstart))); + } catch (InterruptedException e) { + logger.error("Interrupted when polling for onupdate progress"); + } + prevpollstart = System.currentTimeMillis(); + onupdate.checkCompletion(); + } + } diff --git a/src/main/java/se/yolean/kafka/keyvalue/onupdate/HttpTargetRequestInvoker.java b/src/main/java/se/yolean/kafka/keyvalue/onupdate/HttpTargetRequestInvoker.java new file mode 100644 index 0000000..0268067 --- /dev/null +++ b/src/main/java/se/yolean/kafka/keyvalue/onupdate/HttpTargetRequestInvoker.java @@ -0,0 +1,13 @@ +package se.yolean.kafka.keyvalue.onupdate; + +import java.util.concurrent.Future; + +import javax.ws.rs.core.Response; + +import se.yolean.kafka.keyvalue.UpdateRecord; + +public interface HttpTargetRequestInvoker { + + Future postUpdate(UpdateRecord update); + +} diff --git a/src/main/java/se/yolean/kafka/keyvalue/onupdate/HttpTargetRequestInvokerJersey.java b/src/main/java/se/yolean/kafka/keyvalue/onupdate/HttpTargetRequestInvokerJersey.java new file mode 100644 index 0000000..03f01ce --- /dev/null +++ b/src/main/java/se/yolean/kafka/keyvalue/onupdate/HttpTargetRequestInvokerJersey.java @@ -0,0 +1,45 @@ +package se.yolean.kafka.keyvalue.onupdate; + +import java.util.concurrent.Future; + +import javax.ws.rs.client.AsyncInvoker; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +import org.glassfish.jersey.client.ClientConfig; +import org.glassfish.jersey.client.ClientProperties; + +import se.yolean.kafka.keyvalue.UpdateRecord; + +/** + * To configure timeouts we need to know which JAX-RS impl we're using: + * + * https://stackoverflow.com/questions/22672664/setting-request-timeout-for-jax-rs-2-0-client-api + */ +public class HttpTargetRequestInvokerJersey implements HttpTargetRequestInvoker { + + private AsyncInvoker async; + + public HttpTargetRequestInvokerJersey( + String onupdateTargetUrl, + int connectTimeoutMilliseconds, + int readTimeoutMilliseconds) { + ClientConfig configuration = new ClientConfig(); + configuration.property(ClientProperties.CONNECT_TIMEOUT, connectTimeoutMilliseconds); + configuration.property(ClientProperties.READ_TIMEOUT, readTimeoutMilliseconds); + Client client = ClientBuilder.newClient(configuration); + + WebTarget target = client.target(onupdateTargetUrl); + this.async = target.request().async(); + } + + @Override + public Future postUpdate(UpdateRecord update) { + return async.post(Entity.entity(update, MediaType.APPLICATION_JSON_TYPE)); + } + +} diff --git a/src/main/java/se/yolean/kafka/keyvalue/onupdate/OnUpdateFactory.java b/src/main/java/se/yolean/kafka/keyvalue/onupdate/OnUpdateFactory.java deleted file mode 100644 index 5b49a60..0000000 --- a/src/main/java/se/yolean/kafka/keyvalue/onupdate/OnUpdateFactory.java +++ /dev/null @@ -1,49 +0,0 @@ -package se.yolean.kafka.keyvalue.onupdate; - -import java.util.List; -import java.util.regex.Pattern; -import java.util.stream.Collectors; - -import se.yolean.kafka.keyvalue.OnUpdate; - -public class OnUpdateFactory { - - public static final Pattern URL_VALIDATION = Pattern.compile("^https?://[^/]+/.*"); - - private static final ResponseSuccessCriteria RESPONSE_SUCCESS_CRITERIA = new ResponseSuccessCriteriaDefaultImpl(); - - private static OnUpdateFactory instance = null; - - private RequestWatcher watcher = null; - - private OnUpdateFactory() { - setRequestWatcher(new RequestWatcherDefaultImpl()); - } - - void setRequestWatcher(RequestWatcher watcher) { - this.watcher = watcher; - } - - public static OnUpdateFactory getInstance() { - if (instance == null) { - instance = new OnUpdateFactory(); - } - return instance; - } - - public OnUpdate fromUrl(String url) { - if (!URL_VALIDATION.matcher(url).matches()) { - throw new IllegalArgumentException("Invalid onupdate URL: " + url); - } - return new OnUpdateHttpIgnoreResult(url, watcher); - } - - public OnUpdate fromManyUrls(List onupdate) { - if (onupdate.size() < 2) throw new IllegalArgumentException("Use fromUrl for a single onupdate"); - List many = onupdate.stream() - .map(url -> fromUrl(url)) - .collect(Collectors.toUnmodifiableList()); - return new OnUpdateMany(many); - } - -} diff --git a/src/main/java/se/yolean/kafka/keyvalue/onupdate/OnUpdateHttpIgnoreResult.java b/src/main/java/se/yolean/kafka/keyvalue/onupdate/OnUpdateHttpIgnoreResult.java deleted file mode 100644 index 7107bff..0000000 --- a/src/main/java/se/yolean/kafka/keyvalue/onupdate/OnUpdateHttpIgnoreResult.java +++ /dev/null @@ -1,52 +0,0 @@ -package se.yolean.kafka.keyvalue.onupdate; - -import java.util.concurrent.Future; - -import javax.ws.rs.client.Client; -import javax.ws.rs.client.ClientBuilder; -import javax.ws.rs.client.Entity; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; - -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.LogManager; - -import se.yolean.kafka.keyvalue.OnUpdate; -import se.yolean.kafka.keyvalue.UpdateRecord; - -/** - * Sends an async request and disregards the result, - * i.e. reports success every time and is not retryable. - */ -public class OnUpdateHttpIgnoreResult implements OnUpdate { - - private static final Logger logger = LogManager.getLogger(OnUpdateHttpIgnoreResult.class); - - private String url; - - private final Client client = ClientBuilder.newBuilder().build(); - - private RequestWatcher coordination; - - /** - * @see OnUpdateFactory - */ - OnUpdateHttpIgnoreResult(String webhookUrl, RequestWatcher coordination) { - this.url = webhookUrl; - this.coordination = coordination; - } - - @Override - public void handle(UpdateRecord update, Completion completion) { - Future res = client.target(url).request().async().post( - Entity.entity(update, MediaType.APPLICATION_JSON_TYPE)); - coordination.watch(update, res, completion); - logger.debug("Onupdate POST dispatched to {} for key {} at {},{},{}", url, update.getKey(), update.getTopic(), update.getPartition(), update.getOffset()); - } - - @Override - public String toString() { - return this.getClass().getSimpleName() + '(' + this.url + ')'; - } - -} diff --git a/src/main/java/se/yolean/kafka/keyvalue/onupdate/OnUpdateMany.java b/src/main/java/se/yolean/kafka/keyvalue/onupdate/OnUpdateMany.java deleted file mode 100644 index ed067b4..0000000 --- a/src/main/java/se/yolean/kafka/keyvalue/onupdate/OnUpdateMany.java +++ /dev/null @@ -1,30 +0,0 @@ -package se.yolean.kafka.keyvalue.onupdate; - -import java.util.List; - -import se.yolean.kafka.keyvalue.OnUpdate; -import se.yolean.kafka.keyvalue.UpdateRecord; - -public class OnUpdateMany implements OnUpdate { - - private List all; - - /** - * @see OnUpdateFactory - */ - OnUpdateMany(List many) { - this.all = many; - } - - @Override - public void handle(UpdateRecord update, Completion completion) { - all.forEach(single -> { - if (single instanceof OnUpdateHttpIgnoreResult) { - single.handle(update, null); - } else { - throw new IllegalArgumentException("No support yet for 1+ onupdate of type " + update.getClass().getName()); - } - }); - } - -} diff --git a/src/main/java/se/yolean/kafka/keyvalue/onupdate/OnUpdateWithExternalPollTrigger.java b/src/main/java/se/yolean/kafka/keyvalue/onupdate/OnUpdateWithExternalPollTrigger.java new file mode 100644 index 0000000..aa2ee4a --- /dev/null +++ b/src/main/java/se/yolean/kafka/keyvalue/onupdate/OnUpdateWithExternalPollTrigger.java @@ -0,0 +1,198 @@ +package se.yolean.kafka.keyvalue.onupdate; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import javax.ws.rs.core.Response; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import se.yolean.kafka.keyvalue.OnUpdate; +import se.yolean.kafka.keyvalue.UpdateRecord; + +/** + * Starts asynchronous requests and polls for completion on external call to + * {@link #checkCompletion()}. + * + * Statically configured, i.e. a fixed set of webhook targets. + * + * This implementation assumes that {@link #checkCompletion()} is called from a (one) main control thread, + * same that would for example poll for readiness, so that no function calls are concurrent. + */ +public class OnUpdateWithExternalPollTrigger implements OnUpdate { + + private static final Logger logger = LogManager.getLogger(OnUpdateWithExternalPollTrigger.class); + + public static final ResponseSuccessCriteria DEFAULT_RESPONSE_SUCCESS_CRITERIA = + new ResponseSuccessCriteriaStatus200or204(); + + private List targets = new LinkedList<>(); + + // LinkedHashMap used to preserve iteration order + private Map pending = new LinkedHashMap<>(1); + + public OnUpdateWithExternalPollTrigger( + List onupdateUrls, + int requestTimeoutMilliseconds, + int retries) { + if (onupdateUrls.isEmpty()) { + logger.warn("Initialization without onupdate urls is only meant for testing"); + } + for (String url : onupdateUrls) { + addTarget(url, requestTimeoutMilliseconds, retries); + } + } + + /** + * Dynamic reconfiguration is outside scope so this method is kept private + */ + private void addTarget(String onupdateUrl, int timeoutMs, int retries) { + HttpTargetRequestInvokerJersey invoker = new HttpTargetRequestInvokerJersey(onupdateUrl, timeoutMs, timeoutMs); + addTarget(invoker, DEFAULT_RESPONSE_SUCCESS_CRITERIA, retries); + } + + /** + * Package-visible so we can mock this in tests. + * @param targetInvoker based on args to {@link #addTarget(String, int, long)}. + * @param criteria decides if the response should lead to + * {@link Completion#onSuccess()} or retry/{@link Completion#onFailure()}. + * @param retries the number of retries to use for this target TODO a RetryCriteria interface given the attempt# and request. + */ + OnUpdateWithExternalPollTrigger addTarget(HttpTargetRequestInvoker targetInvoker, + ResponseSuccessCriteria criteria, int retries) { + if (retries != 0) throw new IllegalArgumentException("Support for retries isn't implemented yet"); + targets.add(new Target(targetInvoker, criteria)); + + return this; + } + + @Override + public void handle(UpdateRecord update, Completion completion) { + pending.put(update, new TargetsInvocations(update, completion)); + } + + public void checkCompletion() { + Iterator> allPendingUpdates = pending.entrySet().iterator(); + while (allPendingUpdates.hasNext()) { + Entry pendingUpdate = allPendingUpdates.next(); + UpdateRecord update = pendingUpdate.getKey(); + TargetsInvocations targets = pendingUpdate.getValue(); + if (checkCompletion(update, targets)) { + allPendingUpdates.remove(); + } + } + } + + /** + * @return true if all targets have completed for this update + */ + boolean checkCompletion(UpdateRecord update, TargetsInvocations targets) { + List unfinishedRequests = targets.invocations; + if (unfinishedRequests.isEmpty()) throw new IllegalStateException("Pending status should have been removed if there are no pending requests: " + update); + Iterator allRemainingInvocations = unfinishedRequests.iterator(); + while (allRemainingInvocations.hasNext()) { + TargetInvocation invocation = allRemainingInvocations.next(); + if (invocation.request.isDone()) { + allRemainingInvocations.remove(); + Response response = null; + Throwable error = null; + try { + response = invocation.request.get(); + } catch (InterruptedException e) { + throw new IllegalStateException("Got interrupted in an operation that should have been synchronous after isDone returned true", e); + } catch (ExecutionException e) { + if (e.getCause() == null) { + throw new IllegalStateException("Failed to get response after isDone returned true, no cause given", e); + } + error = e.getCause(); + } + boolean result = false; + if (error == null) { + result = invocation.criteria.isSuccess(response); + if (!result) logger.info("Update request failure for {}: {}", invocation, response); + } else if (error instanceof java.net.ConnectException) { + logger.info("ConnectException for {}: {}", invocation, error.getMessage()); + } + targets.addResult(result); + } + } + if (unfinishedRequests.isEmpty()) { + if (targets.hasNoFailures()) { + targets.completion.onSuccess(); + } else { + targets.completion.onFailure(); + } + return true; + } + return false; + } + + private class Target { + + private HttpTargetRequestInvoker invoker; + private ResponseSuccessCriteria criteria; + + public Target(HttpTargetRequestInvoker targetInvoker, ResponseSuccessCriteria criteria) { + this.invoker = targetInvoker; + this.criteria = criteria; + } + + } + + private class TargetsInvocations { + + private Completion completion; + private List invocations; + private int successes = 0; + private int failures = 0; + + TargetsInvocations(UpdateRecord update, Completion completion) { + this.completion = completion; + this.invocations = new ArrayList<>(targets.size()); + for (Target target : targets) { + invocations.add(new TargetInvocation(update, target.invoker, target.criteria)); + } + } + + private void addResult(boolean success) { + if (success) { + successes++; + } else { + failures++; + } + } + + private boolean hasNoFailures() { + if (successes + failures == 0) throw new IllegalStateException("Sanity check failed, aggregate result checked before any result has been added"); + return failures == 0; + } + + } + + private class TargetInvocation { + + private HttpTargetRequestInvoker invoker; + private ResponseSuccessCriteria criteria; + private Future request; + + TargetInvocation(UpdateRecord update, HttpTargetRequestInvoker invoker, ResponseSuccessCriteria criteria) { + this.invoker = invoker; + this.criteria = criteria; + invoke(update); + } + + private void invoke(UpdateRecord update) { + request = invoker.postUpdate(update); + } + + } + +} diff --git a/src/main/java/se/yolean/kafka/keyvalue/onupdate/RequestWatcher.java b/src/main/java/se/yolean/kafka/keyvalue/onupdate/RequestWatcher.java deleted file mode 100644 index f120a6e..0000000 --- a/src/main/java/se/yolean/kafka/keyvalue/onupdate/RequestWatcher.java +++ /dev/null @@ -1,17 +0,0 @@ -package se.yolean.kafka.keyvalue.onupdate; - -import java.util.concurrent.Future; - -import javax.ws.rs.core.Response; - -import se.yolean.kafka.keyvalue.OnUpdate.Completion; -import se.yolean.kafka.keyvalue.UpdateRecord; - -/** - * Is responsible for calling {@link Completion#onFailure()} or {@link Completion#onSuccess()} when appropriate. - */ -public interface RequestWatcher { - - Future watch(UpdateRecord update, Future res, Completion completion); - -} diff --git a/src/main/java/se/yolean/kafka/keyvalue/onupdate/RequestWatcherDefaultImpl.java b/src/main/java/se/yolean/kafka/keyvalue/onupdate/RequestWatcherDefaultImpl.java deleted file mode 100644 index e48ac61..0000000 --- a/src/main/java/se/yolean/kafka/keyvalue/onupdate/RequestWatcherDefaultImpl.java +++ /dev/null @@ -1,46 +0,0 @@ -package se.yolean.kafka.keyvalue.onupdate; - -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - -import javax.ws.rs.core.Response; - -import se.yolean.kafka.keyvalue.OnUpdate.Completion; -import se.yolean.kafka.keyvalue.UpdateRecord; - -public class RequestWatcherDefaultImpl implements RequestWatcher { - - private Map watching = new LinkedHashMap<>(1); - - private ExecutorService executor; - - public RequestWatcherDefaultImpl() { - this(Executors.newSingleThreadExecutor()); - } - - public RequestWatcherDefaultImpl(ExecutorService isDoneExecutor) { - this.executor = isDoneExecutor; - } - - @Override - public Future watch(UpdateRecord update, Future res, Completion completion) { - // TODO Auto-generated method stub - return null; - } - - private static class ResponseWait { - - ResponseWait() { - - } - - void add(Future res, Completion completion) { - - } - - } - -} diff --git a/src/main/java/se/yolean/kafka/keyvalue/onupdate/ResponseSuccessCriteria.java b/src/main/java/se/yolean/kafka/keyvalue/onupdate/ResponseSuccessCriteria.java index be07e39..0529c4c 100644 --- a/src/main/java/se/yolean/kafka/keyvalue/onupdate/ResponseSuccessCriteria.java +++ b/src/main/java/se/yolean/kafka/keyvalue/onupdate/ResponseSuccessCriteria.java @@ -4,6 +4,10 @@ public interface ResponseSuccessCriteria { + /** + * @param response A completed response (i.e. either success or failure) + * @return true if response is a success, false if failed + */ boolean isSuccess(Response response); } diff --git a/src/main/java/se/yolean/kafka/keyvalue/onupdate/ResponseSuccessCriteriaDefaultImpl.java b/src/main/java/se/yolean/kafka/keyvalue/onupdate/ResponseSuccessCriteriaDefaultImpl.java deleted file mode 100644 index a67b6f1..0000000 --- a/src/main/java/se/yolean/kafka/keyvalue/onupdate/ResponseSuccessCriteriaDefaultImpl.java +++ /dev/null @@ -1,12 +0,0 @@ -package se.yolean.kafka.keyvalue.onupdate; - -import javax.ws.rs.core.Response; - -public class ResponseSuccessCriteriaDefaultImpl implements ResponseSuccessCriteria { - - @Override - public boolean isSuccess(Response response) { - return response.getStatus() >= 200 && response.getStatus() < 300; - } - -} diff --git a/src/main/java/se/yolean/kafka/keyvalue/onupdate/ResponseSuccessCriteriaStatus200or204.java b/src/main/java/se/yolean/kafka/keyvalue/onupdate/ResponseSuccessCriteriaStatus200or204.java new file mode 100644 index 0000000..7c90f91 --- /dev/null +++ b/src/main/java/se/yolean/kafka/keyvalue/onupdate/ResponseSuccessCriteriaStatus200or204.java @@ -0,0 +1,15 @@ +package se.yolean.kafka.keyvalue.onupdate; + +import javax.ws.rs.core.Response; + +public class ResponseSuccessCriteriaStatus200or204 implements ResponseSuccessCriteria { + + @Override + public boolean isSuccess(Response response) { + if (response == null) { + throw new IllegalArgumentException("Response is null"); + } + return response.getStatus() == 200 || response.getStatus() == 204; + } + +} diff --git a/src/test/java/se/yolean/kafka/keyvalue/cli/ArgsToOptionsTest.java b/src/test/java/se/yolean/kafka/keyvalue/cli/ArgsToOptionsTest.java index eafbddd..7e78a15 100644 --- a/src/test/java/se/yolean/kafka/keyvalue/cli/ArgsToOptionsTest.java +++ b/src/test/java/se/yolean/kafka/keyvalue/cli/ArgsToOptionsTest.java @@ -1,14 +1,16 @@ package se.yolean.kafka.keyvalue.cli; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import java.util.List; import java.util.Properties; import org.junit.jupiter.api.Test; import org.mockito.Mockito; import se.yolean.kafka.keyvalue.CacheServiceOptions; -import se.yolean.kafka.keyvalue.onupdate.OnUpdateFactory; +import se.yolean.kafka.keyvalue.onupdate.OnUpdateWithExternalPollTrigger; class ArgsToOptionsTest { @@ -21,18 +23,25 @@ void test() { + " --application-id kv-test1-001" + " --onupdate http://127.0.0.1:8081/updated" + " --starttimeout 15"; - OnUpdateFactory onUpdateFactory = Mockito.mock(OnUpdateFactory.class); - ArgsToOptions a = new ArgsToOptions(); - a.setOnUpdateFactory(onUpdateFactory); - CacheServiceOptions options = a.fromCommandLineArguments(args.split("\\s+")); + CacheServiceOptions options = new ArgsToOptions(args.split("\\s+")) { + @Override + protected OnUpdateWithExternalPollTrigger newOnUpdate(List onupdateUrls, Integer onupdateTimeout, + Integer onupdateRetries) { + assertEquals(1, onupdateUrls.size()); + assertEquals("http://127.0.0.1:8081/updated", onupdateUrls.get(0)); + assertEquals(DEFAULT_ONUPDATE_TIMEOUT, onupdateTimeout); + assertEquals(DEFAULT_ONUPDATE_RETRIES, onupdateRetries); + return Mockito.mock(OnUpdateWithExternalPollTrigger.class); + } + }; assertEquals(19081, options.getPort()); //assertEquals("mypod-abcde", options.getHostname()); assertEquals("kv-test1-001", options.getApplicationId()); - Mockito.verify(onUpdateFactory).fromUrl("http://127.0.0.1:8081/updated"); Properties props = options.getStreamsProperties(); assertEquals("localhost:19092", props.get("bootstrap.servers")); assertEquals("0", props.get("num.standby.replicas")); assertEquals(15, options.getStartTimeoutSecods()); + assertNotNull(options.getOnUpdate()); } @Test @@ -41,13 +50,22 @@ void testOnupdateMany() { + " --streams-props bootstrap.servers=localhost:19092" + " --topic topic2" + " --application-id kv-test1-001" - + " --onupdate http://127.0.0.1:8081/updated http://127.0.0.1:8082/updates"; - OnUpdateFactory onUpdateFactory = Mockito.mock(OnUpdateFactory.class); - ArgsToOptions a = new ArgsToOptions(); - a.setOnUpdateFactory(onUpdateFactory); - CacheServiceOptions options = a.fromCommandLineArguments(args.split("\\s+")); - // TODO capture and verify - Mockito.verify(onUpdateFactory).fromManyUrls(Mockito.any()); + + " --onupdate http://127.0.0.1:8081/updated http://127.0.0.1:8082/updates" + //+ " --onupdate-retries 4" + + " --onupdate-timeout 13000"; + CacheServiceOptions options = new ArgsToOptions(args.split("\\s+")) { + @Override + protected OnUpdateWithExternalPollTrigger newOnUpdate(List onupdateUrls, Integer onupdateTimeout, + Integer onupdateRetries) { + assertEquals(2, onupdateUrls.size()); + assertEquals("http://127.0.0.1:8081/updated", onupdateUrls.get(0)); + assertEquals("http://127.0.0.1:8082/updates", onupdateUrls.get(1)); + assertEquals(13000, onupdateTimeout); + assertEquals(DEFAULT_ONUPDATE_RETRIES, onupdateRetries); + return Mockito.mock(OnUpdateWithExternalPollTrigger.class); + } + }; + assertNotNull(options.getOnUpdate()); } } diff --git a/src/test/java/se/yolean/kafka/keyvalue/onupdate/MockRequest.java b/src/test/java/se/yolean/kafka/keyvalue/onupdate/MockRequest.java new file mode 100644 index 0000000..124a90a --- /dev/null +++ b/src/test/java/se/yolean/kafka/keyvalue/onupdate/MockRequest.java @@ -0,0 +1,55 @@ +package se.yolean.kafka.keyvalue.onupdate; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import javax.ws.rs.core.Response; + +import org.mockito.Mockito; + +public class MockRequest implements Future { + + private Response response = Mockito.mock(Response.class); + + /** + * Coupled to {@link MockResponseSuccessCriteria#isSuccess(Response)}. + */ + void setSuccess() { + Mockito.when(response.getStatus()).thenReturn(200); + } + + /** + * Coupled to {@link MockResponseSuccessCriteria#isSuccess(Response)} false. + */ + void setFailure() { + Mockito.when(response.getStatus()).thenReturn(500); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public boolean isCancelled() { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public boolean isDone() { + return response.getStatus() == 200 || response.getStatus() == 500; + } + + @Override + public Response get() throws InterruptedException, ExecutionException { + return response; + } + + @Override + public Response get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + throw new UnsupportedOperationException("Not implemented"); + } + +} diff --git a/src/test/java/se/yolean/kafka/keyvalue/onupdate/MockResponseSuccessCriteria.java b/src/test/java/se/yolean/kafka/keyvalue/onupdate/MockResponseSuccessCriteria.java new file mode 100644 index 0000000..706386b --- /dev/null +++ b/src/test/java/se/yolean/kafka/keyvalue/onupdate/MockResponseSuccessCriteria.java @@ -0,0 +1,12 @@ +package se.yolean.kafka.keyvalue.onupdate; + +import javax.ws.rs.core.Response; + +public class MockResponseSuccessCriteria implements ResponseSuccessCriteria { + + @Override + public boolean isSuccess(Response response) { + return response.getStatus() == 200; + } + +} diff --git a/src/test/java/se/yolean/kafka/keyvalue/onupdate/OnUpdateFactoryTest.java b/src/test/java/se/yolean/kafka/keyvalue/onupdate/OnUpdateFactoryTest.java deleted file mode 100644 index ace4f2f..0000000 --- a/src/test/java/se/yolean/kafka/keyvalue/onupdate/OnUpdateFactoryTest.java +++ /dev/null @@ -1,20 +0,0 @@ -package se.yolean.kafka.keyvalue.onupdate; - -import static org.junit.jupiter.api.Assertions.*; - -import org.junit.jupiter.api.Test; - -class OnUpdateFactoryTest { - - @Test - void testFromUrlNoProtocol() { - OnUpdateFactory factory = OnUpdateFactory.getInstance(); - try { - factory.fromUrl("null/myupdates/"); - fail("Should have rejected invalid URL"); - } catch (IllegalArgumentException e) { - assertTrue(e.getMessage().contains("null/myupdates/")); - } - } - -} diff --git a/src/test/java/se/yolean/kafka/keyvalue/onupdate/OnUpdateIntegrationTest.java b/src/test/java/se/yolean/kafka/keyvalue/onupdate/OnUpdateIntegrationTest.java deleted file mode 100644 index 80697ae..0000000 --- a/src/test/java/se/yolean/kafka/keyvalue/onupdate/OnUpdateIntegrationTest.java +++ /dev/null @@ -1,95 +0,0 @@ -package se.yolean.kafka.keyvalue.onupdate; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -import java.io.IOException; -import java.util.Properties; -import java.util.Random; - -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.TopologyTestDriver; -import org.apache.kafka.streams.test.ConsumerRecordFactory; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.mockito.Mockito; - -import se.yolean.kafka.keyvalue.CacheServiceOptions; -import se.yolean.kafka.keyvalue.KeyvalueUpdate; -import se.yolean.kafka.keyvalue.KeyvalueUpdateProcessor; -import se.yolean.kafka.keyvalue.OnUpdate; -import se.yolean.kafka.keyvalue.http.CacheServer; -import se.yolean.kafka.keyvalue.http.ConfigureRest; - -public class OnUpdateIntegrationTest { - - public static final String UPDATES_ENDPOINT_PATH = "/updates"; - - private TopologyTestDriver testDriver; - private ConsumerRecordFactory recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), - new StringSerializer()); - - private static final String TOPIC1 = "topic1"; - private KeyvalueUpdate cache = null; - - private CacheServer server = null; - private int port; - private String root; - - @BeforeEach - public void setup() { - Random random = new Random(); - port = 58000 + random.nextInt(1000); - root = "http://127.0.0.1:" + port; - - OnUpdate onUpdate = OnUpdateFactory.getInstance().fromUrl(root + UPDATES_ENDPOINT_PATH); - - cache = new KeyvalueUpdateProcessor(TOPIC1, onUpdate); - - Properties config = new Properties(); - config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-kafka-keyvalue"); - config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); - - Topology topology = cache.getTopology(); - - testDriver = new TopologyTestDriver(topology, config); - } - - @AfterEach - void teardown() throws Exception { - testDriver.close(); - - if (server != null) { - server.stop(); - } - server = null; - } - - @Test - void testBasicFlow() throws InterruptedException, IOException { - CacheServiceOptions options = Mockito.mock(CacheServiceOptions.class); - Mockito.when(options.getPort()).thenReturn(port); - - UpdatesServlet updatesServlet = new UpdatesServlet(); - - server = new ConfigureRest() - .createContext(options.getPort(), "/") - .asServlet() - .addCustomServlet(updatesServlet, UPDATES_ENDPOINT_PATH) - .create(); - server.start(); - - assertEquals(0, updatesServlet.posts.size()); - - testDriver.pipeInput(recordFactory.create(TOPIC1, "k1", "v1")); - - Thread.sleep(1000); - assertEquals(1, updatesServlet.posts.size()); - - assertEquals(1, updatesServlet.payloads.size()); - assertEquals("{\"topic\":\"topic1\",\"partition\":0,\"offset\":0,\"key\":\"k1\"}", updatesServlet.payloads.get(0)); - } - -} diff --git a/src/test/java/se/yolean/kafka/keyvalue/onupdate/OnUpdateWithExternalPollTriggerTest.java b/src/test/java/se/yolean/kafka/keyvalue/onupdate/OnUpdateWithExternalPollTriggerTest.java new file mode 100644 index 0000000..ee79a5a --- /dev/null +++ b/src/test/java/se/yolean/kafka/keyvalue/onupdate/OnUpdateWithExternalPollTriggerTest.java @@ -0,0 +1,189 @@ +package se.yolean.kafka.keyvalue.onupdate; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import javax.ws.rs.client.AsyncInvoker; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +import org.glassfish.jersey.client.ClientConfig; +import org.glassfish.jersey.client.ClientProperties; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import se.yolean.kafka.keyvalue.OnUpdate; +import se.yolean.kafka.keyvalue.UpdateRecord; + +class OnUpdateWithExternalPollTriggerTest { + + private static final List DUMMY_INIT = Collections.unmodifiableList(new LinkedList()); + + @Test + void testMockRequestSuccess() throws InterruptedException, ExecutionException { + MockRequest request = new MockRequest(); + MockResponseSuccessCriteria criteria = new MockResponseSuccessCriteria(); + assertFalse(request.isDone()); + request.setSuccess(); + assertTrue(request.isDone()); + assertTrue(criteria.isSuccess(request.get())); + } + + @Test + void testMockRequestFailure() throws InterruptedException, ExecutionException { + MockRequest request = new MockRequest(); + MockResponseSuccessCriteria criteria = new MockResponseSuccessCriteria(); + assertFalse(request.isDone()); + request.setFailure(); + assertTrue(request.isDone()); + assertFalse(criteria.isSuccess(request.get())); + } + + @Disabled // was used to discover the breaking effect of using long for timeout millis + @Test + void testRealRequest() throws InterruptedException, ExecutionException { + String onupdateTargetUrl = "http://yolean.com"; + int connectTimeoutMilliseconds = 5000; + int readTimeoutMilliseconds = 5000; + ClientConfig configuration = new ClientConfig(); + configuration.property(ClientProperties.CONNECT_TIMEOUT, connectTimeoutMilliseconds); + configuration.property(ClientProperties.READ_TIMEOUT, readTimeoutMilliseconds); + Client client = ClientBuilder.newClient(configuration); + + WebTarget target = client.target(onupdateTargetUrl); + AsyncInvoker async = target.request().async(); + UpdateRecord update = new UpdateRecord("t", 0, 1, "k"); + Future request = async.post(Entity.entity(update, MediaType.APPLICATION_JSON_TYPE)); + assertFalse(request.isDone()); + while (!request.isDone()) { + Thread.sleep(100); + } + Response response = request.get(); + assertNotNull(response); + } + + @Test + void testSingleSuccessful() throws InterruptedException, ExecutionException { + HttpTargetRequestInvoker invoker = mock(HttpTargetRequestInvoker.class); + OnUpdateWithExternalPollTrigger updates = new OnUpdateWithExternalPollTrigger( + DUMMY_INIT, -1, -1) + .addTarget(invoker, new MockResponseSuccessCriteria(), 0); + + OnUpdate.Completion completion = mock(OnUpdate.Completion.class); + UpdateRecord update = new UpdateRecord("topic1", 0, 15, "key1"); + + MockRequest request = new MockRequest(); + when(invoker.postUpdate(update)).thenReturn(request); + updates.handle(update, completion); + + updates.checkCompletion(); + verify(completion, never()).onSuccess(); + + request.setSuccess(); + updates.checkCompletion(); + verify(completion).onSuccess(); + verify(completion, never()).onFailure(); + } + + @Test + void testSingleFailure() throws InterruptedException, ExecutionException { + HttpTargetRequestInvoker invoker = mock(HttpTargetRequestInvoker.class); + OnUpdateWithExternalPollTrigger updates = new OnUpdateWithExternalPollTrigger( + DUMMY_INIT, -1, -1) + .addTarget(invoker, new MockResponseSuccessCriteria(), 0); + + OnUpdate.Completion completion = mock(OnUpdate.Completion.class); + UpdateRecord update = new UpdateRecord("topic1", 0, 15, "key1"); + + MockRequest request = new MockRequest(); + when(invoker.postUpdate(update)).thenReturn(request); + updates.handle(update, completion); + + updates.checkCompletion(); + verify(completion, never()).onSuccess(); + + request.setFailure(); + updates.checkCompletion(); + verify(completion, never()).onSuccess(); + verify(completion).onFailure(); + } + + @Test + void testMultipleAllSuccessful() throws InterruptedException, ExecutionException { + HttpTargetRequestInvoker invoker1 = mock(HttpTargetRequestInvoker.class); + HttpTargetRequestInvoker invoker2 = mock(HttpTargetRequestInvoker.class); + OnUpdateWithExternalPollTrigger updates = new OnUpdateWithExternalPollTrigger( + DUMMY_INIT, -1, -1) + .addTarget(invoker1, new MockResponseSuccessCriteria(), 0) + .addTarget(invoker2, new MockResponseSuccessCriteria(), 0); + + OnUpdate.Completion completion = mock(OnUpdate.Completion.class); + UpdateRecord update = new UpdateRecord("topic1", 0, 15, "key1"); + + MockRequest request1 = new MockRequest(); + when(invoker1.postUpdate(update)).thenReturn(request1); + MockRequest request2 = new MockRequest(); + when(invoker2.postUpdate(update)).thenReturn(request2); + + updates.handle(update, completion); + + updates.checkCompletion(); + verify(completion, never()).onSuccess(); + verify(completion, never()).onFailure(); + + // only one completed + request1.setSuccess(); + updates.checkCompletion(); + verify(completion, never()).onSuccess(); + verify(completion, never()).onFailure(); + + request2.setSuccess(); + updates.checkCompletion(); + verify(completion).onSuccess(); + verify(completion, never()).onFailure(); + } + + @Test + void testMultipleOneFailed() throws InterruptedException, ExecutionException { + HttpTargetRequestInvoker invoker1 = mock(HttpTargetRequestInvoker.class); + HttpTargetRequestInvoker invoker2 = mock(HttpTargetRequestInvoker.class); + OnUpdateWithExternalPollTrigger updates = new OnUpdateWithExternalPollTrigger( + DUMMY_INIT, -1, -1) + .addTarget(invoker1, new MockResponseSuccessCriteria(), 0) + .addTarget(invoker2, new MockResponseSuccessCriteria(), 0); + + OnUpdate.Completion completion = mock(OnUpdate.Completion.class); + UpdateRecord update = new UpdateRecord("topic1", 0, 15, "key1"); + + MockRequest request1 = new MockRequest(); + when(invoker1.postUpdate(update)).thenReturn(request1); + MockRequest request2 = new MockRequest(); + when(invoker2.postUpdate(update)).thenReturn(request2); + + updates.handle(update, completion); + + updates.checkCompletion(); + verify(completion, never()).onSuccess(); + verify(completion, never()).onFailure(); + + // only one completed + request1.setFailure(); + request2.setSuccess(); + updates.checkCompletion(); + verify(completion, never()).onSuccess(); + verify(completion).onFailure(); + } + +} diff --git a/src/test/java/se/yolean/kafka/keyvalue/onupdate/RequestWatcherDefaultImplTest.java b/src/test/java/se/yolean/kafka/keyvalue/onupdate/RequestWatcherDefaultImplTest.java deleted file mode 100644 index c0419d6..0000000 --- a/src/test/java/se/yolean/kafka/keyvalue/onupdate/RequestWatcherDefaultImplTest.java +++ /dev/null @@ -1,14 +0,0 @@ -package se.yolean.kafka.keyvalue.onupdate; - -import static org.junit.jupiter.api.Assertions.*; - -import org.junit.jupiter.api.Test; - -class RequestWatcherDefaultImplTest { - - @Test - void testWatch() { - fail("Not yet implemented"); - } - -} diff --git a/src/test/java/se/yolean/kafka/keyvalue/onupdate/ResponseSuccessCriteriaDefaultImplTest.java b/src/test/java/se/yolean/kafka/keyvalue/onupdate/ResponseSuccessCriteriaDefaultImplTest.java deleted file mode 100644 index 75ce278..0000000 --- a/src/test/java/se/yolean/kafka/keyvalue/onupdate/ResponseSuccessCriteriaDefaultImplTest.java +++ /dev/null @@ -1,21 +0,0 @@ -package se.yolean.kafka.keyvalue.onupdate; - -import static org.junit.jupiter.api.Assertions.*; - -import javax.ws.rs.core.Response; - -import org.junit.jupiter.api.Test; -import org.mockito.Mockito; - -class ResponseSuccessCriteriaDefaultImplTest { - - private ResponseSuccessCriteria criteria = new ResponseSuccessCriteriaDefaultImpl(); - - @Test - void testIsSuccess() { - Response response = Mockito.mock(Response.class); - Mockito.when(response.getStatus()).thenReturn(200); - assertTrue(criteria.isSuccess(response)); - } - -} diff --git a/src/test/java/se/yolean/kafka/keyvalue/onupdate/ResponseSuccessCriteriaStatus200or204Test.java b/src/test/java/se/yolean/kafka/keyvalue/onupdate/ResponseSuccessCriteriaStatus200or204Test.java new file mode 100644 index 0000000..9f4edb0 --- /dev/null +++ b/src/test/java/se/yolean/kafka/keyvalue/onupdate/ResponseSuccessCriteriaStatus200or204Test.java @@ -0,0 +1,48 @@ +package se.yolean.kafka.keyvalue.onupdate; + +import static org.junit.jupiter.api.Assertions.*; + +import javax.ws.rs.core.Response; + +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import se.yolean.kafka.keyvalue.onupdate.ResponseSuccessCriteria; +import se.yolean.kafka.keyvalue.onupdate.ResponseSuccessCriteriaStatus200or204; + +class ResponseSuccessCriteriaStatus200or204Test { + + private ResponseSuccessCriteria criteria = new ResponseSuccessCriteriaStatus200or204(); + + @Test + void testIsSuccess() { + Response response = Mockito.mock(Response.class); + Mockito.when(response.getStatus()).thenReturn(200); + assertTrue(criteria.isSuccess(response)); + } + + @Test + void testIsSuccess204() { + Response response = Mockito.mock(Response.class); + Mockito.when(response.getStatus()).thenReturn(204); + assertTrue(criteria.isSuccess(response)); + } + + @Test + void testIsSuccessNot201() { + Response response = Mockito.mock(Response.class); + Mockito.when(response.getStatus()).thenReturn(201); + assertFalse(criteria.isSuccess(response)); + } + + @Test + void testNullResponse() { + try { + criteria.isSuccess(null); + fail("Should have thrown on null response"); + } catch (IllegalArgumentException e) { + assertEquals("Response is null", e.getMessage()); + } + } + +} diff --git a/src/test/java/se/yolean/kafka/keyvalue/onupdate/UpdatesServlet.java b/src/test/java/se/yolean/kafka/keyvalue/onupdate/UpdatesServlet.java deleted file mode 100644 index 7629d15..0000000 --- a/src/test/java/se/yolean/kafka/keyvalue/onupdate/UpdatesServlet.java +++ /dev/null @@ -1,29 +0,0 @@ -package se.yolean.kafka.keyvalue.onupdate; - -import java.io.BufferedReader; -import java.io.IOException; -import java.util.LinkedList; -import java.util.List; - -import javax.servlet.ServletException; -import javax.servlet.http.HttpServlet; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; - -public class UpdatesServlet extends HttpServlet { - - final List posts = new LinkedList<>(); - final List payloads = new LinkedList<>(); - - private static final long serialVersionUID = 1L; - - @Override - protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { - posts.add(req); - BufferedReader reader = req.getReader(); - payloads.add(reader.readLine()); - reader.close(); - resp.setStatus(200); - } - -}