From ec82eab817529ad8603f39b7be995d6f47999b07 Mon Sep 17 00:00:00 2001 From: Barry Nouwt Date: Fri, 27 Sep 2024 11:46:51 +0200 Subject: [PATCH] Improved the performance of the announcing of SCs. With many SCs they started to take a long time and started to exceed some thresholds. --- .../impl/InteractionProcessorImpl.java | 94 +++++++++++-------- .../TestRegisterSmartConnectorWithSameId.java | 4 +- 2 files changed, 57 insertions(+), 41 deletions(-) diff --git a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/InteractionProcessorImpl.java b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/InteractionProcessorImpl.java index 3a9e7d04..f72fb426 100644 --- a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/InteractionProcessorImpl.java +++ b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/InteractionProcessorImpl.java @@ -119,14 +119,15 @@ public AskPlan planAskFromKnowledgeBase(MyKnowledgeInteractionInfo anAKI, Recipi for (OtherKnowledgeBase otherKB : filteredOtherKnowledgeBases) { // Use the knowledge interactions from the other KB var knowledgeInteractions = otherKB.getKnowledgeInteractions().stream().filter((r) -> { - // But filter on the communicative act. These have to match! - return communicativeActMatcher(anAKI, r); - }).filter((r) -> { return anAKI.getKnowledgeInteraction().includeMetaKIs() ? true : !r.isMeta(); }); + otherKnowledgeInteractions.addAll(knowledgeInteractions.collect(Collectors.toList())); } + // But filter on the communicative act. These have to match! + filterWithCommunicativeActMatcher(anAKI, otherKnowledgeInteractions); + // create a new SingleInteractionProcessor to handle this ask. SingleInteractionProcessor processor; if (this.reasonerEnabled) { @@ -289,14 +290,14 @@ public PostPlan planPostFromKnowledgeBase(MyKnowledgeInteractionInfo aPKI, Recip for (OtherKnowledgeBase otherKB : filteredOtherKnowledgeBases) { // Use the knowledge interactions from the other KB var knowledgeInteractions = otherKB.getKnowledgeInteractions().stream().filter((r) -> { - // But filter on the communicative act. These have to match! - return communicativeActMatcher(aPKI, r); - }).filter((r) -> { return aPKI.getKnowledgeInteraction().includeMetaKIs() ? true : !r.isMeta(); }); otherKnowledgeInteractions.addAll(knowledgeInteractions.collect(Collectors.toList())); } + // But filter on the communicative act. These have to match! + filterWithCommunicativeActMatcher(aPKI, otherKnowledgeInteractions); + // create a new SingleInteractionProcessor to handle this ask. SingleInteractionProcessor processor; if (this.reasonerEnabled) { @@ -412,7 +413,8 @@ public void unsetMessageRouter() { * @return {@code true} if the communicative acts of the given * KnowledgeInteractions match, {@code false} otherwise. */ - private boolean communicativeActMatcher(MyKnowledgeInteractionInfo myKI, KnowledgeInteractionInfo otherKI) { + private void filterWithCommunicativeActMatcher(MyKnowledgeInteractionInfo myKI, + Set otherKIs) { Instant start = Instant.now(); @@ -441,23 +443,25 @@ private boolean communicativeActMatcher(MyKnowledgeInteractionInfo myKI, Knowled } // then add the other knowledge interaction communicative act - CommunicativeAct otherAct = otherKI.getKnowledgeInteraction().getAct(); - Resource otherActResource = ResourceFactory.createResource(otherKI.id + "/act"); + for (KnowledgeInteractionInfo otherKI : otherKIs) { + CommunicativeAct otherAct = otherKI.getKnowledgeInteraction().getAct(); + Resource otherActResource = ResourceFactory.createResource(otherKI.id + "/act"); - m.add(otherActResource, RDF.type, Vocab.COMMUNICATIVE_ACT); + m.add(otherActResource, RDF.type, Vocab.COMMUNICATIVE_ACT); - Resource otherRequirementPurpose = ResourceFactory.createResource(otherActResource + "/requirement"); - Resource otherSatisfactionPurpose = ResourceFactory.createResource(otherActResource + "/satisfaction"); + Resource otherRequirementPurpose = ResourceFactory.createResource(otherActResource + "/requirement"); + Resource otherSatisfactionPurpose = ResourceFactory.createResource(otherActResource + "/satisfaction"); - m.add(otherActResource, Vocab.HAS_REQ, otherRequirementPurpose); - m.add(otherSatisfactionPurpose, Vocab.HAS_SAT, otherSatisfactionPurpose); + m.add(otherActResource, Vocab.HAS_REQ, otherRequirementPurpose); + m.add(otherSatisfactionPurpose, Vocab.HAS_SAT, otherSatisfactionPurpose); - // give the purposes the correct types - for (Resource r : otherAct.getRequirementPurposes()) { - m.add(otherRequirementPurpose, RDF.type, r); - } - for (Resource r : otherAct.getSatisfactionPurposes()) { - m.add(otherSatisfactionPurpose, RDF.type, r); + // give the purposes the correct types + for (Resource r : otherAct.getRequirementPurposes()) { + m.add(otherRequirementPurpose, RDF.type, r); + } + for (Resource r : otherAct.getSatisfactionPurposes()) { + m.add(otherSatisfactionPurpose, RDF.type, r); + } } // then apply the reasoner @@ -468,32 +472,42 @@ private boolean communicativeActMatcher(MyKnowledgeInteractionInfo myKI, Knowled // faster. either we set multiple iris for the same params. Or we change the ASK // to include myReq/otherReq and mySat/otherSat vars. - // my perspective - Var reqVar = Var.alloc("req"); - Var satVar = Var.alloc("sat"); - org.apache.jena.sparql.engine.binding.Binding theFirstBinding = BindingFactory.binding(reqVar, - NodeFactory.createURI(myRequirementPurpose.toString()), satVar, - NodeFactory.createURI(otherSatisfactionPurpose.toString())); + // my and other perspective + var iter = otherKIs.iterator(); + while (iter.hasNext()) { + KnowledgeInteractionInfo otherKI = iter.next(); + Resource otherActResource = ResourceFactory.createResource(otherKI.id + "/act"); + Resource otherRequirementPurpose = ResourceFactory.createResource(otherActResource + "/requirement"); + Resource otherSatisfactionPurpose = ResourceFactory.createResource(otherActResource + "/satisfaction"); - org.apache.jena.sparql.engine.binding.Binding theSecondBinding = BindingFactory.binding(reqVar, - NodeFactory.createURI(otherRequirementPurpose.toString()), satVar, - NodeFactory.createURI(mySatisfactionPurpose.toString())); + Var reqVar = Var.alloc("req"); + Var satVar = Var.alloc("sat"); + org.apache.jena.sparql.engine.binding.Binding theFirstBinding = BindingFactory.binding(reqVar, + NodeFactory.createURI(myRequirementPurpose.toString()), satVar, + NodeFactory.createURI(otherSatisfactionPurpose.toString())); - Query q = (Query) query.clone(); - ElementData de = ((ElementData) ((ElementGroup) q.getQueryPattern()).getLast()); + org.apache.jena.sparql.engine.binding.Binding theSecondBinding = BindingFactory.binding(reqVar, + NodeFactory.createURI(otherRequirementPurpose.toString()), satVar, + NodeFactory.createURI(mySatisfactionPurpose.toString())); - List data = de.getRows(); - data.add(theFirstBinding); - data.add(theSecondBinding); + Query q = (Query) query.clone(); + ElementData de = ((ElementData) ((ElementGroup) q.getQueryPattern()).getLast()); - QueryExecution myQe = QueryExecutionFactory.create(q, infModel); - boolean execAskMy = myQe.execAsk(); - myQe.close(); + List data = de.getRows(); + data.add(theFirstBinding); + data.add(theSecondBinding); - doTheyMatch = !execAskMy; - LOG.trace("Communicative Act time ({}): {}ms", doTheyMatch, Duration.between(start, Instant.now()).toMillis()); + QueryExecution myQe = QueryExecutionFactory.create(q, infModel); + boolean execAskMy = myQe.execAsk(); + myQe.close(); + + doTheyMatch = !execAskMy; - return doTheyMatch; + if (!doTheyMatch) { + iter.remove(); + } + } + LOG.trace("Communicative Act time ({}): {}ms", doTheyMatch, Duration.between(start, Instant.now()).toMillis()); } @Override diff --git a/smart-connector/src/test/java/eu/knowledge/engine/smartconnector/runtime/messaging/TestRegisterSmartConnectorWithSameId.java b/smart-connector/src/test/java/eu/knowledge/engine/smartconnector/runtime/messaging/TestRegisterSmartConnectorWithSameId.java index 5bac8cbb..3d5f336a 100644 --- a/smart-connector/src/test/java/eu/knowledge/engine/smartconnector/runtime/messaging/TestRegisterSmartConnectorWithSameId.java +++ b/smart-connector/src/test/java/eu/knowledge/engine/smartconnector/runtime/messaging/TestRegisterSmartConnectorWithSameId.java @@ -4,6 +4,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.net.URI; +import java.util.concurrent.Phaser; import org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -14,10 +15,11 @@ public class TestRegisterSmartConnectorWithSameId { private static final Logger LOG = LoggerFactory.getLogger(TestRegisterSmartConnectorWithSameId.class); - + private Phaser readyPhaser = new Phaser(1); @Test public void testRegisterSmartConnectorWithSameIdInSameRuntimeThrows() { var kb1 = new MockedKnowledgeBase("http://example.org/kb1"); + kb1.setPhaser(this.readyPhaser); kb1.start(); var kb1AsWell = new MockedKnowledgeBase("http://example.org/kb1");