Skip to content

Commit

Permalink
Add support for native CloudEvents functions.
Browse files Browse the repository at this point in the history
This means functions that receive a CloudEvent payload and dispatch it to a user function that expects a CloudEvent object. We do not yet handle converting a CloudEvent payload to the form expected by legacy GCF functions.
  • Loading branch information
eamonnmcmanus committed Sep 3, 2020
1 parent 1ca4d06 commit 1055468
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
package com.google.cloud.functions.invoker;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;

import com.google.cloud.functions.BackgroundFunction;
import com.google.cloud.functions.Context;
import com.google.cloud.functions.ExperimentalCloudEventsFunction;
import com.google.cloud.functions.RawBackgroundFunction;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
Expand Down Expand Up @@ -54,6 +56,32 @@ private BackgroundFunctionExecutor(FunctionExecutor<?> functionExecutor) {
this.functionExecutor = functionExecutor;
}

enum FunctionKind {
BACKGROUND(BackgroundFunction.class),
RAW_BACKGROUND(RawBackgroundFunction.class),
CLOUD_EVENTS(ExperimentalCloudEventsFunction.class);

static final List<FunctionKind> VALUES = Arrays.asList(values());

final Class<?> functionClass;

FunctionKind(Class<?> functionClass) {
this.functionClass = functionClass;
}

static Optional<FunctionKind> forClass(Class<?> functionClass) {
return VALUES.stream().filter(v -> v.functionClass.isAssignableFrom(functionClass)).findFirst();
}
}

public static Optional<BackgroundFunctionExecutor> maybeForClass(Class<?> functionClass) {
Optional<FunctionKind> maybeFunctionKind = FunctionKind.forClass(functionClass);
if (!maybeFunctionKind.isPresent()) {
return Optional.empty();
}
return Optional.of(forClass(functionClass, maybeFunctionKind.get()));
}

/**
* Makes a {@link HttpFunctionExecutor} for the given class.
*
Expand All @@ -62,12 +90,18 @@ private BackgroundFunctionExecutor(FunctionExecutor<?> functionExecutor) {
* or we are unable to construct an instance using its no-arg constructor.
*/
public static BackgroundFunctionExecutor forClass(Class<?> functionClass) {
if (!BackgroundFunction.class.isAssignableFrom(functionClass)
&& !RawBackgroundFunction.class.isAssignableFrom(functionClass)) {
Optional<FunctionKind> maybeFunctionKind = FunctionKind.forClass(functionClass);
if (!maybeFunctionKind.isPresent()) {
List<String> classNames =
FunctionKind.VALUES.stream().map(v -> v.functionClass.getName()).collect(toList());
throw new RuntimeException(
"Class " + functionClass.getName() + " implements neither " + BackgroundFunction.class
.getName() + " nor " + RawBackgroundFunction.class.getName());
"Class " + functionClass.getName() + " must implement one of these interfaces: "
+ String.join(", ", classNames));
}
return forClass(functionClass, maybeFunctionKind.get());
}

private static BackgroundFunctionExecutor forClass(Class<?> functionClass, FunctionKind functionKind) {
Object instance;
try {
instance = functionClass.getConstructor().newInstance();
Expand All @@ -76,23 +110,31 @@ public static BackgroundFunctionExecutor forClass(Class<?> functionClass) {
"Could not construct an instance of " + functionClass.getName() + ": " + e, e);
}
FunctionExecutor<?> executor;
if (instance instanceof RawBackgroundFunction) {
executor = new RawFunctionExecutor((RawBackgroundFunction) instance);
} else {
BackgroundFunction<?> backgroundFunction = (BackgroundFunction<?>) instance;
@SuppressWarnings("unchecked")
Class<? extends BackgroundFunction<?>> c =
(Class<? extends BackgroundFunction<?>>) backgroundFunction.getClass();
Optional<Type> maybeTargetType = backgroundFunctionTypeArgument(c);
if (!maybeTargetType.isPresent()) {
// This is probably because the user implemented just BackgroundFunction rather than
// BackgroundFunction<T>.
throw new RuntimeException(
"Could not determine the payload type for BackgroundFunction of type "
+ instance.getClass().getName()
+ "; must implement BackgroundFunction<T> for some T");
}
executor = new TypedFunctionExecutor<>(maybeTargetType.get(), backgroundFunction);
switch (functionKind) {
case RAW_BACKGROUND:
executor = new RawFunctionExecutor((RawBackgroundFunction) instance);
break;
case BACKGROUND:
BackgroundFunction<?> backgroundFunction = (BackgroundFunction<?>) instance;
@SuppressWarnings("unchecked")
Class<? extends BackgroundFunction<?>> c =
(Class<? extends BackgroundFunction<?>>) backgroundFunction.getClass();
Optional<Type> maybeTargetType = backgroundFunctionTypeArgument(c);
if (!maybeTargetType.isPresent()) {
// This is probably because the user implemented just BackgroundFunction rather than
// BackgroundFunction<T>.
throw new RuntimeException(
"Could not determine the payload type for BackgroundFunction of type "
+ instance.getClass().getName()
+ "; must implement BackgroundFunction<T> for some T");
}
executor = new TypedFunctionExecutor<>(maybeTargetType.get(), backgroundFunction);
break;
case CLOUD_EVENTS:
executor = new CloudEventFunctionExecutor((ExperimentalCloudEventsFunction) instance);
break;
default: // can't happen, we've listed all the FunctionKind values already.
throw new AssertionError(functionKind);
}
return new BackgroundFunctionExecutor(executor);
}
Expand Down Expand Up @@ -177,12 +219,9 @@ final ClassLoader functionClassLoader() {
return functionClass.getClassLoader();
}

abstract void serviceLegacyEvent(HttpServletRequest req)
throws Exception;
abstract void serviceLegacyEvent(Event legacyEvent) throws Exception;

abstract void serviceCloudEvent(CloudEvent cloudEvent) throws Exception;

abstract Class<CloudEventDataT> cloudEventDataType();
}

private static class RawFunctionExecutor extends FunctionExecutor<Map<?, ?>> {
Expand All @@ -194,9 +233,8 @@ private static class RawFunctionExecutor extends FunctionExecutor<Map<?, ?>> {
}

@Override
void serviceLegacyEvent(HttpServletRequest req) throws Exception {
Event event = parseLegacyEvent(req);
function.accept(new Gson().toJson(event.getData()), event.getContext());
void serviceLegacyEvent(Event legacyEvent) throws Exception {
function.accept(new Gson().toJson(legacyEvent.getData()), legacyEvent.getContext());
}

@Override
Expand All @@ -205,15 +243,6 @@ void serviceCloudEvent(CloudEvent cloudEvent) throws Exception {
String jsonData = cloudEvent.getData() == null ? "{}" : new String(cloudEvent.getData(), UTF_8);
function.accept(jsonData, context);
}

@Override
Class<Map<?, ?>> cloudEventDataType() {
// This messing about with casts and @SuppressWarnings allows us to limit the use of the raw
// Map type to just here.
@SuppressWarnings("unchecked")
Class<Map<?, ?>> c = (Class<Map<?, ?>>) (Class<?>) Map.class;
return c;
}
}

private static class TypedFunctionExecutor<T> extends FunctionExecutor<T> {
Expand All @@ -233,10 +262,9 @@ static <T> TypedFunctionExecutor<T> of(Type type, BackgroundFunction<?> instance
}

@Override
void serviceLegacyEvent(HttpServletRequest req) throws Exception {
Event event = parseLegacyEvent(req);
T payload = new Gson().fromJson(event.getData(), type);
function.accept(payload, event.getContext());
void serviceLegacyEvent(Event legacyEvent) throws Exception {
T payload = new Gson().fromJson(legacyEvent.getData(), type);
function.accept(payload, legacyEvent.getContext());
}

@Override
Expand All @@ -250,27 +278,33 @@ void serviceCloudEvent(CloudEvent cloudEvent) throws Exception {
throw new IllegalStateException("Event has no \"data\" component");
}
}
}

private static class CloudEventFunctionExecutor extends FunctionExecutor<Void>{
private final ExperimentalCloudEventsFunction function;

CloudEventFunctionExecutor(ExperimentalCloudEventsFunction function) {
super(function.getClass());
this.function = function;
}

@Override
Class<T> cloudEventDataType() {
if (!(type instanceof Class<?>)) {
throw new IllegalStateException(
"CloudEvents SDK currently does not permit deserializing types other than classes:"
+ " cannot deserialize " + type);
}
@SuppressWarnings("unchecked")
Class<T> c = (Class<T>) type;
return c;
void serviceLegacyEvent(Event legacyEvent) throws Exception {
throw new UnsupportedOperationException(
"Conversion from legacy events to CloudEvents not yet implemented");
}

@Override
void serviceCloudEvent(CloudEvent cloudEvent) throws Exception {
function.accept(cloudEvent);
}
}

/** Executes the user's background function. This can handle all HTTP methods. */
@Override
public void service(HttpServletRequest req, HttpServletResponse res) throws IOException {
String contentType = req.getContentType();
ClassLoader oldContextLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(functionExecutor.functionClassLoader());
if ((contentType != null && contentType.startsWith("application/cloudevents+json"))
|| req.getHeader("ce-specversion") != null) {
serviceCloudEvent(req);
Expand All @@ -281,8 +315,6 @@ public void service(HttpServletRequest req, HttpServletResponse res) throws IOEx
} catch (Throwable t) {
res.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
logger.log(Level.WARNING, "Failed to execute " + functionExecutor.functionName(), t);
} finally {
Thread.currentThread().setContextClassLoader(oldContextLoader);
}
}

Expand All @@ -306,10 +338,32 @@ private <CloudEventT> void serviceCloudEvent(HttpServletRequest req) throws Exce
() -> headers.getOrDefault("ce-specversion", listOfNull).get(0),
unusedSpecVersion -> CloudEventsServletBinaryMessageReader.from(req, body),
UnknownEncodingMessageReader::new);
executor.serviceCloudEvent(reader.toEvent());
// It's important not to set the context ClassLoader earlier, because MessageUtils will use
// ServiceLoader.load(EventFormat.class) to find a handler to deserialize a binary CloudEvent
// and if it finds something from the function ClassLoader then that something will implement
// the EventFormat interface as defined by that ClassLoader rather than ours. Then ServiceLoader.load
// will throw ServiceConfigurationError. At this point we're still running with the default
// context ClassLoader, which is the system ClassLoader that has loaded the code here.
runWithContextClassLoader(() -> executor.serviceCloudEvent(reader.toEvent()));
}

private void serviceLegacyEvent(HttpServletRequest req) throws Exception {
functionExecutor.serviceLegacyEvent(req);
Event event = parseLegacyEvent(req);
runWithContextClassLoader(() -> functionExecutor.serviceLegacyEvent(event));
}

private void runWithContextClassLoader(ContextClassLoaderTask task) throws Exception {
ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(functionExecutor.functionClassLoader());
task.run();
} finally {
Thread.currentThread().setContextClassLoader(oldLoader);
}
}

@FunctionalInterface
private interface ContextClassLoaderTask {
void run() throws Exception;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import io.cloudevents.SpecVersion;
import io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.google.cloud.functions.BackgroundFunction;
import com.google.cloud.functions.ExperimentalCloudEventsFunction;
import com.google.cloud.functions.HttpFunction;
import com.google.cloud.functions.RawBackgroundFunction;
import com.google.cloud.functions.invoker.BackgroundFunctionExecutor;
Expand Down Expand Up @@ -192,11 +193,18 @@ private static ClassLoader makeClassLoader(Optional<String> functionClasspath) {
ClassLoader runtimeLoader = Invoker.class.getClassLoader();
if (functionClasspath.isPresent()) {
ClassLoader parent = new OnlyApiClassLoader(runtimeLoader);
return new URLClassLoader(classpathToUrls(functionClasspath.get()), parent);
return new FunctionClassLoader(classpathToUrls(functionClasspath.get()), parent);
}
return runtimeLoader;
}

// This is a subclass just so we can identify it from its toString().
private static class FunctionClassLoader extends URLClassLoader {
FunctionClassLoader(URL[] urls, ClassLoader parent) {
super(urls, parent);
}
}

private final Integer port;
private final String functionTarget;
private final String functionSignatureType;
Expand Down Expand Up @@ -286,9 +294,10 @@ private HttpServlet servletForDeducedSignatureType(Class<?> functionClass) {
if (HttpFunction.class.isAssignableFrom(functionClass)) {
return HttpFunctionExecutor.forClass(functionClass);
}
if (BackgroundFunction.class.isAssignableFrom(functionClass)
|| RawBackgroundFunction.class.isAssignableFrom(functionClass)) {
return BackgroundFunctionExecutor.forClass(functionClass);
Optional<BackgroundFunctionExecutor> maybeExecutor =
BackgroundFunctionExecutor.maybeForClass(functionClass);
if (maybeExecutor.isPresent()) {
return maybeExecutor.get();
}
String error = String.format(
"Could not determine function signature type from target %s. Either this should be"
Expand Down Expand Up @@ -405,12 +414,21 @@ private static class OnlyApiClassLoader extends ClassLoader {
protected Class<?> findClass(String name) throws ClassNotFoundException {
String prefix = "com.google.cloud.functions.";
if ((name.startsWith(prefix) && Character.isUpperCase(name.charAt(prefix.length())))
|| name.startsWith("javax.servlet.")) {
|| name.startsWith("javax.servlet.")
|| isCloudEventsApiClass(name)) {
return runtimeClassLoader.loadClass(name);
}
return super.findClass(name); // should throw ClassNotFoundException
}

private static final String CLOUD_EVENTS_API_PREFIX = "io.cloudevents.";
private static final int CLOUD_EVENTS_API_PREFIX_LENGTH = CLOUD_EVENTS_API_PREFIX.length();

private static boolean isCloudEventsApiClass(String name) {
return name.startsWith(CLOUD_EVENTS_API_PREFIX)
&& Character.isUpperCase(name.charAt(CLOUD_EVENTS_API_PREFIX_LENGTH));
}

private static ClassLoader getSystemOrBootstrapClassLoader() {
try {
// We're still building against the Java 8 API, so we have to use reflection for now.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,41 @@ private void backgroundTest(String target) throws Exception {
ImmutableList.of(gcfTestCase, cloudEventsStructuredTestCase, cloudEventsBinaryTestCase));
}

/** Tests a CloudEvent being handled by a CloudEvent handler (no translation to or from legacy). */
@Test
public void nativeCloudEvent() throws Exception {
File snoopFile = snoopFile();
CloudEvent cloudEvent = sampleCloudEvent(snoopFile);
EventFormat jsonFormat = EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE);
String cloudEventJson = new String(jsonFormat.serialize(cloudEvent), UTF_8);

// A CloudEvent using the "structured content mode", where both the metadata and the payload
// are in the body of the HTTP request.
JsonObject cloudEventJsonObject = new Gson().fromJson(cloudEventJson, JsonObject.class);
TestCase cloudEventsStructuredTestCase = TestCase.builder()
.setSnoopFile(snoopFile)
.setRequestText(cloudEventJson)
.setHttpContentType("application/cloudevents+json; charset=utf-8")
.setExpectedJson(cloudEventJsonObject)
.build();

// A CloudEvent using the "binary content mode", where the metadata is in HTTP headers and the
// payload is the body of the HTTP request.
BinaryWriter binaryWriter = new BinaryWriter();
Map<String, String> headers = binaryWriter.writeBinary(cloudEvent);
TestCase cloudEventsBinaryTestCase = TestCase.builder()
.setSnoopFile(snoopFile)
.setRequestText(new String(binaryWriter.body, UTF_8))
.setHttpContentType(headers.get("Content-Type"))
.setHttpHeaders(ImmutableMap.copyOf(headers))
.setExpectedJson(cloudEventJsonObject)
.build();

backgroundTest(
fullTarget("CloudEventSnoop"),
ImmutableList.of(cloudEventsStructuredTestCase, cloudEventsBinaryTestCase));
}

@Test
public void nested() throws Exception {
String testText = "sic transit gloria mundi";
Expand Down Expand Up @@ -469,7 +504,8 @@ private void backgroundTest(String functionTarget, List<TestCase> testCases) thr
Gson gson = new Gson();
JsonObject snoopedJson = gson.fromJson(snooped, JsonObject.class);
JsonObject expectedJson = testCase.expectedJson().get();
expect.withMessage("Testing %s with %s\nGOT %s\nNOT %s", functionTarget, testCase, snoopedJson, expectedJson)
expect.withMessage(
"Testing %s with %s\nGOT %s\nNOT %s", functionTarget, testCase, snoopedJson, expectedJson)
.that(snoopedJson).isEqualTo(expectedJson);
}
}
Expand Down
Loading

0 comments on commit 1055468

Please sign in to comment.