Skip to content

Commit

Permalink
Improved the performance of the announcing of SCs.
Browse files Browse the repository at this point in the history
With many SCs they started to take a long time and started to exceed
some thresholds.
  • Loading branch information
bnouwt committed Sep 27, 2024
1 parent 755ffa8 commit ec82eab
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,15 @@ public AskPlan planAskFromKnowledgeBase(MyKnowledgeInteractionInfo anAKI, Recipi
for (OtherKnowledgeBase otherKB : filteredOtherKnowledgeBases) {
// Use the knowledge interactions from the other KB
var knowledgeInteractions = otherKB.getKnowledgeInteractions().stream().filter((r) -> {
// But filter on the communicative act. These have to match!
return communicativeActMatcher(anAKI, r);
}).filter((r) -> {
return anAKI.getKnowledgeInteraction().includeMetaKIs() ? true : !r.isMeta();
});

otherKnowledgeInteractions.addAll(knowledgeInteractions.collect(Collectors.toList()));
}

// But filter on the communicative act. These have to match!
filterWithCommunicativeActMatcher(anAKI, otherKnowledgeInteractions);

// create a new SingleInteractionProcessor to handle this ask.
SingleInteractionProcessor processor;
if (this.reasonerEnabled) {
Expand Down Expand Up @@ -289,14 +290,14 @@ public PostPlan planPostFromKnowledgeBase(MyKnowledgeInteractionInfo aPKI, Recip
for (OtherKnowledgeBase otherKB : filteredOtherKnowledgeBases) {
// Use the knowledge interactions from the other KB
var knowledgeInteractions = otherKB.getKnowledgeInteractions().stream().filter((r) -> {
// But filter on the communicative act. These have to match!
return communicativeActMatcher(aPKI, r);
}).filter((r) -> {
return aPKI.getKnowledgeInteraction().includeMetaKIs() ? true : !r.isMeta();
});
otherKnowledgeInteractions.addAll(knowledgeInteractions.collect(Collectors.toList()));
}

// But filter on the communicative act. These have to match!
filterWithCommunicativeActMatcher(aPKI, otherKnowledgeInteractions);

// create a new SingleInteractionProcessor to handle this ask.
SingleInteractionProcessor processor;
if (this.reasonerEnabled) {
Expand Down Expand Up @@ -412,7 +413,8 @@ public void unsetMessageRouter() {
* @return {@code true} if the communicative acts of the given
* KnowledgeInteractions match, {@code false} otherwise.
*/
private boolean communicativeActMatcher(MyKnowledgeInteractionInfo myKI, KnowledgeInteractionInfo otherKI) {
private void filterWithCommunicativeActMatcher(MyKnowledgeInteractionInfo myKI,
Set<KnowledgeInteractionInfo> otherKIs) {

Instant start = Instant.now();

Expand Down Expand Up @@ -441,23 +443,25 @@ private boolean communicativeActMatcher(MyKnowledgeInteractionInfo myKI, Knowled
}

// then add the other knowledge interaction communicative act
CommunicativeAct otherAct = otherKI.getKnowledgeInteraction().getAct();
Resource otherActResource = ResourceFactory.createResource(otherKI.id + "/act");
for (KnowledgeInteractionInfo otherKI : otherKIs) {
CommunicativeAct otherAct = otherKI.getKnowledgeInteraction().getAct();
Resource otherActResource = ResourceFactory.createResource(otherKI.id + "/act");

m.add(otherActResource, RDF.type, Vocab.COMMUNICATIVE_ACT);
m.add(otherActResource, RDF.type, Vocab.COMMUNICATIVE_ACT);

Resource otherRequirementPurpose = ResourceFactory.createResource(otherActResource + "/requirement");
Resource otherSatisfactionPurpose = ResourceFactory.createResource(otherActResource + "/satisfaction");
Resource otherRequirementPurpose = ResourceFactory.createResource(otherActResource + "/requirement");
Resource otherSatisfactionPurpose = ResourceFactory.createResource(otherActResource + "/satisfaction");

m.add(otherActResource, Vocab.HAS_REQ, otherRequirementPurpose);
m.add(otherSatisfactionPurpose, Vocab.HAS_SAT, otherSatisfactionPurpose);
m.add(otherActResource, Vocab.HAS_REQ, otherRequirementPurpose);
m.add(otherSatisfactionPurpose, Vocab.HAS_SAT, otherSatisfactionPurpose);

// give the purposes the correct types
for (Resource r : otherAct.getRequirementPurposes()) {
m.add(otherRequirementPurpose, RDF.type, r);
}
for (Resource r : otherAct.getSatisfactionPurposes()) {
m.add(otherSatisfactionPurpose, RDF.type, r);
// give the purposes the correct types
for (Resource r : otherAct.getRequirementPurposes()) {
m.add(otherRequirementPurpose, RDF.type, r);
}
for (Resource r : otherAct.getSatisfactionPurposes()) {
m.add(otherSatisfactionPurpose, RDF.type, r);
}
}

// then apply the reasoner
Expand All @@ -468,32 +472,42 @@ private boolean communicativeActMatcher(MyKnowledgeInteractionInfo myKI, Knowled
// faster. either we set multiple iris for the same params. Or we change the ASK
// to include myReq/otherReq and mySat/otherSat vars.

// my perspective
Var reqVar = Var.alloc("req");
Var satVar = Var.alloc("sat");
org.apache.jena.sparql.engine.binding.Binding theFirstBinding = BindingFactory.binding(reqVar,
NodeFactory.createURI(myRequirementPurpose.toString()), satVar,
NodeFactory.createURI(otherSatisfactionPurpose.toString()));
// my and other perspective
var iter = otherKIs.iterator();
while (iter.hasNext()) {
KnowledgeInteractionInfo otherKI = iter.next();
Resource otherActResource = ResourceFactory.createResource(otherKI.id + "/act");
Resource otherRequirementPurpose = ResourceFactory.createResource(otherActResource + "/requirement");
Resource otherSatisfactionPurpose = ResourceFactory.createResource(otherActResource + "/satisfaction");

org.apache.jena.sparql.engine.binding.Binding theSecondBinding = BindingFactory.binding(reqVar,
NodeFactory.createURI(otherRequirementPurpose.toString()), satVar,
NodeFactory.createURI(mySatisfactionPurpose.toString()));
Var reqVar = Var.alloc("req");
Var satVar = Var.alloc("sat");
org.apache.jena.sparql.engine.binding.Binding theFirstBinding = BindingFactory.binding(reqVar,
NodeFactory.createURI(myRequirementPurpose.toString()), satVar,
NodeFactory.createURI(otherSatisfactionPurpose.toString()));

Query q = (Query) query.clone();
ElementData de = ((ElementData) ((ElementGroup) q.getQueryPattern()).getLast());
org.apache.jena.sparql.engine.binding.Binding theSecondBinding = BindingFactory.binding(reqVar,
NodeFactory.createURI(otherRequirementPurpose.toString()), satVar,
NodeFactory.createURI(mySatisfactionPurpose.toString()));

List<org.apache.jena.sparql.engine.binding.Binding> data = de.getRows();
data.add(theFirstBinding);
data.add(theSecondBinding);
Query q = (Query) query.clone();
ElementData de = ((ElementData) ((ElementGroup) q.getQueryPattern()).getLast());

QueryExecution myQe = QueryExecutionFactory.create(q, infModel);
boolean execAskMy = myQe.execAsk();
myQe.close();
List<org.apache.jena.sparql.engine.binding.Binding> data = de.getRows();
data.add(theFirstBinding);
data.add(theSecondBinding);

doTheyMatch = !execAskMy;
LOG.trace("Communicative Act time ({}): {}ms", doTheyMatch, Duration.between(start, Instant.now()).toMillis());
QueryExecution myQe = QueryExecutionFactory.create(q, infModel);
boolean execAskMy = myQe.execAsk();
myQe.close();

doTheyMatch = !execAskMy;

return doTheyMatch;
if (!doTheyMatch) {
iter.remove();
}
}
LOG.trace("Communicative Act time ({}): {}ms", doTheyMatch, Duration.between(start, Instant.now()).toMillis());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.util.concurrent.Phaser;

import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
Expand All @@ -14,10 +15,11 @@

public class TestRegisterSmartConnectorWithSameId {
private static final Logger LOG = LoggerFactory.getLogger(TestRegisterSmartConnectorWithSameId.class);

private Phaser readyPhaser = new Phaser(1);
@Test
public void testRegisterSmartConnectorWithSameIdInSameRuntimeThrows() {
var kb1 = new MockedKnowledgeBase("http://example.org/kb1");
kb1.setPhaser(this.readyPhaser);
kb1.start();

var kb1AsWell = new MockedKnowledgeBase("http://example.org/kb1");
Expand Down

0 comments on commit ec82eab

Please sign in to comment.