Skip to content

Commit

Permalink
Serialize GeoPoint and SpatialPoint in ESQL plan
Browse files Browse the repository at this point in the history
  • Loading branch information
craigtaverner committed Dec 18, 2023
1 parent 0d0d20d commit 5057691
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@
package org.elasticsearch.xpack.esql.io.stream;

import org.elasticsearch.common.TriFunction;
import org.elasticsearch.common.geo.SpatialPoint;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.dissect.DissectParser;
import org.elasticsearch.geometry.Point;
import org.elasticsearch.geometry.utils.GeometryValidator;
import org.elasticsearch.geometry.utils.WellKnownBinary;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.esql.enrich.EnrichPolicyResolution;
Expand Down Expand Up @@ -172,6 +176,7 @@
import org.elasticsearch.xpack.ql.plan.logical.OrderBy;
import org.elasticsearch.xpack.ql.plan.logical.Project;
import org.elasticsearch.xpack.ql.tree.Source;
import org.elasticsearch.xpack.ql.type.DataType;
import org.elasticsearch.xpack.ql.type.DateEsField;
import org.elasticsearch.xpack.ql.type.EsField;
import org.elasticsearch.xpack.ql.type.InvalidMappedField;
Expand All @@ -180,6 +185,7 @@
import org.elasticsearch.xpack.ql.type.UnsupportedEsField;

import java.io.IOException;
import java.nio.ByteOrder;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -190,6 +196,8 @@
import static org.elasticsearch.xpack.esql.io.stream.PlanNameRegistry.Entry.of;
import static org.elasticsearch.xpack.esql.io.stream.PlanNameRegistry.PlanReader.readerFromPlanReader;
import static org.elasticsearch.xpack.esql.io.stream.PlanNameRegistry.PlanWriter.writerFromPlanWriter;
import static org.elasticsearch.xpack.esql.type.EsqlDataTypes.CARTESIAN_POINT;
import static org.elasticsearch.xpack.esql.type.EsqlDataTypes.GEO_POINT;

/**
* A utility class that consists solely of static methods that describe how to serialize and
Expand Down Expand Up @@ -1572,15 +1580,63 @@ static void writeAlias(PlanStreamOutput out, Alias alias) throws IOException {
// -- Expressions (other)

static Literal readLiteral(PlanStreamInput in) throws IOException {
return new Literal(in.readSource(), in.readGenericValue(), in.dataTypeFromTypeName(in.readString()));
return new Literal(
in.readSource(),
in.readGenericValue(),
in.dataTypeFromTypeName(in.readString()),
PlanNamedTypes::mapToLiteralValue
);
}

static void writeLiteral(PlanStreamOutput out, Literal literal) throws IOException {
out.writeNoSource();
out.writeGenericValue(literal.value());
out.writeGenericValue(mapFromLiteralValue(literal));
out.writeString(literal.dataType().typeName());
}

/**
* Not all literal values are currently supported in StreamInput/StreamOutput as generic values.
* This mapper allows for addition of new and interesting values without (yet) changing transport version.
* This only makes sense during the pre-GA version of ESQL. When we get near GA we want TransportVersion support.
* TODO: Implement TransportVersion checks before GA (eg. by adding to StreamInput/StreamOutput directly)
*/
private static Object mapFromLiteralValue(Literal literal) {
if (literal.dataType() == GEO_POINT || literal.dataType() == CARTESIAN_POINT) {
if (literal.value() instanceof List<?> list) {
return list.stream().map(v -> pointAsWKB((SpatialPoint) v)).toList();
}
return pointAsWKB((SpatialPoint) literal.value());
}
return literal.value();
}

/**
* Not all literal values are currently supported in StreamInput/StreamOutput as generic values.
* This mapper allows for addition of new and interesting values without (yet) changing transport version.
* This only makes sense during the pre-GA version of ESQL. When we get near GA we want TransportVersion support.
* TODO: Implement TransportVersion checks before GA (eg. by adding to StreamInput/StreamOutput directly)
*/
private static Object mapToLiteralValue(DataType dataType, Object value) {
if (value instanceof List<?> list) {
return list.stream().map(v -> mapToLiteralValue(dataType, v)).toList();
}
if (value instanceof byte[] bytes) {
if (dataType == GEO_POINT || dataType == CARTESIAN_POINT) {
return wkbAsPoint(bytes);
}
}
return value;
}

private static byte[] pointAsWKB(SpatialPoint point) {
return WellKnownBinary.toWKB(new Point(point.getX(), point.getY()), ByteOrder.LITTLE_ENDIAN);
}

private static SpatialPoint wkbAsPoint(byte[] bytes) {
Point point = (Point) WellKnownBinary.fromWKB(GeometryValidator.NOOP, false, bytes);
return new SpatialPoint(point.getX(), point.getY());
}

static Order readOrder(PlanStreamInput in) throws IOException {
return new org.elasticsearch.xpack.esql.expression.Order(
in.readSource(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.xpack.ql.type.DataTypes;

import java.util.Objects;
import java.util.function.BiFunction;

/**
* SQL Literal or constant.
Expand All @@ -32,6 +33,10 @@ public Literal(Source source, Object value, DataType dataType) {
this.value = value;
}

public Literal(Source source, Object value, DataType dataType, BiFunction<DataType, Object, Object> valueMapper) {
this(source, valueMapper.apply(dataType, value), dataType);
}

@Override
protected NodeInfo<? extends Literal> info() {
return NodeInfo.create(this, Literal::new, value, dataType);
Expand Down

0 comments on commit 5057691

Please sign in to comment.