Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Auth Manager API part 3: OAuth2 Manager #11844

Merged
merged 7 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 43 additions & 4 deletions core/src/main/java/org/apache/iceberg/rest/auth/AuthConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.iceberg.rest.ResourcePaths;
import org.apache.iceberg.util.PropertyUtil;
import org.immutables.value.Value;

/**
* The purpose of this class is to hold configuration options for {@link
* org.apache.iceberg.rest.auth.OAuth2Util.AuthSession}.
* The purpose of this class is to hold OAuth configuration options for {@link
danielcweeks marked this conversation as resolved.
Show resolved Hide resolved
* OAuth2Util.AuthSession}.
*/
@Value.Style(redactedMask = "****")
@SuppressWarnings("ImmutablesStyle")
@Value.Immutable
@SuppressWarnings({"ImmutablesStyle", "SafeLoggingPropagation"})
public interface AuthConfig {
@Nullable
@Value.Redacted
Expand All @@ -47,7 +48,7 @@ default String scope() {
return OAuth2Properties.CATALOG_SCOPE;
}

@Value.Lazy
@Value.Default
@Nullable
default Long expiresAtMillis() {
return OAuth2Util.expiresAtMillis(token());
Expand All @@ -69,4 +70,42 @@ default String oauth2ServerUri() {
static ImmutableAuthConfig.Builder builder() {
return ImmutableAuthConfig.builder();
}

static AuthConfig fromProperties(Map<String, String> properties) {
return builder()
.credential(properties.get(OAuth2Properties.CREDENTIAL))
.token(properties.get(OAuth2Properties.TOKEN))
.scope(properties.getOrDefault(OAuth2Properties.SCOPE, OAuth2Properties.CATALOG_SCOPE))
.oauth2ServerUri(
properties.getOrDefault(OAuth2Properties.OAUTH2_SERVER_URI, ResourcePaths.tokens()))
.optionalOAuthParams(OAuth2Util.buildOptionalParam(properties))
.keepRefreshed(
PropertyUtil.propertyAsBoolean(
properties,
OAuth2Properties.TOKEN_REFRESH_ENABLED,
OAuth2Properties.TOKEN_REFRESH_ENABLED_DEFAULT))
.expiresAtMillis(expiresAtMillis(properties))
.build();
}

private static Long expiresAtMillis(Map<String, String> props) {
Long expiresAtMillis = null;

if (props.containsKey(OAuth2Properties.TOKEN)) {
expiresAtMillis = OAuth2Util.expiresAtMillis(props.get(OAuth2Properties.TOKEN));
}

if (expiresAtMillis == null) {
nastra marked this conversation as resolved.
Show resolved Hide resolved
if (props.containsKey(OAuth2Properties.TOKEN_EXPIRES_IN_MS)) {
long millis =
PropertyUtil.propertyAsLong(
props,
OAuth2Properties.TOKEN_EXPIRES_IN_MS,
OAuth2Properties.TOKEN_EXPIRES_IN_MS_DEFAULT);
expiresAtMillis = System.currentTimeMillis() + millis;
}
}

return expiresAtMillis;
}
}
22 changes: 20 additions & 2 deletions core/src/main/java/org/apache/iceberg/rest/auth/AuthManagers.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,23 @@ public class AuthManagers {
private AuthManagers() {}

public static AuthManager loadAuthManager(String name, Map<String, String> properties) {
String authType =
properties.getOrDefault(AuthProperties.AUTH_TYPE, AuthProperties.AUTH_TYPE_NONE);
String authType = properties.get(AuthProperties.AUTH_TYPE);
if (authType == null) {
boolean hasCredential = properties.containsKey(OAuth2Properties.CREDENTIAL);
boolean hasToken = properties.containsKey(OAuth2Properties.TOKEN);
if (hasCredential || hasToken) {
LOG.warn(
"Inferring {}={} since property {} was provided. "
+ "Please explicitly set {} to avoid this warning.",
AuthProperties.AUTH_TYPE,
AuthProperties.AUTH_TYPE_OAUTH2,
hasCredential ? OAuth2Properties.CREDENTIAL : OAuth2Properties.TOKEN,
AuthProperties.AUTH_TYPE);
authType = AuthProperties.AUTH_TYPE_OAUTH2;
} else {
authType = AuthProperties.AUTH_TYPE_NONE;
}
}

String impl;
switch (authType.toLowerCase(Locale.ROOT)) {
Expand All @@ -42,6 +57,9 @@ public static AuthManager loadAuthManager(String name, Map<String, String> prope
case AuthProperties.AUTH_TYPE_BASIC:
impl = AuthProperties.AUTH_MANAGER_IMPL_BASIC;
break;
case AuthProperties.AUTH_TYPE_OAUTH2:
impl = AuthProperties.AUTH_MANAGER_IMPL_OAUTH2;
break;
default:
impl = authType;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we also define something like AUTH_IMPL = "rest.auth.type"; that could be used here for any custom auth manager/provider we want to bring in?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's precisely the intent here; if you have a custom manager, you would activate it with rest.auth.type=my.custom.AuthManager. Is that OK for you?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that works! I wasn't sure whether you were planning on having authType for as a name of the type

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There will be aliases for built-in managers, e.g. you can activate the built-in oauth2 manager with either:

rest.auth.type = oauth2
rest.auth.type = org.apache.iceberg.rest.auth.OAuth2Manager

But for custom ones, you must provide the FQDN of your implementation.

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@ private AuthProperties() {}

public static final String AUTH_TYPE_NONE = "none";
public static final String AUTH_TYPE_BASIC = "basic";
public static final String AUTH_TYPE_OAUTH2 = "oauth2";

public static final String AUTH_MANAGER_IMPL_NONE =
"org.apache.iceberg.rest.auth.NoopAuthManager";
public static final String AUTH_MANAGER_IMPL_BASIC =
"org.apache.iceberg.rest.auth.BasicAuthManager";
public static final String AUTH_MANAGER_IMPL_OAUTH2 =
"org.apache.iceberg.rest.auth.OAuth2Manager";

public static final String BASIC_USERNAME = "rest.auth.basic.username";
public static final String BASIC_PASSWORD = "rest.auth.basic.password";
Expand Down
127 changes: 127 additions & 0 deletions core/src/main/java/org/apache/iceberg/rest/auth/AuthSessionCache.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.rest.auth;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.time.Duration;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Function;
import java.util.function.LongSupplier;
import javax.annotation.Nullable;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;

/** A cache for {@link AuthSession} instances. */
public class AuthSessionCache implements AutoCloseable {

private final Duration sessionTimeout;
private final Executor executor;
private final LongSupplier nanoTimeSupplier;

private volatile Cache<String, AuthSession> sessionCache;

/**
* Creates a new cache with the given session timeout, and with default executor and nano time
* supplier for eviction tasks.
*
* @param sessionTimeout the session timeout. Sessions will become eligible for eviction after
* this duration of inactivity.
*/
public AuthSessionCache(Duration sessionTimeout) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we should also make this package private. The other issue is that we shouldn't be using the common pool by default. This should probably be provided by the OAuth2Manager or if we expect there to be only one per manager, then just create a named pool for handling refreshes. We definitely don't want to rely on or possibly tie up the common pool.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@danielcweeks we are already using the common pool. The session cache currently is created as follows:

return Caffeine.newBuilder()
.expireAfterAccess(Duration.ofMillis(expirationIntervalMs))
.removalListener(
(RemovalListener<String, AuthSession>)
(id, auth, cause) -> {
if (auth != null) {
auth.stopRefreshing();
}
})
.build();
}

As you can see, we are not providing an executor explicitly, so evictions are being done on the common pool.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re: turning this constructor package-private: this won't fly, as SigV4 will also need to create caches, so this constructor needs to be public.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@danielcweeks I searched for all Caffeine caches in Iceberg's repo and found 25 occurrences. Only one occurrence sets the executor, the one in CachingCatalog:

.executor(Runnable::run) // Makes the callbacks to removal listener synchronous

Consequently, all the others are using the common pool for asynchronous tasks such as eviction.

Copy link
Contributor Author

@adutra adutra Jan 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@danielcweeks do you maintain that you want me to use a different pool here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we previously had refresh in a dedicated threadpool, so I don't think the other caffeine examples are what we should point to.

Also, we don't want to rely on the common pool for something important like token refresh. If something ties up the thread pool, we may miss the window to refresh. Also, we don't want to tie up the thread pool with refresh threads if something goes wrong.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not about token refresh, this is about auth session eviction.

But fair enough. I'll do it.

this(sessionTimeout, null, null);
}

/**
* Creates a new cache with the given session timeout, executor, and nano time supplier. This
* method is useful for testing mostly.
*
* @param sessionTimeout the session timeout. Sessions will become eligible for eviction after
* this duration of inactivity.
* @param executor the executor to use for eviction tasks; if null, the cache will use the
* {@linkplain ForkJoinPool#commonPool() common pool}. The executor will not be closed when
* this cache is closed.
* @param nanoTimeSupplier the supplier for nano time; if null, the cache will use {@link
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason we would need to expose this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, not really, I can turn this constructor into package-private. It's meant only for tests.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

* System#nanoTime()}.
*/
public AuthSessionCache(
Duration sessionTimeout,
@Nullable Executor executor,
@Nullable LongSupplier nanoTimeSupplier) {
this.sessionTimeout = sessionTimeout;
this.executor = executor;
this.nanoTimeSupplier = nanoTimeSupplier;
}

/**
* Returns a cached session for the given key, loading it with the given loader if it is not
* already cached.
*
* @param key the key to use for the session.
* @param loader the loader to use to load the session if it is not already cached.
* @param <T> the type of the session.
* @return the cached session.
*/
@SuppressWarnings("unchecked")
public <T extends AuthSession> T cachedSession(String key, Function<String, T> loader) {
return (T) sessionCache().get(key, loader);
}

@Override
public void close() {
Cache<String, AuthSession> cache = sessionCache;
this.sessionCache = null;
if (cache != null) {
cache.invalidateAll();
cache.cleanUp();
}
}

@VisibleForTesting
Cache<String, AuthSession> sessionCache() {
if (sessionCache == null) {
synchronized (this) {
if (sessionCache == null) {
this.sessionCache = newSessionCache(sessionTimeout, executor, nanoTimeSupplier);
}
}
}
return sessionCache;
nastra marked this conversation as resolved.
Show resolved Hide resolved
}

private static Cache<String, AuthSession> newSessionCache(
Duration sessionTimeout, Executor executor, LongSupplier nanoTimeSupplier) {
Caffeine<String, AuthSession> builder =
Caffeine.newBuilder()
.expireAfterAccess(sessionTimeout)
.removalListener(
(id, auth, cause) -> {
if (auth != null) {
auth.close();
}
});
if (executor != null) {
builder.executor(executor);
}
if (nanoTimeSupplier != null) {
builder.ticker(nanoTimeSupplier::getAsLong);
}
return builder.build();
}
}
Loading