Skip to content

Commit

Permalink
synchronize on lock while building/pruning owner chains to avoid dupl…
Browse files Browse the repository at this point in the history
…icate entity creation in different transactions
  • Loading branch information
andrewazores committed Oct 9, 2024
1 parent 7cd8f49 commit a0dd067
Showing 1 changed file with 78 additions and 74 deletions.
152 changes: 78 additions & 74 deletions src/main/java/io/cryostat/discovery/KubeApiDiscovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -274,88 +274,92 @@ private boolean isTargetUnderRealm(URI connectUrl) throws IllegalStateException
@ConsumeEvent(blocking = true, ordered = true)
@Transactional(TxType.REQUIRES_NEW)
public void handleEndpointEvent(EndpointDiscoveryEvent evt) {
String namespace = evt.namespace;
DiscoveryNode realm = DiscoveryNode.getRealm(REALM).orElseThrow();
DiscoveryNode nsNode =
DiscoveryNode.getChild(realm, n -> n.name.equals(namespace))
.orElse(
DiscoveryNode.environment(
namespace, KubeDiscoveryNodeType.NAMESPACE));
synchronized (txLock) {
String namespace = evt.namespace;
DiscoveryNode realm = DiscoveryNode.getRealm(REALM).orElseThrow();
DiscoveryNode nsNode =
DiscoveryNode.getChild(realm, n -> n.name.equals(namespace))
.orElse(
DiscoveryNode.environment(
namespace, KubeDiscoveryNodeType.NAMESPACE));

try {
if (evt.eventKind == EventKind.FOUND) {
buildOwnerChain(nsNode, evt.target, evt.objRef);
} else {
pruneOwnerChain(nsNode, evt.target);
}
try {
if (evt.eventKind == EventKind.FOUND) {
buildOwnerChain(nsNode, evt.target, evt.objRef);
} else {
pruneOwnerChain(nsNode, evt.target);
}

if (!nsNode.hasChildren()) {
realm.children.remove(nsNode);
nsNode.parent = null;
} else if (!realm.children.contains(nsNode)) {
realm.children.add(nsNode);
nsNode.parent = realm;
if (!nsNode.hasChildren()) {
realm.children.remove(nsNode);
nsNode.parent = null;
} else if (!realm.children.contains(nsNode)) {
realm.children.add(nsNode);
nsNode.parent = realm;
}
realm.persist();
} catch (Exception e) {
logger.warn("Endpoint handler exception", e);
}
realm.persist();
} catch (Exception e) {
logger.warn("Endpoint handler exception", e);
}
}

private void handleObservedEndpoints(String namespace) {
List<DiscoveryNode> targetNodes =
DiscoveryNode.findAllByNodeType(KubeDiscoveryNodeType.ENDPOINT).stream()
.filter(
(n) ->
namespace.equals(
n.labels.get(DISCOVERY_NAMESPACE_LABEL_KEY)))
.collect(Collectors.toList());

Map<URI, ObjectReference> targetRefMap = new HashMap<>();

Set<Target> persistedTargets = new HashSet<>();
for (DiscoveryNode node : targetNodes) {
persistedTargets.add(node.target);
}

Set<Target> observedTargets =
safeGetInformers().get(namespace).getStore().list().stream()
.map((endpoint) -> getTargetTuplesFrom(endpoint))
.flatMap(List::stream)
.filter((tuple) -> Objects.nonNull(tuple.objRef))
.map(
(tuple) -> {
Target t = tuple.toTarget();
if (t != null) {
targetRefMap.put(t.connectUrl, tuple.objRef);
}
return t;
})
.filter(Objects::nonNull)
.collect(Collectors.toSet());
synchronized (txLock) {
List<DiscoveryNode> targetNodes =
DiscoveryNode.findAllByNodeType(KubeDiscoveryNodeType.ENDPOINT).stream()
.filter(
(n) ->
namespace.equals(
n.labels.get(DISCOVERY_NAMESPACE_LABEL_KEY)))
.collect(Collectors.toList());

Map<URI, ObjectReference> targetRefMap = new HashMap<>();

Set<Target> persistedTargets = new HashSet<>();
for (DiscoveryNode node : targetNodes) {
persistedTargets.add(node.target);
}

// Add new targets
Target.compare(persistedTargets)
.to(observedTargets)
.added()
.forEach(
(t) ->
notify(
EndpointDiscoveryEvent.from(
namespace,
t,
targetRefMap.get(t.connectUrl),
EventKind.FOUND)));

// Prune deleted targets
Target.compare(persistedTargets)
.to(observedTargets)
.removed()
.forEach(
(t) ->
notify(
EndpointDiscoveryEvent.from(
namespace, t, null, EventKind.LOST)));
Set<Target> observedTargets =
safeGetInformers().get(namespace).getStore().list().stream()
.map((endpoint) -> getTargetTuplesFrom(endpoint))
.flatMap(List::stream)
.filter((tuple) -> Objects.nonNull(tuple.objRef))
.map(
(tuple) -> {
Target t = tuple.toTarget();
if (t != null) {
targetRefMap.put(t.connectUrl, tuple.objRef);
}
return t;
})
.filter(Objects::nonNull)
.collect(Collectors.toSet());

// Add new targets
Target.compare(persistedTargets)
.to(observedTargets)
.added()
.forEach(
(t) ->
notify(
EndpointDiscoveryEvent.from(
namespace,
t,
targetRefMap.get(t.connectUrl),
EventKind.FOUND)));

// Prune deleted targets
Target.compare(persistedTargets)
.to(observedTargets)
.removed()
.forEach(
(t) ->
notify(
EndpointDiscoveryEvent.from(
namespace, t, null, EventKind.LOST)));
}
}

private void notify(EndpointDiscoveryEvent evt) {
Expand Down

0 comments on commit a0dd067

Please sign in to comment.