Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-23338 Reduce number of intermediate serializations in Compute #4778

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@

package org.apache.ignite.internal.client.proto;

import static org.apache.ignite.internal.client.proto.ComputeJobType.MARSHALLED_CUSTOM;
import static org.apache.ignite.internal.client.proto.ComputeJobType.MARSHALLED_POJO;
import static org.apache.ignite.internal.client.proto.ComputeJobType.MARSHALLED_TUPLE;
import static org.apache.ignite.internal.client.proto.ComputeJobType.NATIVE;
import static org.apache.ignite.internal.client.proto.pojo.PojoConverter.toTuple;
import static org.apache.ignite.internal.compute.ComputeJobType.MARSHALLED_CUSTOM;
import static org.apache.ignite.internal.compute.ComputeJobType.MARSHALLED_POJO;
import static org.apache.ignite.internal.compute.ComputeJobType.MARSHALLED_TUPLE;
import static org.apache.ignite.internal.compute.ComputeJobType.NATIVE;

import java.util.Arrays;
import java.util.Set;
Expand Down Expand Up @@ -74,7 +74,7 @@ private static <T> void pack(@Nullable T obj, @Nullable Marshaller<T, byte[]> ma
}

if (marshaller != null) {
packer.packInt(MARSHALLED_CUSTOM);
packer.packInt(MARSHALLED_CUSTOM.id());
byte[] marshalled = marshaller.marshal(obj);

if (marshalled == null) {
Expand All @@ -87,21 +87,21 @@ private static <T> void pack(@Nullable T obj, @Nullable Marshaller<T, byte[]> ma
}

if (obj instanceof Tuple) {
packer.packInt(MARSHALLED_TUPLE);
packer.packInt(MARSHALLED_TUPLE.id());

packTuple((Tuple) obj, packer);
return;
}

if (isNativeType(obj.getClass())) {
packer.packInt(NATIVE);
packer.packInt(NATIVE.id());

packer.packObjectAsBinaryTuple(obj);
return;
}

try {
packer.packInt(MARSHALLED_POJO);
packer.packInt(MARSHALLED_POJO.id());

packTuple(toTuple(obj), packer);
} catch (PojoConversionException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,14 @@

package org.apache.ignite.internal.client.proto;

import static org.apache.ignite.internal.client.proto.ComputeJobType.MARSHALLED_CUSTOM;
import static org.apache.ignite.internal.client.proto.ComputeJobType.MARSHALLED_POJO;
import static org.apache.ignite.internal.client.proto.ComputeJobType.MARSHALLED_TUPLE;
import static org.apache.ignite.internal.client.proto.ComputeJobType.NATIVE;
import static org.apache.ignite.internal.client.proto.pojo.PojoConverter.fromTuple;
import static org.apache.ignite.marshalling.Marshaller.tryUnmarshalOrCast;

import java.lang.reflect.InvocationTargetException;
import org.apache.ignite.internal.binarytuple.inlineschema.TupleWithSchemaMarshalling;
import org.apache.ignite.internal.client.proto.pojo.PojoConversionException;
import org.apache.ignite.internal.compute.ComputeJobArgumentHolder;
import org.apache.ignite.internal.compute.ComputeJobType;
import org.apache.ignite.marshalling.Marshaller;
import org.apache.ignite.marshalling.UnmarshallingException;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -54,8 +52,12 @@ public final class ClientComputeJobUnpacker {

// Underlying byte array expected to be in the following format: | typeId | value |.
int typeId = unpacker.unpackInt();
ComputeJobType type = ComputeJobType.fromId(typeId);
if (type == null) {
throw new UnmarshallingException("Unsupported compute job type id: " + typeId);
}

switch (typeId) {
switch (type) {
case NATIVE:
if (marshaller != null) {
throw new UnmarshallingException(
Expand Down Expand Up @@ -85,7 +87,7 @@ public final class ClientComputeJobUnpacker {
return unpackPojo(unpacker, resultClass);

default:
throw new UnmarshallingException("Unsupported compute job type id: " + typeId);
throw new UnmarshallingException("Unsupported compute job type: " + type);
}
}

Expand Down Expand Up @@ -118,17 +120,21 @@ private static Object unpackPojo(ClientMessageUnpacker unpacker, Class<?> pojoCl
}

int typeId = unpacker.unpackInt();
ComputeJobType type = ComputeJobType.fromId(typeId);
if (type == null) {
throw new UnmarshallingException("Unsupported compute job type id: " + typeId);
}

switch (typeId) {
switch (type) {
case NATIVE:
return unpacker.unpackObjectFromBinaryTuple();
case MARSHALLED_TUPLE: // Fallthrough, the pojo is unmarshalled during execution when the concrete type is known.
case MARSHALLED_TUPLE: // Fallthrough, both types are unmarshalled just before execution.
case MARSHALLED_POJO:
return TupleWithSchemaMarshalling.unmarshal(unpacker.readBinary());
return new ComputeJobArgumentHolder(type, unpacker.readBinary());
case MARSHALLED_CUSTOM:
return unpacker.readBinary();
default:
throw new UnmarshallingException("Unsupported compute job type id: " + typeId);
throw new UnmarshallingException("Unsupported compute job type: " + type);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@

package org.apache.ignite.internal.client.proto;

import static org.apache.ignite.internal.client.proto.ClientComputeJobPacker.packJobArgument;
import static org.apache.ignite.internal.client.proto.ClientComputeJobPacker.packJobResult;
import static org.apache.ignite.internal.client.proto.ClientComputeJobUnpacker.unpackJobArgumentWithoutMarshaller;
import static org.apache.ignite.internal.client.proto.ClientComputeJobUnpacker.unpackJobResult;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.params.provider.Arguments.arguments;

import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.PooledByteBufAllocator;
Expand All @@ -38,6 +42,8 @@
import java.util.stream.Stream;
import org.apache.ignite.internal.client.proto.pojo.Pojo;
import org.apache.ignite.internal.client.proto.pojo.StaticFieldPojo;
import org.apache.ignite.internal.compute.ComputeJobArgumentHolder;
import org.apache.ignite.internal.compute.ComputeJobType;
import org.apache.ignite.marshalling.Marshaller;
import org.apache.ignite.marshalling.MarshallingException;
import org.apache.ignite.marshalling.UnmarshallingException;
Expand Down Expand Up @@ -78,6 +84,13 @@ private static List<Object> pojo() {
return List.of(Pojo.generateTestPojo());
}

private static List<Arguments> notMarshalled() {
return List.of(
arguments(Tuple.create(), ComputeJobType.MARSHALLED_TUPLE),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we introduce a couple of new ComputeJobTypes? The idea behind MARSHALLED prefix is that is was really marshalled. But here we do not marshal the argument.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if MARSHALLED_ prefix is necessary.

  • All types of job args and results can be in marshalled or unmarshalled state
  • A job can take a tuple and return a primitive, etc, so this is not a job type, but a job argument/result type

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe you are right. My idea was to add more information to the argument/result like "was this thing marshalled or not?". The answer to this question might be handy but most likely it will not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This identifiers are passed from the client when it packs the argument (and vice versa, when client handler packs the result).
So from the point of view of the client-to-node communication they are marshalled.
They are not marshalled, however, from the point of view of the node-to-node communication, which this ticket is about.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two pieces of information:

  • Argument type (tuple, primitive, pojo) - relevant on server and on client
  • Marshalled state (true or false) - only relevant on the server side

arguments(Pojo.generateTestPojo(), ComputeJobType.MARSHALLED_POJO)
);
}

private ClientMessagePacker messagePacker;

@BeforeEach
Expand Down Expand Up @@ -110,6 +123,23 @@ void packUnpackNoMarshalling(Object arg) {
}
}

@MethodSource("notMarshalled")
@ParameterizedTest
void notMarshalledArgument(Object arg, ComputeJobType type) {
// When pack job argument without marshaller.
packJobArgument(arg, null, messagePacker);
byte[] data = ByteBufUtil.getBytes(messagePacker.getBuffer());

// And unpack without marshaller.
try (var messageUnpacker = messageUnpacker(data)) {
var res = unpackJobArgumentWithoutMarshaller(messageUnpacker);

// Then argument is unpacked but not unmarshalled.
ComputeJobArgumentHolder argument = assertInstanceOf(ComputeJobArgumentHolder.class, res);
assertEquals(type, argument.type());
}
}

@MethodSource("pojo")
@ParameterizedTest
void packUnpackPojo(Object arg) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.ignite.compute.task.TaskExecutionContext;
import org.apache.ignite.deployment.DeploymentUnit;
import org.apache.ignite.deployment.version.Version;
import org.apache.ignite.internal.binarytuple.inlineschema.TupleWithSchemaMarshalling;
import org.apache.ignite.internal.client.proto.pojo.PojoConversionException;
import org.apache.ignite.internal.compute.loader.JobClassLoader;
import org.apache.ignite.internal.compute.message.DeploymentUnitMsg;
Expand Down Expand Up @@ -367,6 +368,19 @@ private static Throwable mapToComputeException(Throwable origin) {
}

if (marshaller == null) {
if (input instanceof ComputeJobArgumentHolder) {
ComputeJobArgumentHolder argumentHolder = (ComputeJobArgumentHolder) input;
ComputeJobType type = argumentHolder.type();
switch (type) {
// TODO https://issues.apache.org/jira/browse/IGNITE-23320
case MARSHALLED_TUPLE: // Fallthrough, both types are unmarshalled just before execution.
case MARSHALLED_POJO:
input = TupleWithSchemaMarshalling.unmarshal(argumentHolder.data());
break;
default:
throw new ComputeException(MARSHALLING_TYPE_MISMATCH_ERR, "Unexpected job argument type: " + type);
}
}
if (input instanceof Tuple) {
// If input was marshalled as Tuple and argument type is not tuple then it's a pojo.
if (pojoType != null && pojoType != Tuple.class) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.compute;

/**
* Holds unmarshalled binary data for the compute, used for the optimization to reduce number of conversions in case when thin client
* requests job execution on another job. In this case, the argument is packed on the client, unpacked as a binary data on the client
* handler node, passed as an object of this class to the remote node and only there it is unmarshalled.
*/
public class ComputeJobArgumentHolder {
private final ComputeJobType type;

private final byte[] data;

/**
* Constructs a holder.
*
* @param type Job argument type.
* @param data Marshalled data.
*/
public ComputeJobArgumentHolder(ComputeJobType type, byte[] data) {
this.type = type;
this.data = data;
}

/**
* Returns job argument type.
*
* @return Job argument type.
*/
public ComputeJobType type() {
return type;
}

/**
* Returns marshalled data.
*
* @return Marshalled data.
*/
public byte[] data() {
return data;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,40 @@
* limitations under the License.
*/

package org.apache.ignite.internal.client.proto;
package org.apache.ignite.internal.compute;

import org.apache.ignite.sql.ColumnType;
import org.jetbrains.annotations.Nullable;

/**
* The type of the object that can be passed/returned to/from the compute job. In can be a native type that is represented by
* {@link ColumnType} or a marshalled object/tuple.
*/
class ComputeJobType {
static final int NATIVE = 0;
static final int MARSHALLED_TUPLE = 1;
static final int MARSHALLED_CUSTOM = 2;
static final int MARSHALLED_POJO = 3;
public enum ComputeJobType {
NATIVE(0),
MARSHALLED_TUPLE(1),
MARSHALLED_CUSTOM(2),
MARSHALLED_POJO(3);

private static final ComputeJobType[] VALUES = values();

private final int id;

ComputeJobType(int id) {
this.id = id;
}

public int id() {
return id;
}

/**
* Returns enum value corresponding to the id.
*
* @param id Identifier of the value.
* @return Enum value or {@code null} if identifier is invalid.
*/
public static @Nullable ComputeJobType fromId(int id) {
return id >= 0 && id < VALUES.length ? VALUES[id] : null;
}
}