Skip to content

Commit

Permalink
ConditionalRemover interface for External Infinispan feature
Browse files Browse the repository at this point in the history
Add a ConditionalRemover interface to remove entries from a RemoteCache
based on the key or value fields.
The default implementation provided by this PR uses streaming/iteration
to test and remove entries

On a side change, moved all the transactions to the same package and
created one transaction class per entity/cache to simplify code and
avoid writing "RemoteChangeLogTransaction" with a long list of types.

Fixes keycloak#31046

Signed-off-by: Pedro Ruivo <[email protected]>
  • Loading branch information
pruivo authored and ahus1 committed Jul 30, 2024
1 parent 814e958 commit e62604b
Show file tree
Hide file tree
Showing 25 changed files with 637 additions and 237 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2024 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.keycloak.models.sessions.infinispan.changes.remote.remover;

import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.commons.util.concurrent.AggregateCompletionStage;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.Updater;
import org.keycloak.models.sessions.infinispan.remote.transaction.RemoteChangeLogTransaction;

/**
* It handles conditional remove operations.
* <p>
* This class is preferred to remove an unknown amount of entries by its key and/or value state. The implement may use
* queries (delete statements) or perform a full cache scan to find the entries to remove.
* <p>
* The method {@link #willRemove(Updater)} is invoked by {@link RemoteChangeLogTransaction} before perform any change
* tracked by the {@link Updater}. This is an optimization to prevent sending changes that would be removed by this
* {@link ConditionalRemover}.
*
* @param <K> The key's type stored in the {@link RemoteCache}.
* @param <V> The value's type stored in the {@link RemoteCache}.
*/
public interface ConditionalRemover<K, V> {

/**
* @param key The entry's key to test.
* @param value The entry's value to test.
* @return {@code true} if the entry will be removed from the {@link RemoteCache}.
*/
boolean willRemove(K key, V value);

/**
* @param updater The {@link Updater} to test.
* @return {@code true} if the entry tracked by the {@link Updater} will be removed from the {@link RemoteCache}.
*/
default boolean willRemove(Updater<K, V> updater) {
// The value can be null if the entry updated is marked as deleted.
// In that case, we don't have the value to check for the condition and will let the transaction perform the removal.
return updater.getValue() != null && willRemove(updater.getKey(), updater.getValue());
}

/**
* Executes the conditional removes in the {@link RemoteCache}.
*
* @param cache The {@link RemoteCache} to perform the remove operations.
* @param stage The {@link AggregateCompletionStage} to add any incomplete tasks.
*/
void executeRemovals(RemoteCache<K, V> cache, AggregateCompletionStage<Void> stage);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2024 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.keycloak.models.sessions.infinispan.changes.remote.remover;

import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.commons.util.concurrent.AggregateCompletionStage;

/**
* A {@link ConditionalRemover} that does not remove anything.
*
* @param <K> The key's type stored in the {@link RemoteCache}.
* @param <V> The value's type stored in the {@link RemoteCache}.
*/
public class EmptyConditionalRemover<K, V> implements ConditionalRemover<K, V> {

private static final EmptyConditionalRemover<?, ?> INSTANCE = new EmptyConditionalRemover<>();

@SuppressWarnings("unchecked")
public static <K1, V1> ConditionalRemover<K1, V1> instance() {
return (ConditionalRemover<K1, V1>) INSTANCE;
}


@Override
public boolean willRemove(K key, V value) {
return false;
}

@Override
public void executeRemovals(RemoteCache<K, V> cache, AggregateCompletionStage<Void> stage) {
//no-op
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2024 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.keycloak.models.sessions.infinispan.changes.remote.remover.iteration;

import java.util.ArrayList;
import java.util.List;

import org.infinispan.client.hotrod.RemoteCache;
import org.keycloak.models.sessions.infinispan.changes.remote.remover.ConditionalRemover;
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;

/**
* A {@link ConditionalRemover} implementation to remove {@link SessionEntity} from a {@link RemoteCache} based on
* {@link SessionEntity#getRealmId()} value.
*
* @param <K> The key's type stored in the {@link RemoteCache}.
* @param <V> The value's type stored in the {@link RemoteCache}.
*/
public class ByRealmIdConditionalRemover<K, V extends SessionEntity> extends IterationBasedConditionalRemover<K, V> {

private final List<String> realms;

public ByRealmIdConditionalRemover() {
realms = new ArrayList<>();
}

public void removeByRealmId(String realmId) {
realms.add(realmId);
}

@Override
boolean isEmpty() {
return realms.isEmpty();
}

@Override
public boolean willRemove(K key, V value) {
return realms.contains(value.getRealmId());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2024 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.keycloak.models.sessions.infinispan.changes.remote.remover.iteration;

import java.util.Map;

import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.functions.Predicate;
import org.infinispan.client.hotrod.MetadataValue;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.commons.util.concurrent.AggregateCompletionStage;
import org.keycloak.models.sessions.infinispan.changes.remote.remover.ConditionalRemover;

/**
* An iteration based implementation of {@link ConditionalRemover}.
* <p>
* This class is not performance efficient since it has to download the full {@link RemoteCache} content to perform the
* removal tests.
*
* @param <K> The key's type stored in the {@link RemoteCache}.
* @param <V> The value's type stored in the {@link RemoteCache}.
*/
abstract class IterationBasedConditionalRemover<K, V> implements ConditionalRemover<K, V>, Predicate<Map.Entry<K, MetadataValue<V>>> {

@Override
public final void executeRemovals(RemoteCache<K, V> cache, AggregateCompletionStage<Void> stage) {
if (isEmpty()) {
return;
}
var rmStage = Flowable.fromPublisher(cache.publishEntriesWithMetadata(null, 2048))
.filter(this)
.map(Map.Entry::getKey)
.flatMapCompletable(key -> Completable.fromCompletionStage(cache.removeAsync(key)))
.toCompletionStage(null);
stage.dependsOn(rmStage);
}

@Override
public final boolean test(Map.Entry<K, MetadataValue<V>> entry) throws Throwable {
return willRemove(entry.getKey(), entry.getValue().getValue());
}

/**
* @return {@code true} if this implementation won't remove anything. It avoids iterating over the
* {@link RemoteCache} contents.
*/
abstract boolean isEmpty();

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
*/
package org.keycloak.models.sessions.infinispan.changes.remote.updater;

import org.keycloak.models.sessions.infinispan.changes.remote.RemoteChangeLogTransaction;

import java.util.function.BiFunction;

import org.keycloak.models.sessions.infinispan.remote.transaction.RemoteChangeLogTransaction;

/**
* An interface used by {@link RemoteChangeLogTransaction}.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@
import org.keycloak.models.ClientModel;
import org.keycloak.models.RealmModel;
import org.keycloak.models.UserSessionModel;
import org.keycloak.models.sessions.infinispan.changes.remote.RemoteChangeLogTransaction;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.BaseUpdater;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.Expiration;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.Updater;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.UpdaterFactory;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.helper.MapUpdater;
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
import org.keycloak.models.sessions.infinispan.remote.transaction.ClientSessionChangeLogTransaction;
import org.keycloak.models.sessions.infinispan.util.SessionTimeouts;

/**
Expand All @@ -53,7 +53,7 @@ public class AuthenticatedClientSessionUpdater extends BaseUpdater<UUID, Authent
private final boolean offline;
private UserSessionModel userSession;
private ClientModel client;
private RemoteChangeLogTransaction<UUID, AuthenticatedClientSessionEntity, AuthenticatedClientSessionUpdater> clientTransaction;
private ClientSessionChangeLogTransaction clientTransaction;

private AuthenticatedClientSessionUpdater(UUID cacheKey, AuthenticatedClientSessionEntity cacheValue, long version, boolean offline, UpdaterState initialState) {
super(cacheKey, cacheValue, version, initialState);
Expand Down Expand Up @@ -200,10 +200,10 @@ protected boolean isUnchanged() {
*
* @param userSession The {@link UserSessionModel} associated with this client session.
* @param client The {@link ClientModel} associated with this client session.
* @param clientTransaction The {@link RemoteChangeLogTransaction} to perform the changes in this class into the
* @param clientTransaction The {@link ClientSessionChangeLogTransaction} to perform the changes in this class into the
* {@link RemoteCache}.
*/
public synchronized void initialize(UserSessionModel userSession, ClientModel client, RemoteChangeLogTransaction<UUID, AuthenticatedClientSessionEntity, AuthenticatedClientSessionUpdater> clientTransaction) {
public synchronized void initialize(UserSessionModel userSession, ClientModel client, ClientSessionChangeLogTransaction clientTransaction) {
this.userSession = Objects.requireNonNull(userSession);
this.client = Objects.requireNonNull(client);
this.clientTransaction = Objects.requireNonNull(clientTransaction);
Expand Down
Loading

0 comments on commit e62604b

Please sign in to comment.