diff --git a/README.md b/README.md
index 5247f22c..097ed8f1 100644
--- a/README.md
+++ b/README.md
@@ -257,3 +257,13 @@ nohup java -cp "smart-connector-rest-dist-1.2.3.jar:dependency/*" eu.knowledge.e
### Starting the Knowledge Engine in distributed mode
The Knowledge Engine can also start in distributed mode, where it connects with a remote knowledge directory and where different instances of the Knowledge Engine (each instance hosting one or more smart connectors) can communicate with each other. More information about starting the Knowledge Engine in distributed mode can be found in the [documentation](docs/04_distributed_mode.md).
+
+### Additional configuration environment variables
+
+*Increasing the wait time for other KBs to respond*
+
+By default, a Smart Connector waits `10` seconds max for a reply from another Smart Connector when sending an ASK/POST message. This time is configurable via the `KE_KB_WAIT_TIMEOUT` environment variable and setting it to `0` means the Smart Connector will wait indefinitely (this can be useful when dealing with Human KBs).
+
+*Increasing the HTTP timeouts*
+
+By default, a KER waits `5` seconds max for a HTTP response from another KER when sending a message via the inter-KER protocol. The time is configurable via the `KE_HTTP_TIMEOUT` environment variable.
\ No newline at end of file
diff --git a/admin-ui/pom.xml b/admin-ui/pom.xml
index 125b055f..f9d97557 100644
--- a/admin-ui/pom.xml
+++ b/admin-ui/pom.xml
@@ -115,15 +115,22 @@
org.junit.jupiter
junit-jupiter-api
- 5.10.1
+ 5.10.2
test
org.junit.jupiter
junit-jupiter-engine
- 5.7.0
+ 5.10.2
test
+
+ org.junit.jupiter
+ junit-jupiter
+ 5.10.2
+ test
+
+
eu.knowledge.engine
smart-connector-rest-server
@@ -200,6 +207,7 @@
true
with-dependencies
+ false
diff --git a/examples/unreachable-runtimes/Dockerfile b/examples/unreachable-runtimes/Dockerfile
index d1360f48..3375bb30 100644
--- a/examples/unreachable-runtimes/Dockerfile
+++ b/examples/unreachable-runtimes/Dockerfile
@@ -1,4 +1,4 @@
-FROM ghcr.io/tno/knowledge-engine/smart-connector:1.2.4
+FROM ghcr.io/tno/knowledge-engine/smart-connector:1.2.4-SNAPSHOT
USER root
diff --git a/examples/unreachable-runtimes/readme.md b/examples/unreachable-runtimes/readme.md
index e5d1fc0f..130f1cd8 100644
--- a/examples/unreachable-runtimes/readme.md
+++ b/examples/unreachable-runtimes/readme.md
@@ -1,7 +1,7 @@
## Knowledge Engine's distributed mode test
This docker compose project is used to test the Knowledge Engine's behavior in distributed mode when something exceptional happens (i.e. divergence from the happy flow). For example, one participant in the Knowledge Network configured its KER incorrectly and therefore it can reach out, but no one can contact the KER from the outside (via the Inter-KER protocol). Under such circumstances, we want the Knowledge Engine to keep functioning and behave as normal as possible.
-To test this, we setup a distributed KER environment with 3 KER+KB combis that exchange data. We have `runtime-1+kb1`, `runtime-2+kb2` and `runtime-3+kb3`. By using the `iptables` tool for `runtime-3` we can simulate a misconfigured KER and test how the other Knowledge Engines behave. Use the following instructions to simulate the misconfigured KER.
+To test this, we setup a distributed KER environment with 3 KER+KB combis that exchange data. We have `runtime-1+kb1`, `runtime-2+kb2` and `runtime-3+kb3`. By using the `iptables` tool for `runtime-3` we can simulate a misconfigured KER and test how the other Knowledge Engines behave. Use the following instructions to simulate the misconfigured KER. In the future we might want to use [Awall](https://github.com/alpinelinux/awall) instead of `iptables`.
Start the docker compose project: `docker compose up -d`
@@ -15,7 +15,7 @@ Retrieve the internal IP address of the KB3 (because it needs to always be able
Make sure runtime-3 is configured to switch between being reachable to being unreachable. First open a shell for runtime-3.
```
-docker compose exec runtime-3 bash
+docker compose exec runtime-3 sh
```
Configure `iptables-legacy` to allow the following packets to go through when we block incoming traffic:
@@ -44,4 +44,10 @@ iptables-legacy -P INPUT DROP
iptables-legacy -P INPUT ACCEPT
#runtime-3 is now reachable again for other KERs and can also reach the KD and other KERs.
+```
+
+Another scenario that you can check is when other KERs can access runtime-3, but it cannot send back a response to runtime-1. To do this, use the following filewall rule:
+
+```
+iptables-legacy -A OUTPUT -p tcp -d runtime-1 -m state --state NEW -j DROP
```
\ No newline at end of file
diff --git a/knowledge-directory/pom.xml b/knowledge-directory/pom.xml
index 6311e90f..68dc53f2 100644
--- a/knowledge-directory/pom.xml
+++ b/knowledge-directory/pom.xml
@@ -44,9 +44,21 @@
compile
- junit
- junit
- ${junit-version}
+ org.junit.jupiter
+ junit-jupiter-api
+ 5.10.2
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter-engine
+ 5.10.2
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter
+ 5.10.2
test
@@ -229,6 +241,7 @@
true
with-dependencies
+ false
diff --git a/reasoner/pom.xml b/reasoner/pom.xml
index c83352d4..6bb87264 100644
--- a/reasoner/pom.xml
+++ b/reasoner/pom.xml
@@ -19,15 +19,22 @@
org.junit.jupiter
junit-jupiter-api
- 5.10.1
+ 5.10.2
test
org.junit.jupiter
junit-jupiter-engine
- 5.7.0
+ 5.10.2
test
+
+ org.junit.jupiter
+ junit-jupiter
+ 5.10.2
+ test
+
+
org.slf4j
diff --git a/smart-connector-rest-dist/pom.xml b/smart-connector-rest-dist/pom.xml
index f7b15743..02c1be7f 100644
--- a/smart-connector-rest-dist/pom.xml
+++ b/smart-connector-rest-dist/pom.xml
@@ -25,13 +25,19 @@
org.junit.jupiter
junit-jupiter-api
- 5.10.1
+ 5.10.2
test
org.junit.jupiter
junit-jupiter-engine
- 5.7.0
+ 5.10.2
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter
+ 5.10.2
test
@@ -64,6 +70,7 @@
true
with-dependencies
+ false
diff --git a/smart-connector-rest-dist/src/test/java/eu/knowledge/engine/rest/RestServerHelper.java b/smart-connector-rest-dist/src/test/java/eu/knowledge/engine/rest/RestServerHelper.java
index 06f3f2b6..253ced31 100644
--- a/smart-connector-rest-dist/src/test/java/eu/knowledge/engine/rest/RestServerHelper.java
+++ b/smart-connector-rest-dist/src/test/java/eu/knowledge/engine/rest/RestServerHelper.java
@@ -11,14 +11,14 @@
public class RestServerHelper {
private static final Logger LOG = LoggerFactory.getLogger(RestServerHelper.class);
private static int WAIT_BEFORE_NEXT_POLL = 300;
-
+
private Thread thread;
public void start(int port) {
var r = new Runnable() {
@Override
public void run() {
- RestServer.main(new String[] {String.format("%d", port)});
+ RestServer.main(new String[] { String.format("%d", port) });
}
};
this.thread = new Thread(r);
@@ -44,10 +44,10 @@ public void cleanUp() {
}
private static boolean portAvailable(int port) {
- try (Socket ignored = new Socket("localhost", port)) {
- return false;
- } catch (IOException ignored) {
- return true;
- }
+ try (Socket ignored = new Socket("localhost", port)) {
+ return false;
+ } catch (IOException ignored) {
+ return true;
+ }
}
}
diff --git a/smart-connector-rest-dist/src/test/java/eu/knowledge/engine/rest/api/TestRegisterKnowledgeInteraction.java b/smart-connector-rest-dist/src/test/java/eu/knowledge/engine/rest/api/TestRegisterKnowledgeInteraction.java
index 01804aab..2d2b5c22 100644
--- a/smart-connector-rest-dist/src/test/java/eu/knowledge/engine/rest/api/TestRegisterKnowledgeInteraction.java
+++ b/smart-connector-rest-dist/src/test/java/eu/knowledge/engine/rest/api/TestRegisterKnowledgeInteraction.java
@@ -6,6 +6,7 @@
import java.net.URL;
import java.util.Map;
+import org.apache.jena.atlas.logging.Log;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
@@ -30,34 +31,19 @@ public void setUpServer() {
public void testRegisterKi() throws IOException {
URL url = new URL("http://localhost:" + PORT + "/rest");
- HttpTester registerKb = new HttpTester(new URL(url + "/sc"), "POST", "{\"knowledgeBaseId\": \"http://example.com/kb\", \"knowledgeBaseName\": \"KB\", \"knowledgeBaseDescription\": \"KB\"}", Map.of(
- "Content-Type", "application/json",
- "Accept", "*/*"
- ));
+ HttpTester registerKb = new HttpTester(new URL(url + "/sc"), "POST",
+ "{\"knowledgeBaseId\": \"http://example.com/kb\", \"knowledgeBaseName\": \"KB\", \"knowledgeBaseDescription\": \"KB\"}",
+ Map.of("Content-Type", "application/json", "Accept", "*/*"));
registerKb.expectStatus(200);
- HttpTester registerKiWithName = new HttpTester(
- new URL(url + "/sc/ki"),
- "POST",
- "{\"knowledgeInteractionType\": \"AskKnowledgeInteraction\", \"knowledgeInteractionName\": \"some-name\", \"graphPattern\": \"?a ?b ?c.\"}",
- Map.of(
- "Knowledge-Base-Id", "http://example.com/kb",
- "Content-Type", "application/json",
- "Accept", "*/*"
- )
- );
+ HttpTester registerKiWithName = new HttpTester(new URL(url + "/sc/ki"), "POST",
+ "{\"knowledgeInteractionType\": \"AskKnowledgeInteraction\", \"knowledgeInteractionName\": \"some-name\", \"graphPattern\": \"?a ?b ?c.\"}",
+ Map.of("Knowledge-Base-Id", "http://example.com/kb", "Content-Type", "application/json", "Accept",
+ "*/*"));
registerKiWithName.expectStatus(200);
- HttpTester getKiWithName = new HttpTester(
- new URL(url + "/sc/ki"),
- "GET",
- null,
- Map.of(
- "Knowledge-Base-Id", "http://example.com/kb",
- "Content-Type", "application/json",
- "Accept", "*/*"
- )
- );
+ HttpTester getKiWithName = new HttpTester(new URL(url + "/sc/ki"), "GET", null, Map.of("Knowledge-Base-Id",
+ "http://example.com/kb", "Content-Type", "application/json", "Accept", "*/*"));
var body = getKiWithName.getBody();
assertTrue(body.contains("\"http://example.com/kb/interaction/some-name\""));
}
diff --git a/smart-connector-rest-server/pom.xml b/smart-connector-rest-server/pom.xml
index 0d7bf549..eaa85b4e 100644
--- a/smart-connector-rest-server/pom.xml
+++ b/smart-connector-rest-server/pom.xml
@@ -120,13 +120,19 @@
org.junit.jupiter
junit-jupiter-api
- 5.10.1
+ 5.10.2
test
org.junit.jupiter
junit-jupiter-engine
- 5.7.0
+ 5.10.2
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter
+ 5.10.2
test
diff --git a/smart-connector-rest-server/src/main/java/eu/knowledge/engine/rest/api/impl/SmartConnectorLifeCycleApiServiceImpl.java b/smart-connector-rest-server/src/main/java/eu/knowledge/engine/rest/api/impl/SmartConnectorLifeCycleApiServiceImpl.java
index da24fa82..29d73bcc 100644
--- a/smart-connector-rest-server/src/main/java/eu/knowledge/engine/rest/api/impl/SmartConnectorLifeCycleApiServiceImpl.java
+++ b/smart-connector-rest-server/src/main/java/eu/knowledge/engine/rest/api/impl/SmartConnectorLifeCycleApiServiceImpl.java
@@ -145,7 +145,9 @@ public void scPost(@Parameter(description = "", required = true) @NotNull @Valid
final boolean reasonerEnabled = smartConnector.getReasonerEnabled() == null ? false
: smartConnector.getReasonerEnabled();
-
+
+ LOG.info("Creating smart connector with ID {}.", kbId);
+
// Tell the manager to create a KB, store it, and have it set up a SC etc.
this.manager.createKB(new SmartConnector().knowledgeBaseId(kbId.toString()).knowledgeBaseName(kbName)
.knowledgeBaseDescription(kbDescription).leaseRenewalTime(smartConnector.getLeaseRenewalTime())
@@ -154,8 +156,6 @@ public void scPost(@Parameter(description = "", required = true) @NotNull @Valid
asyncResponse.resume(Response.ok().build());
});
- LOG.info("Creating smart connector with ID {}.", kbId);
-
return;
}
diff --git a/smart-connector/pom.xml b/smart-connector/pom.xml
index 05dd564a..ffbdd55e 100644
--- a/smart-connector/pom.xml
+++ b/smart-connector/pom.xml
@@ -32,13 +32,19 @@
org.junit.jupiter
junit-jupiter-api
- 5.10.1
+ 5.10.2
test
org.junit.jupiter
junit-jupiter-engine
- 5.7.0
+ 5.10.2
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter
+ 5.10.2
test
diff --git a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/MessageRouterImpl.java b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/MessageRouterImpl.java
index a63ec497..62d7152a 100644
--- a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/MessageRouterImpl.java
+++ b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/MessageRouterImpl.java
@@ -7,7 +7,7 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
@@ -29,6 +29,14 @@ public class MessageRouterImpl implements MessageRouter, SmartConnectorEndpoint
*/
private static final int MAX_ENTRIES = 5000;
+ /**
+ * How many seconds should the MessageRouter wait for ANSWER/REACT Message when
+ * sending a ASK/POST Message? 0 means wait forever (useful when working with a
+ * human KB)
+ */
+ private static final String CONF_KEY_WAIT_TIMEOUT = "KE_KB_WAIT_TIMEOUT";
+ private static final int DEFAULT_WAIT_TIMEOUT = 10;
+
private final SmartConnectorImpl smartConnector;
private final Map> openAskMessages = Collections
.synchronizedMap(new LinkedHashMap>() {
@@ -77,6 +85,10 @@ public MessageRouterImpl(SmartConnectorImpl smartConnector) {
this.smartConnector = smartConnector;
}
+ private int getWaitTimeout() {
+ return Integer.parseInt(this.getConfigProperty(CONF_KEY_WAIT_TIMEOUT, Integer.toString(DEFAULT_WAIT_TIMEOUT)));
+ }
+
@Override
public CompletableFuture sendAskMessage(AskMessage askMessage) throws IOException {
MessageDispatcherEndpoint messageDispatcher = this.messageDispatcherEndpoint;
@@ -84,6 +96,17 @@ public CompletableFuture sendAskMessage(AskMessage askMessage) th
throw new IOException("Not connected to MessageDispatcher");
}
CompletableFuture future = new CompletableFuture<>();
+
+ // wait maximally WAIT_TIMEOUT for a return message.
+ int waitInSeconds = this.getWaitTimeout();
+ if (waitInSeconds > 0) {
+ future = future.orTimeout(waitInSeconds, TimeUnit.SECONDS);
+ }
+
+ future.whenComplete((m, e) -> {
+ this.openAskMessages.remove(askMessage.getMessageId());
+ });
+
this.openAskMessages.put(askMessage.getMessageId(), future);
messageDispatcher.send(askMessage);
@@ -99,6 +122,17 @@ public CompletableFuture sendPostMessage(PostMessage postMessage)
throw new IOException("Not connected to MessageDispatcher");
}
CompletableFuture future = new CompletableFuture<>();
+
+ // wait maximally WAIT_TIMEOUT for a return message.
+ int waitInSeconds = this.getWaitTimeout();
+ if (waitInSeconds > 0) {
+ future = future.orTimeout(waitInSeconds, TimeUnit.SECONDS);
+ }
+
+ future.whenComplete((m, e) -> {
+ this.openAskMessages.remove(postMessage.getMessageId());
+ });
+
this.openPostMessages.put(postMessage.getMessageId(), future);
messageDispatcher.send(postMessage);
LOG.debug("Sent PostMessage: {}", postMessage);
@@ -124,7 +158,7 @@ public void handleAskMessage(AskMessage message) {
try {
messageDispatcher.send(reply);
} catch (Throwable e) {
- this.LOG.warn("Could not send reply to message " + message.getMessageId());
+ this.LOG.warn("Could not send reply to message " + message.getMessageId() + ": " + e.getMessage());
this.LOG.debug("", e);
}
}).handle((r, e) -> {
@@ -246,6 +280,21 @@ public URI getKnowledgeBaseId() {
return this.smartConnector.getKnowledgeBaseId();
}
+ public String getConfigProperty(String key, String defaultValue) {
+ // We might replace this with something a bit more fancy in the future...
+ String value = System.getenv(key);
+ if (value == null) {
+ value = defaultValue;
+ LOG.trace("No value for the configuration parameter '{}' was provided, using the default value '{}'", key,
+ defaultValue);
+ }
+ return value;
+ }
+
+ public boolean hasConfigProperty(String key) {
+ return System.getenv(key) != null;
+ }
+
@Override
public void setMessageDispatcher(MessageDispatcherEndpoint messageDispatcherEndpoint) {
assert this.messageDispatcherEndpoint == null;
diff --git a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/MetaKnowledgeBaseImpl.java b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/MetaKnowledgeBaseImpl.java
index 78cd13fc..6f54b517 100644
--- a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/MetaKnowledgeBaseImpl.java
+++ b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/MetaKnowledgeBaseImpl.java
@@ -340,14 +340,6 @@ public CompletableFuture getOtherKnowledgeBase(URI toKnowled
// condition that otherKnowledgeBase should NEVER be null.
return null;
}
- }).handle((r, e) -> {
-
- if (r == null) {
- LOG.error("An exception has occured while getting Other Knowledge Base", e);
- return null;
- } else {
- return r;
- }
});
return future;
} catch (IOException e) {
diff --git a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/OtherKnowledgeBaseStoreImpl.java b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/OtherKnowledgeBaseStoreImpl.java
index 780d2f9b..08bff0b1 100644
--- a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/OtherKnowledgeBaseStoreImpl.java
+++ b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/OtherKnowledgeBaseStoreImpl.java
@@ -73,7 +73,7 @@ public CompletableFuture populate() {
}).handle((r, e) -> {
if (r == null && e != null) {
- LOG.error("An exception has occured while adding an other Knowledge Base ", e);
+ LOG.debug("An exception has occured while adding {} an other Knowledge Base ", id, e);
return null;
} else {
return r;
diff --git a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/SmartConnectorImpl.java b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/SmartConnectorImpl.java
index b8b5f63b..74a5efe7 100644
--- a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/SmartConnectorImpl.java
+++ b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/SmartConnectorImpl.java
@@ -479,17 +479,18 @@ private void checkStopped() {
void communicationReady() {
Instant beforePopulate = Instant.now();
LOG.info("Getting comms ready took {} ms", Duration.between(this.started, beforePopulate).toMillis());
- // Populate the initial knowledge base store.
- this.otherKnowledgeBaseStore.populate().thenRun(() -> {
- LOG.info("Populating took {} ms", Duration.between(beforePopulate, Instant.now()).toMillis());
- Instant beforeAnnounce = Instant.now();
- // Then tell the other knowledge bases about our existence.
- this.metaKnowledgeBase.postNewKnowledgeBase().thenRun(() -> {
- LOG.info("Announcing took {} ms", Duration.between(beforeAnnounce, Instant.now()).toMillis());
- Instant beforeConstructorFinished = Instant.now();
- this.constructorFinished.thenRun(() -> {
- LOG.info("Constructor finished took {} ms",
- Duration.between(beforeConstructorFinished, Instant.now()).toMillis());
+ Instant beforeConstructorFinished = Instant.now();
+ this.constructorFinished.handle((r3, e3) -> {
+ LOG.info("Constructor finished took {} ms",
+ Duration.between(beforeConstructorFinished, Instant.now()).toMillis());
+ // Populate the initial knowledge base store.
+ this.otherKnowledgeBaseStore.populate().handle((r, e) -> {
+ LOG.info("Populating took {} ms", Duration.between(beforePopulate, Instant.now()).toMillis());
+ Instant beforeAnnounce = Instant.now();
+ // Then tell the other knowledge bases about our existence.
+ this.metaKnowledgeBase.postNewKnowledgeBase().handle((r2, e2) -> {
+ LOG.info("Announcing took {} ms", Duration.between(beforeAnnounce, Instant.now()).toMillis());
+
// When that is done, and all peers have acknowledged our existence, we
// can proceed to inform the knowledge base that this smart connector is
// ready for action!
@@ -500,11 +501,14 @@ void communicationReady() {
LOG.error("KnowledgeBase threw exception", t);
}
});
+ return (Void) null;
});
+ return (Void) null;
+ }).exceptionally((Throwable t) -> {
+ LOG.error("Populating the Smart Connector should not result in errors.", t);
+ return null;
});
- }).exceptionally((Throwable t) -> {
- LOG.error("Populating the Smart Connector should not result in errors.", t);
- return null;
+ return (Void) null;
});
}
diff --git a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/LocalSmartConnectorConnectionManager.java b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/LocalSmartConnectorConnectionManager.java
index 9e595e37..8f75efad 100644
--- a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/LocalSmartConnectorConnectionManager.java
+++ b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/LocalSmartConnectorConnectionManager.java
@@ -53,9 +53,9 @@ public void smartConnectorAdded(RuntimeSmartConnector smartConnector) {
LocalSmartConnectorConnection connection = new LocalSmartConnectorConnection(messageDispatcher, endpoint);
this.localSmartConnectorConnections.put(endpoint.getKnowledgeBaseId(), connection);
connection.start();
+
if (messageDispatcher.runsInDistributedMode()) {
this.messageDispatcher.getRemoteSmartConnectorConnectionsManager().notifyChangedLocalSmartConnectors();
- this.messageDispatcher.notifySmartConnectorsChanged();
}
}
@@ -68,7 +68,6 @@ public void smartConnectorRemoved(RuntimeSmartConnector smartConnector) {
if (messageDispatcher.runsInDistributedMode()) {
this.messageDispatcher.getRemoteSmartConnectorConnectionsManager().notifyChangedLocalSmartConnectors();
- this.messageDispatcher.notifySmartConnectorsChanged();
}
}
diff --git a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java
index 734e1835..a0d2e7ab 100644
--- a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java
+++ b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java
@@ -8,7 +8,6 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.net.http.HttpClient;
-import java.net.http.HttpClient.Version;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.BodyPublishers;
import java.net.http.HttpResponse;
@@ -21,7 +20,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
@@ -44,9 +42,11 @@
public class RemoteKerConnection {
/**
- * A maximum amount of time to wait for othe HTTP REST call to fail/succeed.
+ * How many seconds the HttpClient waits for a HTTP response when sending a HTTP
+ * request. Default 5 seconds.
*/
- private static final int HTTP_TIMEOUT = 30;
+ private static final String CONF_KEY_HTTP_TIMEOUT = "KE_HTTP_TIMEOUT";
+ private static final int DEFAULT_HTTP_TIMEOUT = 5;
public static final Logger LOG = LoggerFactory.getLogger(RemoteKerConnection.class);
@@ -60,6 +60,7 @@ public class RemoteKerConnection {
private LocalDateTime tryAgainAfter = null;
private int errorCounter = 0;
+ private LocalDateTime logStillIgnoringAfter = null;
public RemoteKerConnection(MessageDispatcher dispatcher,
KnowledgeEngineRuntimeConnectionDetails kerConnectionDetails) {
@@ -89,13 +90,19 @@ protected PasswordAuthentication getPasswordAuthentication() {
this.remoteKerUri = kerConnectionDetails.getExposedUrl();
}
- this.httpClient = builder.connectTimeout(Duration.ofSeconds(HTTP_TIMEOUT)).build();
+ int httpTimeout = getHttpTimeout();
+
+ this.httpClient = builder.connectTimeout(Duration.ofSeconds(httpTimeout)).build();
objectMapper = new ObjectMapper().disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS).findAndRegisterModules()
.setDateFormat(new RFC3339DateFormat());
}
+ private int getHttpTimeout() {
+ return Integer.parseInt(this.getConfigProperty(CONF_KEY_HTTP_TIMEOUT, Integer.toString(DEFAULT_HTTP_TIMEOUT)));
+ }
+
public URI getRemoteKerUri() {
return this.remoteKerUri;
}
@@ -103,6 +110,7 @@ public URI getRemoteKerUri() {
private void noError() {
this.errorCounter = 0;
this.tryAgainAfter = null;
+ this.logStillIgnoringAfter = null;
}
private int errorOccurred() {
@@ -119,7 +127,7 @@ private int errorOccurred() {
private void updateRemoteKerDataFromPeer() {
try {
HttpRequest request = HttpRequest.newBuilder(new URI(this.remoteKerUri + "/runtimedetails"))
- .header("Content-Type", "application/json").version(Version.HTTP_1_1).GET().build();
+ .header("Content-Type", "application/json").GET().build();
HttpResponse response = this.httpClient.send(request, BodyHandlers.ofString());
if (response.statusCode() == 200) {
@@ -148,11 +156,12 @@ private void updateRemoteKerDataFromPeer() {
dispatcher.notifySmartConnectorsChanged();
}
- private boolean isAvailable() {
+ public boolean isAvailable() {
if (tryAgainAfter != null) {
boolean after = LocalDateTime.now().isAfter(tryAgainAfter);
if (after) {
LOG.info("KER {} available again.", this.remoteKerUri);
+ this.tryAgainAfter = null;
}
return after;
} else
@@ -226,21 +235,23 @@ public void stop() {
HttpRequest request = HttpRequest
.newBuilder(new URI(this.remoteKerUri + "/runtimedetails/"
+ dispatcher.getKnowledgeDirectoryConnectionManager().getMyKnowledgeDirectoryId()))
- .header("Content-Type", "application/json").version(Version.HTTP_1_1).DELETE().build();
+ .header("Content-Type", "application/json").DELETE().build();
HttpResponse response = this.httpClient.send(request, BodyHandlers.ofString());
if (response.statusCode() == 200) {
LOG.trace("Successfully said goodbye to {}", this.remoteKerUri);
} else {
+ this.remoteKerDetails = null;
LOG.warn("Failed to say goodbye to {}, got response {}: {}", this.remoteKerUri,
response.statusCode(), response.body());
}
} catch (IOException | URISyntaxException | InterruptedException e) {
+ this.remoteKerDetails = null;
LOG.warn("Failed to say goodbye to " + remoteKerConnectionDetails.getId());
LOG.debug("", e);
}
} else
- LOG.info("Still ignoring KER {}.", this.remoteKerUri);
+ logStillIgnoring();
// if someone calls this stop method, all smart connectors should be removed
// from the other knowledge base store. We do this by removing the ker details
@@ -249,6 +260,16 @@ public void stop() {
dispatcher.notifySmartConnectorsChanged();
}
+ /**
+ * To prevent many "Still ignoring" messages, we only log them once a minute.
+ */
+ private void logStillIgnoring() {
+ if (logStillIgnoringAfter == null || logStillIgnoringAfter.isBefore(LocalDateTime.now())) {
+ LOG.warn("Still ignoring KER {}.", this.remoteKerUri);
+ logStillIgnoringAfter = LocalDateTime.now().plusMinutes(1);
+ }
+ }
+
public void sendToRemoteSmartConnector(KnowledgeMessage message) throws IOException {
assert (getRemoteKerDetails() == null ? true
: getRemoteKerDetails().getSmartConnectorIds().contains(message.getToKnowledgeBase().toString()));
@@ -259,8 +280,7 @@ public void sendToRemoteSmartConnector(KnowledgeMessage message) throws IOExcept
String jsonMessage = objectMapper.writeValueAsString(MessageConverter.toJson(message));
HttpRequest request = HttpRequest
.newBuilder(new URI(this.remoteKerUri + getPathForMessageType(message)))
- .header("Content-Type", "application/json").version(Version.HTTP_1_1)
- .POST(BodyPublishers.ofString(jsonMessage)).build();
+ .header("Content-Type", "application/json").POST(BodyPublishers.ofString(jsonMessage)).build();
HttpResponse response = this.httpClient.send(request, BodyHandlers.ofString());
@@ -268,24 +288,24 @@ public void sendToRemoteSmartConnector(KnowledgeMessage message) throws IOExcept
this.noError();
LOG.trace("Successfully sent message {} to {}", message.getMessageId(), this.remoteKerUri);
} else {
+ this.remoteKerDetails = null;
int time = this.errorOccurred();
LOG.warn("Ignoring KER {} for {} minutes. Failed to send message {} to {}, got response {}: {}",
this.remoteKerUri, time, message.getMessageId(), this.remoteKerUri, response.statusCode(),
response.body());
+ this.dispatcher.notifySmartConnectorsChanged();
throw new IOException("Message not accepted by remote host, status code " + response.statusCode()
+ ", body " + response.body());
}
- } catch (JsonProcessingException | URISyntaxException | InterruptedException e) {
- int time = this.errorOccurred();
- LOG.warn("Ignoring KER {} for {} minutes.", this.remoteKerUri, time);
- throw new IOException("Could not send message to remote SmartConnector.", e);
- } catch (IOException e) {
+ } catch (URISyntaxException | InterruptedException | IOException e) {
+ this.remoteKerDetails = null;
int time = this.errorOccurred();
LOG.warn("Ignoring KER {} for {} minutes.", this.remoteKerUri, time);
- throw e;
+ this.dispatcher.notifySmartConnectorsChanged();
+ throw new IOException(e);
}
} else {
- LOG.warn("Still ignoring KER {}.", this.remoteKerUri);
+ logStillIgnoring();
throw new IOException("KER " + this.remoteKerUri + " is currently unavailable. Trying again later.");
}
}
@@ -295,8 +315,7 @@ public void sendMyKerDetailsToPeer(KnowledgeEngineRuntimeDetails details) {
try {
String jsonMessage = objectMapper.writeValueAsString(details);
HttpRequest request = HttpRequest.newBuilder(new URI(this.remoteKerUri + "/runtimedetails"))
- .header("Content-Type", "application/json").version(Version.HTTP_1_1)
- .POST(BodyPublishers.ofString(jsonMessage)).build();
+ .header("Content-Type", "application/json").POST(BodyPublishers.ofString(jsonMessage)).build();
HttpResponse response = this.httpClient.send(request, BodyHandlers.ofString());
if (response.statusCode() == 200) {
@@ -304,19 +323,23 @@ public void sendMyKerDetailsToPeer(KnowledgeEngineRuntimeDetails details) {
LOG.trace("Successfully sent updated KnowledgeEngineRuntimeDetails to {}", this.remoteKerUri);
} else {
this.remoteKerDetails = null;
- this.errorOccurred();
- LOG.warn("Failed to send updated KnowledgeEngineRuntimeDetails to {}, got response {}: {}",
- this.remoteKerUri, response.statusCode(), response.body());
+ int time = this.errorOccurred();
+ this.dispatcher.notifySmartConnectorsChanged();
+ LOG.warn(
+ "Ignoring KER {} for {} minutes. Failed to send updated KnowledgeEngineRuntimeDetails, got response {}: {}",
+ this.remoteKerUri, time, response.statusCode(), response.body());
}
} catch (IOException | URISyntaxException | InterruptedException e) {
this.remoteKerDetails = null;
- this.errorOccurred();
- LOG.warn("Failed to send updated KnowledgeEngineRuntimeDetails to "
- + remoteKerConnectionDetails.getId());
+ int time = this.errorOccurred();
+ this.dispatcher.notifySmartConnectorsChanged();
+ LOG.warn(
+ "Ignoring KER {} for {} minutes. Failed to send updated KnowledgeEngineRuntimeDetails due to '{}'",
+ this.remoteKerUri, time, e.getMessage());
LOG.debug("", e);
}
} else
- LOG.info("Still ignoring KER {}.", this.remoteKerUri);
+ logStillIgnoring();
}
private String getPathForMessageType(KnowledgeMessage message) {
@@ -335,4 +358,19 @@ private String getPathForMessageType(KnowledgeMessage message) {
}
}
+ public String getConfigProperty(String key, String defaultValue) {
+ // We might replace this with something a bit more fancy in the future...
+ String value = System.getenv(key);
+ if (value == null) {
+ value = defaultValue;
+ LOG.trace("No value for the configuration parameter '{}' was provided, using the default value '{}'", key,
+ defaultValue);
+ }
+ return value;
+ }
+
+ public boolean hasConfigProperty(String key) {
+ return System.getenv(key) != null;
+ }
+
}
diff --git a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnectionManager.java b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnectionManager.java
index e750927e..94b61f04 100644
--- a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnectionManager.java
+++ b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnectionManager.java
@@ -40,6 +40,7 @@ public class RemoteKerConnectionManager extends SmartConnectorManagementApiServi
private static final int KNOWLEDGE_DIRECTORY_UPDATE_COOLDOWN = 2;
private final RemoteMessageReceiver messageReceiver;
private final Map remoteKerConnections = new ConcurrentHashMap<>();
+ private final Map unavailableRemoteKerConnections = new ConcurrentHashMap<>();
private ScheduledFuture> scheduledScheduleFuture;
private ScheduledFuture> scheduledKnowledgeDirectoryQueryFuture;
private final MessageDispatcher messageDispatcher;
@@ -78,7 +79,7 @@ public void scheduleQueryKnowledgeDirectory() {
LOG.debug("Scheduling to query the Knowledge Directory right away.");
this.scheduledKnowledgeDirectoryQueryFuture = KeRuntime.executorService().schedule(() -> {
try {
- queryKnowledgeDirectory();
+ queryKnowledgeDirectory();
} catch (Throwable t) {
LOG.error("", t);
}
@@ -123,11 +124,26 @@ private synchronized void queryKnowledgeDirectory() {
.hasNext();) {
Entry e = it.next();
+ // deal with unavailable remote kers
+ if (e.getValue().isAvailable() && this.unavailableRemoteKerConnections.containsKey(e.getKey())) {
+ // available again so make sure we get its current SCs
+ this.unavailableRemoteKerConnections.remove(e.getKey());
+ e.getValue().getRemoteKerDetails();
+ }
+
+ if (!e.getValue().isAvailable()) {
+ if (!this.unavailableRemoteKerConnections.containsKey(e.getKey())) {
+ // recently became unavailable
+ this.unavailableRemoteKerConnections.put(e.getKey(), e.getValue());
+ }
+ }
+
if (!kerIds.contains(e.getKey())) {
// According the the Knowledge Directory, this KER doesn't exist (anymore)
LOG.info("Removing peer that is now gone: {}", e.getValue().getRemoteKerUri());
e.getValue().stop();
it.remove();
+ this.unavailableRemoteKerConnections.remove(e.getKey());
}
}
this.knowledgeDirectoryUpdateCooldownEnds = new Date(
diff --git a/smart-connector/src/test/java/eu/knowledge/engine/smartconnector/api/TimeOntologyTest.java b/smart-connector/src/test/java/eu/knowledge/engine/smartconnector/api/TimeOntologyTest.java
index 6b098930..974486c2 100644
--- a/smart-connector/src/test/java/eu/knowledge/engine/smartconnector/api/TimeOntologyTest.java
+++ b/smart-connector/src/test/java/eu/knowledge/engine/smartconnector/api/TimeOntologyTest.java
@@ -1,5 +1,8 @@
package eu.knowledge.engine.smartconnector.api;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
import java.util.Calendar;
import java.util.Iterator;
import java.util.UUID;
@@ -252,7 +255,11 @@ public void test() throws InterruptedException, ExecutionException {
AskResult ar = app.ask(aAskKI, new BindingSet()).get();
- LOG.info("Bindings: {}", ar.getBindings());
+ BindingSet bindings = ar.getBindings();
+ LOG.info("Bindings: {}", bindings);
+
+ assertTrue(bindings.size() > 0);
+ assertEquals("\"Meeting 2\"", bindings.iterator().next().get("topic"));
kn.stop().get();