Skip to content

Commit

Permalink
Auth Manager API part 3: OAuth2 Manager (#11844)
Browse files Browse the repository at this point in the history
* Auth Manager API part 3: OAuth2 Manager

* review

* review: eviction executor

* checkstyle

* checkstyle

* use ThreadPools.newExitingWorkerPool

* fix test
  • Loading branch information
adutra authored Jan 16, 2025
1 parent 167d450 commit a0777bc
Show file tree
Hide file tree
Showing 10 changed files with 1,233 additions and 11 deletions.
45 changes: 41 additions & 4 deletions core/src/main/java/org/apache/iceberg/rest/auth/AuthConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.iceberg.rest.ResourcePaths;
import org.apache.iceberg.util.PropertyUtil;
import org.immutables.value.Value;

/**
* The purpose of this class is to hold configuration options for {@link
* org.apache.iceberg.rest.auth.OAuth2Util.AuthSession}.
* The purpose of this class is to hold OAuth configuration options for {@link
* OAuth2Util.AuthSession}.
*/
@Value.Style(redactedMask = "****")
@SuppressWarnings("ImmutablesStyle")
@Value.Immutable
@SuppressWarnings({"ImmutablesStyle", "SafeLoggingPropagation"})
public interface AuthConfig {
@Nullable
@Value.Redacted
Expand All @@ -47,7 +48,7 @@ default String scope() {
return OAuth2Properties.CATALOG_SCOPE;
}

@Value.Lazy
@Value.Default
@Nullable
default Long expiresAtMillis() {
return OAuth2Util.expiresAtMillis(token());
Expand All @@ -69,4 +70,40 @@ default String oauth2ServerUri() {
static ImmutableAuthConfig.Builder builder() {
return ImmutableAuthConfig.builder();
}

static AuthConfig fromProperties(Map<String, String> properties) {
return builder()
.credential(properties.get(OAuth2Properties.CREDENTIAL))
.token(properties.get(OAuth2Properties.TOKEN))
.scope(properties.getOrDefault(OAuth2Properties.SCOPE, OAuth2Properties.CATALOG_SCOPE))
.oauth2ServerUri(
properties.getOrDefault(OAuth2Properties.OAUTH2_SERVER_URI, ResourcePaths.tokens()))
.optionalOAuthParams(OAuth2Util.buildOptionalParam(properties))
.keepRefreshed(
PropertyUtil.propertyAsBoolean(
properties,
OAuth2Properties.TOKEN_REFRESH_ENABLED,
OAuth2Properties.TOKEN_REFRESH_ENABLED_DEFAULT))
.expiresAtMillis(expiresAtMillis(properties))
.build();
}

private static Long expiresAtMillis(Map<String, String> props) {
Long expiresAtMillis = null;

if (props.containsKey(OAuth2Properties.TOKEN)) {
expiresAtMillis = OAuth2Util.expiresAtMillis(props.get(OAuth2Properties.TOKEN));
}

if (expiresAtMillis == null && props.containsKey(OAuth2Properties.TOKEN_EXPIRES_IN_MS)) {
long millis =
PropertyUtil.propertyAsLong(
props,
OAuth2Properties.TOKEN_EXPIRES_IN_MS,
OAuth2Properties.TOKEN_EXPIRES_IN_MS_DEFAULT);
expiresAtMillis = System.currentTimeMillis() + millis;
}

return expiresAtMillis;
}
}
22 changes: 20 additions & 2 deletions core/src/main/java/org/apache/iceberg/rest/auth/AuthManagers.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,23 @@ public class AuthManagers {
private AuthManagers() {}

public static AuthManager loadAuthManager(String name, Map<String, String> properties) {
String authType =
properties.getOrDefault(AuthProperties.AUTH_TYPE, AuthProperties.AUTH_TYPE_NONE);
String authType = properties.get(AuthProperties.AUTH_TYPE);
if (authType == null) {
boolean hasCredential = properties.containsKey(OAuth2Properties.CREDENTIAL);
boolean hasToken = properties.containsKey(OAuth2Properties.TOKEN);
if (hasCredential || hasToken) {
LOG.warn(
"Inferring {}={} since property {} was provided. "
+ "Please explicitly set {} to avoid this warning.",
AuthProperties.AUTH_TYPE,
AuthProperties.AUTH_TYPE_OAUTH2,
hasCredential ? OAuth2Properties.CREDENTIAL : OAuth2Properties.TOKEN,
AuthProperties.AUTH_TYPE);
authType = AuthProperties.AUTH_TYPE_OAUTH2;
} else {
authType = AuthProperties.AUTH_TYPE_NONE;
}
}

String impl;
switch (authType.toLowerCase(Locale.ROOT)) {
Expand All @@ -42,6 +57,9 @@ public static AuthManager loadAuthManager(String name, Map<String, String> prope
case AuthProperties.AUTH_TYPE_BASIC:
impl = AuthProperties.AUTH_MANAGER_IMPL_BASIC;
break;
case AuthProperties.AUTH_TYPE_OAUTH2:
impl = AuthProperties.AUTH_MANAGER_IMPL_OAUTH2;
break;
default:
impl = authType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@ private AuthProperties() {}

public static final String AUTH_TYPE_NONE = "none";
public static final String AUTH_TYPE_BASIC = "basic";
public static final String AUTH_TYPE_OAUTH2 = "oauth2";

public static final String AUTH_MANAGER_IMPL_NONE =
"org.apache.iceberg.rest.auth.NoopAuthManager";
public static final String AUTH_MANAGER_IMPL_BASIC =
"org.apache.iceberg.rest.auth.BasicAuthManager";
public static final String AUTH_MANAGER_IMPL_OAUTH2 =
"org.apache.iceberg.rest.auth.OAuth2Manager";

public static final String BASIC_USERNAME = "rest.auth.basic.username";
public static final String BASIC_PASSWORD = "rest.auth.basic.password";
Expand Down
140 changes: 140 additions & 0 deletions core/src/main/java/org/apache/iceberg/rest/auth/AuthSessionCache.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.rest.auth;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Ticker;
import java.time.Duration;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** A cache for {@link AuthSession} instances. */
public class AuthSessionCache implements AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(AuthSessionCache.class);

private final Duration sessionTimeout;
private final Executor executor;
private final Ticker ticker;

private volatile Cache<String, AuthSession> sessionCache;

/**
* Creates a new cache with the given session timeout, and with default executor and default
* ticker for eviction tasks.
*
* @param name a distinctive name for the cache.
* @param sessionTimeout the session timeout. Sessions will become eligible for eviction after
* this duration of inactivity.
*/
public AuthSessionCache(String name, Duration sessionTimeout) {
this(
sessionTimeout,
ThreadPools.newExitingWorkerPool(name + "-auth-session-evict", 1),
Ticker.systemTicker());
}

/**
* Creates a new cache with the given session timeout, executor, and ticker. This method is useful
* for testing mostly.
*
* @param sessionTimeout the session timeout. Sessions will become eligible for eviction after
* this duration of inactivity.
* @param executor the executor to use for eviction tasks; if null, the cache will create a
* default executor. The executor will be closed when this cache is closed.
* @param ticker the ticker to use for the cache.
*/
AuthSessionCache(Duration sessionTimeout, Executor executor, Ticker ticker) {
this.sessionTimeout = sessionTimeout;
this.executor = executor;
this.ticker = ticker;
}

/**
* Returns a cached session for the given key, loading it with the given loader if it is not
* already cached.
*
* @param key the key to use for the session.
* @param loader the loader to use to load the session if it is not already cached.
* @param <T> the type of the session.
* @return the cached session.
*/
@SuppressWarnings("unchecked")
public <T extends AuthSession> T cachedSession(String key, Function<String, T> loader) {
return (T) sessionCache().get(key, loader);
}

@Override
public void close() {
try {
Cache<String, AuthSession> cache = sessionCache;
this.sessionCache = null;
if (cache != null) {
cache.invalidateAll();
cache.cleanUp();
}
} finally {
if (executor instanceof ExecutorService) {
ExecutorService service = (ExecutorService) executor;
service.shutdown();
if (!Uninterruptibles.awaitTerminationUninterruptibly(service, 10, TimeUnit.SECONDS)) {
LOG.warn("Timed out waiting for eviction executor to terminate");
}
service.shutdownNow();
}
}
}

@VisibleForTesting
Cache<String, AuthSession> sessionCache() {
if (sessionCache == null) {
synchronized (this) {
if (sessionCache == null) {
this.sessionCache = newSessionCache();
}
}
}

return sessionCache;
}

private Cache<String, AuthSession> newSessionCache() {
Caffeine<String, AuthSession> builder =
Caffeine.newBuilder()
.executor(executor)
.expireAfterAccess(sessionTimeout)
.ticker(ticker)
.removalListener(
(id, auth, cause) -> {
if (auth != null) {
auth.close();
}
});

return builder.build();
}
}
Loading

0 comments on commit a0777bc

Please sign in to comment.