diff --git a/docs/modules/ROOT/pages/index.adoc b/docs/modules/ROOT/pages/index.adoc index c4f59fa..f224e78 100644 --- a/docs/modules/ROOT/pages/index.adoc +++ b/docs/modules/ROOT/pages/index.adoc @@ -123,6 +123,7 @@ quarkus.solace.vpn=default quarkus.solace.authentication.scheme=AUTHENTICATION_SCHEME_OAUTH2 quarkus.solace.oidc.client-name=solace // client name provided in oidc client config below quarkus.solace.oidc.refresh.interval=50s // Refresh interval should be less than access token expiry time. Otherwise extension will fail to update access token in solace session. +quarkus.solace.oidc.refresh.timeout=10s // Token Refresh API timeout. Default is set to 10 seconds. quarkus.oidc-client.solace.auth-server-url=http://localhost:7777/auth/realms/master quarkus.oidc-client.solace.client-id= @@ -142,6 +143,7 @@ quarkus.solace.tls.trust-store-type= quarkus.solace.tls.trust-store-password= quarkus.solace.oidc.client-name=solace // client name provided in oidc client config below quarkus.solace.oidc.refresh.interval=50s // Refresh interval should be less than access token expiry time. Otherwise extension will fail to update access token in solace session. +quarkus.solace.oidc.refresh.timeout=10s // Token Refresh API timeout. Default is set to 10 seconds. quarkus.oidc-client.solace.auth-server-url=http://localhost:7777/auth/realms/master quarkus.oidc-client.solace.client-id= diff --git a/quarkus-solace-client/runtime/src/main/java/com/solace/quarkus/runtime/OidcProvider.java b/quarkus-solace-client/runtime/src/main/java/com/solace/quarkus/runtime/OidcProvider.java index 67fd962..2bd369d 100644 --- a/quarkus-solace-client/runtime/src/main/java/com/solace/quarkus/runtime/OidcProvider.java +++ b/quarkus-solace-client/runtime/src/main/java/com/solace/quarkus/runtime/OidcProvider.java @@ -25,6 +25,9 @@ public class OidcProvider { @ConfigProperty(name = "quarkus.solace.oidc.refresh.interval", defaultValue = "60s") Duration duration; + @ConfigProperty(name = "quarkus.solace.oidc.refresh.timeout", defaultValue = "10s") + Duration refreshTimeout; + @ConfigProperty(name = "quarkus.solace.oidc.client-name") Optional oidcClientName; @@ -43,17 +46,17 @@ Tokens getToken() { void init(MessagingService service) { OidcClient client = getClient(); Multi.createFrom().ticks().every(duration) + .onOverflow().drop() .emitOn(Infrastructure.getDefaultWorkerPool()) - .filter(x -> lastToken == null - || lastToken.getRefreshTokenTimeSkew() == null - || lastToken.isAccessTokenWithinRefreshInterval()) .call(() -> { - if (lastToken != null && lastToken.getRefreshToken() != null) { + if (lastToken != null && lastToken.getRefreshToken() != null + && lastToken.isAccessTokenWithinRefreshInterval()) { Log.info("Refreshing access token for Solace connection"); - return client.refreshTokens(lastToken.getRefreshToken()).invoke(tokens -> lastToken = tokens); + return client.refreshTokens(lastToken.getRefreshToken()).invoke(tokens -> lastToken = tokens).ifNoItem() + .after(refreshTimeout).fail(); } else { Log.info("Acquiring access token for Solace connection"); - return client.getTokens().invoke(tokens -> lastToken = tokens); + return client.getTokens().invoke(tokens -> lastToken = tokens).ifNoItem().after(refreshTimeout).fail(); } }) .onFailure().call(t -> { @@ -64,6 +67,7 @@ void init(MessagingService service) { .subscribe().with(x -> { if (service.isConnected()) { service.updateProperty(SCHEME_OAUTH2_ACCESS_TOKEN, lastToken.getAccessToken()); + Log.info("Updated Solace Session with latest access token"); } else { Log.info("Solace service is not connected, cannot update access token without valid connection"); } @@ -75,8 +79,4 @@ OidcClient getClient() { .orElseGet(clients::getClient); } - public Tokens getLastToken() { - return lastToken; - } - -} \ No newline at end of file +} diff --git a/quarkus-solace-client/runtime/src/main/java/com/solace/quarkus/runtime/SolaceRecorder.java b/quarkus-solace-client/runtime/src/main/java/com/solace/quarkus/runtime/SolaceRecorder.java index f4bc041..920df4d 100644 --- a/quarkus-solace-client/runtime/src/main/java/com/solace/quarkus/runtime/SolaceRecorder.java +++ b/quarkus-solace-client/runtime/src/main/java/com/solace/quarkus/runtime/SolaceRecorder.java @@ -1,5 +1,7 @@ package com.solace.quarkus.runtime; +import static com.solace.messaging.config.SolaceProperties.AuthenticationProperties.SCHEME_OAUTH2_ACCESS_TOKEN; + import java.util.Map; import java.util.Properties; import java.util.function.Function; @@ -14,6 +16,7 @@ import com.solace.quarkus.MessagingServiceClientCustomizer; import io.quarkus.arc.SyntheticCreationalContext; +import io.quarkus.logging.Log; import io.quarkus.runtime.ShutdownContext; import io.quarkus.runtime.annotations.Recorder; @@ -71,6 +74,14 @@ public MessagingService apply(SyntheticCreationalContext conte } }); + // Update access token on reconnect to make sure invalid token is not sent. This can happen when a reconnection happens event before scheduled token expiry. + service.addReconnectionAttemptListener(serviceEvent -> { + Log.info("Reconnecting to Solace broker due to " + serviceEvent.getMessage()); + if (oidcProvider != null && authScheme != null && "AUTHENTICATION_SCHEME_OAUTH2".equals(authScheme)) { + service.updateProperty(SCHEME_OAUTH2_ACCESS_TOKEN, oidcProvider.getToken().getAccessToken()); + } + }); + return service.connect(); } }; diff --git a/quarkus-solace-client/runtime/src/main/resources/META-INF/beans.xml b/quarkus-solace-client/runtime/src/main/resources/META-INF/beans.xml new file mode 100644 index 0000000..e69de29