Skip to content

Commit

Permalink
ES|QL: Add support for cached strings in plan serialization (elastic#…
Browse files Browse the repository at this point in the history
  • Loading branch information
luigidellaquila authored Oct 11, 2024
1 parent bebcaf9 commit a9b9172
Show file tree
Hide file tree
Showing 18 changed files with 163 additions and 46 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/112929.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 112929
summary: "ES|QL: Add support for cached strings in plan serialization"
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ static TransportVersion def(int id) {
public static final TransportVersion TEXT_SIMILARITY_RERANKER_QUERY_REWRITE = def(8_763_00_0);
public static final TransportVersion SIMULATE_INDEX_TEMPLATES_SUBSTITUTIONS = def(8_764_00_0);
public static final TransportVersion RETRIEVERS_TELEMETRY_ADDED = def(8_765_00_0);
public static final TransportVersion ESQL_CACHED_STRING_SERIALIZATION = def(8_766_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ private FieldAttribute(StreamInput in) throws IOException {
this(
Source.readFrom((StreamInput & PlanStreamInput) in),
in.readOptionalWriteable(FieldAttribute::readFrom),
in.readString(),
((PlanStreamInput) in).readCachedString(),
DataType.readFrom(in),
EsField.readFrom(in),
in.readOptionalString(),
Expand All @@ -130,7 +130,7 @@ public void writeTo(StreamOutput out) throws IOException {
if (((PlanStreamOutput) out).writeAttributeCacheHeader(this)) {
Source.EMPTY.writeTo(out);
out.writeOptionalWriteable(parent);
out.writeString(name());
((PlanStreamOutput) out).writeCachedString(name());
dataType().writeTo(out);
field.writeTo(out);
// We used to write the qualifier here. We can still do if needed in the future.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
import org.elasticsearch.xpack.esql.core.plugin.EsqlCorePlugin;
import org.elasticsearch.xpack.esql.core.util.PlanStreamInput;
import org.elasticsearch.xpack.esql.core.util.PlanStreamOutput;

import java.io.IOException;
import java.math.BigInteger;
Expand Down Expand Up @@ -519,12 +521,12 @@ public DataType counter() {
}

public void writeTo(StreamOutput out) throws IOException {
out.writeString(typeName);
((PlanStreamOutput) out).writeCachedString(typeName);
}

public static DataType readFrom(StreamInput in) throws IOException {
// TODO: Use our normal enum serialization pattern
return readFrom(in.readString());
return readFrom(((PlanStreamInput) in).readCachedString());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.esql.core.util.PlanStreamInput;
import org.elasticsearch.xpack.esql.core.util.PlanStreamOutput;

import java.io.IOException;
import java.util.Map;
Expand All @@ -26,12 +28,12 @@ private DateEsField(String name, DataType dataType, Map<String, EsField> propert
}

protected DateEsField(StreamInput in) throws IOException {
this(in.readString(), DataType.DATETIME, in.readImmutableMap(EsField::readFrom), in.readBoolean());
this(((PlanStreamInput) in).readCachedString(), DataType.DATETIME, in.readImmutableMap(EsField::readFrom), in.readBoolean());
}

@Override
public void writeContent(StreamOutput out) throws IOException {
out.writeString(getName());
((PlanStreamOutput) out).writeCachedString(getName());
out.writeMap(getProperties(), (o, x) -> x.writeTo(out));
out.writeBoolean(isAggregatable());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,15 @@ public EsField(String name, DataType esDataType, Map<String, EsField> properties
}

public EsField(StreamInput in) throws IOException {
this.name = in.readString();
this.name = ((PlanStreamInput) in).readCachedString();
this.esDataType = readDataType(in);
this.properties = in.readImmutableMap(EsField::readFrom);
this.aggregatable = in.readBoolean();
this.isAlias = in.readBoolean();
}

private DataType readDataType(StreamInput in) throws IOException {
String name = in.readString();
String name = ((PlanStreamInput) in).readCachedString();
if (in.getTransportVersion().before(TransportVersions.ESQL_NESTED_UNSUPPORTED) && name.equalsIgnoreCase("NESTED")) {
/*
* The "nested" data type existed in older versions of ESQL but was
Expand Down Expand Up @@ -98,7 +98,7 @@ public void writeTo(StreamOutput out) throws IOException {
* This needs to be overridden by subclasses for specific serialization
*/
public void writeContent(StreamOutput out) throws IOException {
out.writeString(name);
((PlanStreamOutput) out).writeCachedString(name);
esDataType.writeTo(out);
out.writeMap(properties, (o, x) -> x.writeTo(out));
out.writeBoolean(aggregatable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.esql.core.QlIllegalArgumentException;
import org.elasticsearch.xpack.esql.core.util.PlanStreamInput;
import org.elasticsearch.xpack.esql.core.util.PlanStreamOutput;

import java.io.IOException;
import java.util.Map;
Expand Down Expand Up @@ -52,7 +54,7 @@ private InvalidMappedField(String name, String errorMessage, Map<String, EsField
}

protected InvalidMappedField(StreamInput in) throws IOException {
this(in.readString(), in.readString(), in.readImmutableMap(StreamInput::readString, EsField::readFrom));
this(((PlanStreamInput) in).readCachedString(), in.readString(), in.readImmutableMap(StreamInput::readString, EsField::readFrom));
}

public Set<DataType> types() {
Expand All @@ -61,7 +63,7 @@ public Set<DataType> types() {

@Override
public void writeContent(StreamOutput out) throws IOException {
out.writeString(getName());
((PlanStreamOutput) out).writeCachedString(getName());
out.writeString(errorMessage);
out.writeMap(getProperties(), (o, x) -> x.writeTo(out));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.esql.core.util.PlanStreamInput;
import org.elasticsearch.xpack.esql.core.util.PlanStreamOutput;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -59,7 +61,7 @@ protected KeywordEsField(

public KeywordEsField(StreamInput in) throws IOException {
this(
in.readString(),
((PlanStreamInput) in).readCachedString(),
KEYWORD,
in.readImmutableMap(EsField::readFrom),
in.readBoolean(),
Expand All @@ -71,7 +73,7 @@ public KeywordEsField(StreamInput in) throws IOException {

@Override
public void writeContent(StreamOutput out) throws IOException {
out.writeString(getName());
((PlanStreamOutput) out).writeCachedString(getName());
out.writeMap(getProperties(), (o, x) -> x.writeTo(out));
out.writeBoolean(isAggregatable());
out.writeInt(precision);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.util.PlanStreamInput;
import org.elasticsearch.xpack.esql.core.util.PlanStreamOutput;

import java.io.IOException;
import java.util.HashMap;
Expand All @@ -36,13 +38,18 @@ public MultiTypeEsField(String name, DataType dataType, boolean aggregatable, Ma
}

protected MultiTypeEsField(StreamInput in) throws IOException {
this(in.readString(), DataType.readFrom(in), in.readBoolean(), in.readImmutableMap(i -> i.readNamedWriteable(Expression.class)));
this(
((PlanStreamInput) in).readCachedString(),
DataType.readFrom(in),
in.readBoolean(),
in.readImmutableMap(i -> i.readNamedWriteable(Expression.class))
);
}

@Override
public void writeContent(StreamOutput out) throws IOException {
out.writeString(getName());
out.writeString(getDataType().typeName());
((PlanStreamOutput) out).writeCachedString(getName());
getDataType().writeTo(out);
out.writeBoolean(isAggregatable());
out.writeMap(getIndexToConversionExpressions(), (o, v) -> out.writeNamedWriteable(v));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.xpack.esql.core.QlIllegalArgumentException;
import org.elasticsearch.xpack.esql.core.util.PlanStreamInput;
import org.elasticsearch.xpack.esql.core.util.PlanStreamOutput;

import java.io.IOException;
import java.util.Map;
Expand All @@ -32,12 +34,12 @@ public TextEsField(String name, Map<String, EsField> properties, boolean hasDocV
}

protected TextEsField(StreamInput in) throws IOException {
this(in.readString(), in.readImmutableMap(EsField::readFrom), in.readBoolean(), in.readBoolean());
this(((PlanStreamInput) in).readCachedString(), in.readImmutableMap(EsField::readFrom), in.readBoolean(), in.readBoolean());
}

@Override
public void writeContent(StreamOutput out) throws IOException {
out.writeString(getName());
((PlanStreamOutput) out).writeCachedString(getName());
out.writeMap(getProperties(), (o, x) -> x.writeTo(out));
out.writeBoolean(isAggregatable());
out.writeBoolean(isAlias());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.esql.core.util.PlanStreamInput;
import org.elasticsearch.xpack.esql.core.util.PlanStreamOutput;

import java.io.IOException;
import java.util.Map;
Expand All @@ -34,13 +36,18 @@ public UnsupportedEsField(String name, String originalType, String inherited, Ma
}

public UnsupportedEsField(StreamInput in) throws IOException {
this(in.readString(), in.readString(), in.readOptionalString(), in.readImmutableMap(EsField::readFrom));
this(
((PlanStreamInput) in).readCachedString(),
((PlanStreamInput) in).readCachedString(),
in.readOptionalString(),
in.readImmutableMap(EsField::readFrom)
);
}

@Override
public void writeContent(StreamOutput out) throws IOException {
out.writeString(getName());
out.writeString(getOriginalType());
((PlanStreamOutput) out).writeCachedString(getName());
((PlanStreamOutput) out).writeCachedString(getOriginalType());
out.writeOptionalString(getInherited());
out.writeMap(getProperties(), (o, x) -> x.writeTo(out));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,6 @@ public interface PlanStreamInput {
<A extends Attribute> A readAttributeWithCache(CheckedFunction<StreamInput, A, IOException> constructor) throws IOException;

<A extends EsField> A readEsFieldWithCache() throws IOException;

String readCachedString() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,6 @@ public interface PlanStreamOutput {
* @throws IOException
*/
boolean writeEsFieldCacheHeader(EsField field) throws IOException;

void writeCachedString(String field) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public UnsupportedAttribute(Source source, String name, UnsupportedEsField field
private UnsupportedAttribute(StreamInput in) throws IOException {
this(
Source.readFrom((PlanStreamInput) in),
in.readString(),
((PlanStreamInput) in).readCachedString(),
in.getTransportVersion().onOrAfter(TransportVersions.ESQL_ES_FIELD_CACHED_SERIALIZATION)
|| in.getTransportVersion().isPatchFrom(TransportVersions.ESQL_ATTRIBUTE_CACHED_SERIALIZATION_8_15)
? EsField.readFrom(in)
Expand All @@ -90,7 +90,7 @@ private UnsupportedAttribute(StreamInput in) throws IOException {
public void writeTo(StreamOutput out) throws IOException {
if (((PlanStreamOutput) out).writeAttributeCacheHeader(this)) {
Source.EMPTY.writeTo(out);
out.writeString(name());
((PlanStreamOutput) out).writeCachedString(name());
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_ES_FIELD_CACHED_SERIALIZATION)
|| out.getTransportVersion().isPatchFrom(TransportVersions.ESQL_ATTRIBUTE_CACHED_SERIALIZATION_8_15)) {
field().writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,11 @@ public NameId apply(long streamNameId) {

private final Map<Integer, Block> cachedBlocks = new HashMap<>();

private Attribute[] attributesCache = new Attribute[64];
private Attribute[] attributesCache = new Attribute[1024];

private EsField[] esFieldsCache = new EsField[64];
private EsField[] esFieldsCache = new EsField[1024];

private String[] stringCache = new String[1024];

// hook for nameId, where can cache and map, for now just return a NameId of the same long value.
private final LongFunction<NameId> nameIdFunction;
Expand Down Expand Up @@ -195,10 +197,11 @@ public <A extends Attribute> A readAttributeWithCache(CheckedFunction<StreamInpu
}

private Attribute attributeFromCache(int id) throws IOException {
if (attributesCache[id] == null) {
Attribute attribute = attributesCache[id];
if (attribute == null) {
throw new IOException("Attribute ID not found in serialization cache [" + id + "]");
}
return attributesCache[id];
return attribute;
}

/**
Expand All @@ -221,7 +224,7 @@ public <A extends EsField> A readEsFieldWithCache() throws IOException {
// it's safe to cast to int, since the max value for this is {@link PlanStreamOutput#MAX_SERIALIZED_ATTRIBUTES}
int cacheId = Math.toIntExact(readZLong());
if (cacheId < 0) {
String className = readString();
String className = readCachedString();
Writeable.Reader<? extends EsField> reader = EsField.getReader(className);
cacheId = -1 - cacheId;
EsField result = reader.read(this);
Expand All @@ -231,17 +234,37 @@ public <A extends EsField> A readEsFieldWithCache() throws IOException {
return (A) esFieldFromCache(cacheId);
}
} else {
String className = readString();
String className = readCachedString();
Writeable.Reader<? extends EsField> reader = EsField.getReader(className);
return (A) reader.read(this);
}
}

/**
* Reads a cached string, serialized with {@link PlanStreamOutput#writeCachedString(String)}.
*/
@Override
public String readCachedString() throws IOException {
if (getTransportVersion().before(TransportVersions.ESQL_CACHED_STRING_SERIALIZATION)) {
return readString();
}
int cacheId = Math.toIntExact(readZLong());
if (cacheId < 0) {
String string = readString();
cacheId = -1 - cacheId;
cacheString(cacheId, string);
return string;
} else {
return stringFromCache(cacheId);
}
}

private EsField esFieldFromCache(int id) throws IOException {
if (esFieldsCache[id] == null) {
EsField field = esFieldsCache[id];
if (field == null) {
throw new IOException("Attribute ID not found in serialization cache [" + id + "]");
}
return esFieldsCache[id];
return field;
}

/**
Expand All @@ -257,4 +280,27 @@ private void cacheEsField(int id, EsField field) {
esFieldsCache[id] = field;
}

private String stringFromCache(int id) throws IOException {
String value = stringCache[id];
if (value == null) {
throw new IOException("String not found in serialization cache [" + id + "]");
}
return value;
}

private void cacheString(int id, String string) {
assert id >= 0;
if (id >= stringCache.length) {
stringCache = ArrayUtil.grow(stringCache, id + 1);
}
stringCache[id] = string;
}

@Override
public void close() throws IOException {
super.close();
this.stringCache = null;
this.attributesCache = null;
this.esFieldsCache = null;
}
}
Loading

0 comments on commit a9b9172

Please sign in to comment.