diff --git a/.github/workflows/test-image.yml b/.github/workflows/test-image.yml index 84055a2..b6e2876 100644 --- a/.github/workflows/test-image.yml +++ b/.github/workflows/test-image.yml @@ -34,6 +34,9 @@ jobs: - "8.8" - "8.9" - "8.10" + - "8.11" + - "8.12" + - "8.13" steps: - uses: actions/checkout@v2 diff --git a/8.10/build.gradle b/8.10/build.gradle index 2d185c3..bfd509c 100644 --- a/8.10/build.gradle +++ b/8.10/build.gradle @@ -1,4 +1,4 @@ -buildPlugin(this, '8.10', revisions(2)) +buildPlugin(this, '8.10', revisions(4)) compileJava { sourceCompatibility = JavaVersion.VERSION_17 diff --git a/8.11/build.gradle b/8.11/build.gradle new file mode 100644 index 0000000..ab258b1 --- /dev/null +++ b/8.11/build.gradle @@ -0,0 +1,6 @@ +buildPlugin(this, '8.11', revisions(4)) + +compileJava { + sourceCompatibility = JavaVersion.VERSION_17 + targetCompatibility = JavaVersion.VERSION_17 +} \ No newline at end of file diff --git a/8.11/src/main/java/com/traveltime/plugin/elasticsearch/TraveltimePlugin.java b/8.11/src/main/java/com/traveltime/plugin/elasticsearch/TraveltimePlugin.java new file mode 100644 index 0000000..89720ba --- /dev/null +++ b/8.11/src/main/java/com/traveltime/plugin/elasticsearch/TraveltimePlugin.java @@ -0,0 +1,98 @@ +package com.traveltime.plugin.elasticsearch; + + +import com.traveltime.plugin.elasticsearch.query.TraveltimeFetchPhase; +import com.traveltime.plugin.elasticsearch.query.TraveltimeQueryBuilder; +import com.traveltime.plugin.elasticsearch.query.TraveltimeQueryParser; +import com.traveltime.plugin.elasticsearch.util.Util; +import com.traveltime.sdk.dto.requests.proto.Country; +import com.traveltime.sdk.dto.requests.proto.RequestType; +import com.traveltime.sdk.dto.requests.proto.Transportation; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.env.Environment; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.SearchPlugin; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.search.fetch.FetchSubPhase; +import org.elasticsearch.telemetry.TelemetryProvider; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.watcher.ResourceWatcherService; +import org.elasticsearch.xcontent.NamedXContentRegistry; + +import java.net.URI; +import java.time.Duration; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.function.Supplier; + +public class TraveltimePlugin extends Plugin implements SearchPlugin { + public static final Setting APP_ID = Setting.simpleString("traveltime.app.id", Setting.Property.NodeScope); + public static final Setting API_KEY = Setting.simpleString("traveltime.api.key", Setting.Property.NodeScope, Setting.Property.Filtered); + public static final Setting> DEFAULT_MODE = new Setting<>("traveltime.default.mode", s -> "", Util::findModeByName, Setting.Property.NodeScope); + public static final Setting> DEFAULT_COUNTRY = new Setting<>("traveltime.default.country", s -> "", Util::findCountryByName, Setting.Property.NodeScope); + + public static final Setting> DEFAULT_REQUEST_TYPE = new Setting<>("traveltime.default.request_type", s -> RequestType.ONE_TO_MANY.name(), Util::findRequestTypeByName, Setting.Property.NodeScope); + public static final Setting API_URI = new Setting<>("traveltime.api.uri", s -> "https://proto.api.traveltimeapp.com/api/v2/", URI::create, Setting.Property.NodeScope); + + private static final Setting CACHE_CLEANUP_INTERVAL = Setting.intSetting("traveltime.cache.cleanup.interval", 120, 0, Setting.Property.NodeScope); + private static final Setting CACHE_EXPIRY = Setting.intSetting("traveltime.cache.expiry", 60, 0, Setting.Property.NodeScope); + private static final Setting CACHE_SIZE = Setting.intSetting("traveltime.cache.size", 50, 0, Setting.Property.NodeScope); + + private void cleanUpAndReschedule(ThreadPool threadPool, TimeValue cleanupSeconds) { + TraveltimeCache.INSTANCE.cleanUp(); + threadPool.scheduleUnlessShuttingDown(cleanupSeconds, threadPool.generic(), () -> cleanUpAndReschedule(threadPool, cleanupSeconds)); + } + + @Override + public Collection createComponents( + Client client, + ClusterService clusterService, + ThreadPool threadPool, + ResourceWatcherService resourceWatcherService, + ScriptService scriptService, + NamedXContentRegistry xContentRegistry, + Environment environment, + NodeEnvironment nodeEnvironment, + NamedWriteableRegistry namedWriteableRegistry, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier repositoriesServiceSupplier, + TelemetryProvider telemetryProvider, + AllocationService allocationService, + IndicesService indicesService + ) { + TimeValue cleanupSeconds = TimeValue.timeValueSeconds(CACHE_CLEANUP_INTERVAL.get(environment.settings())); + Duration cacheExpiry = Duration.ofSeconds(CACHE_EXPIRY.get(environment.settings())); + Integer cacheSize = CACHE_SIZE.get(environment.settings()); + + TraveltimeCache.INSTANCE.setUp(cacheSize, cacheExpiry); + cleanUpAndReschedule(threadPool, cleanupSeconds); + + return super.createComponents(client, clusterService, threadPool, resourceWatcherService, scriptService, xContentRegistry, environment, nodeEnvironment, namedWriteableRegistry, indexNameExpressionResolver, repositoriesServiceSupplier, telemetryProvider, allocationService, indicesService); + + } + + @Override + public List> getSettings() { + return List.of(APP_ID, API_KEY, DEFAULT_MODE, DEFAULT_COUNTRY, DEFAULT_REQUEST_TYPE, API_URI, CACHE_SIZE, CACHE_EXPIRY, CACHE_CLEANUP_INTERVAL); + } + + @Override + public List> getQueries() { + return List.of(new QuerySpec<>(TraveltimeQueryParser.NAME, TraveltimeQueryBuilder::new, new TraveltimeQueryParser())); + } + + @Override + public List getFetchSubPhases(FetchPhaseConstructionContext context) { + return List.of(new TraveltimeFetchPhase()); + } +} diff --git a/8.11/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeFetchPhase.java b/8.11/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeFetchPhase.java new file mode 100644 index 0000000..019a235 --- /dev/null +++ b/8.11/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeFetchPhase.java @@ -0,0 +1,77 @@ +package com.traveltime.plugin.elasticsearch.query; + +import com.traveltime.plugin.elasticsearch.TraveltimeCache; +import lombok.val; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.QueryVisitor; +import org.elasticsearch.common.document.DocumentField; +import org.elasticsearch.search.fetch.FetchContext; +import org.elasticsearch.search.fetch.FetchSubPhase; +import org.elasticsearch.search.fetch.FetchSubPhaseProcessor; +import org.elasticsearch.search.fetch.StoredFieldsSpec; +import org.elasticsearch.search.fetch.subphase.FieldAndFormat; +import org.elasticsearch.search.fetch.subphase.FieldFetcher; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +public class TraveltimeFetchPhase implements FetchSubPhase { + + private static class ParamFinder extends QueryVisitor { + private final List paramList = new ArrayList<>(); + + @Override + public void visitLeaf(Query query) { + if (query instanceof TraveltimeSearchQuery) { + if (!((TraveltimeSearchQuery) query).getOutput().isEmpty()) { + paramList.add(((TraveltimeSearchQuery) query)); + } + } + } + + public TraveltimeSearchQuery getQuery() { + if (paramList.size() == 1) return paramList.get(0); + else return null; + } + } + + @Override + public FetchSubPhaseProcessor getProcessor(FetchContext fetchContext) { + Query query = fetchContext.query(); + val finder = new ParamFinder(); + query.visit(finder); + TraveltimeSearchQuery traveltimeQuery = finder.getQuery(); + if (traveltimeQuery == null) return null; + TraveltimeQueryParameters params = traveltimeQuery.getParams(); + final String output = traveltimeQuery.getOutput(); + + FieldFetcher fieldFetcher = FieldFetcher.create(fetchContext.getSearchExecutionContext(), List.of(new FieldAndFormat(params.getField(), null))); + + return new FetchSubPhaseProcessor() { + + @Override + public void setNextReader(LeafReaderContext readerContext) { + fieldFetcher.setNextReader(readerContext); + } + + @Override + public void process(HitContext hitContext) throws IOException { + val docValues = hitContext.reader().getSortedNumericDocValues(params.getField()); + docValues.advance(hitContext.docId()); + Integer tt = TraveltimeCache.INSTANCE.get(params, docValues.nextValue()); + + if (tt >= 0) { + hitContext.hit().setDocumentField(output, new DocumentField(output, List.of(tt))); + } + } + + @Override + public StoredFieldsSpec storedFieldsSpec() { + return new StoredFieldsSpec(false, false, Set.of(params.getField())); + } + }; + } +} diff --git a/8.11/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeQueryBuilder.java b/8.11/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeQueryBuilder.java new file mode 100644 index 0000000..256ac02 --- /dev/null +++ b/8.11/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeQueryBuilder.java @@ -0,0 +1,192 @@ +package com.traveltime.plugin.elasticsearch.query; + +import com.traveltime.plugin.elasticsearch.TraveltimePlugin; +import com.traveltime.plugin.elasticsearch.util.Util; +import com.traveltime.sdk.dto.common.Coordinates; +import com.traveltime.sdk.dto.requests.proto.Country; +import com.traveltime.sdk.dto.requests.proto.RequestType; +import com.traveltime.sdk.dto.requests.proto.Transportation; +import lombok.NonNull; +import lombok.Setter; +import org.apache.lucene.search.Query; +import org.elasticsearch.TransportVersion; +import org.elasticsearch.TransportVersions; +import org.elasticsearch.common.geo.GeoPoint; +import org.elasticsearch.common.geo.GeoUtils; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.mapper.GeoPointFieldMapper; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.query.*; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; +import java.net.URI; +import java.util.Objects; +import java.util.Optional; + +@Setter +public class TraveltimeQueryBuilder extends AbstractQueryBuilder { + @NonNull + private String field; + @NonNull + private GeoPoint origin; + private int limit; + private Transportation.Modes mode; + private Country country; + private RequestType requestType; + private QueryBuilder prefilter; + @NonNull + private String output = ""; + + public TraveltimeQueryBuilder() { + } + + public TraveltimeQueryBuilder(StreamInput in) throws IOException { + super(in); + field = in.readString(); + origin = in.readGeoPoint(); + limit = in.readInt(); + mode = in.readOptionalEnum(Transportation.Modes.class); + String c = in.readOptionalString(); + if(c != null) country = Util.findCountryByName(c).orElseGet(() -> new Country.Custom(c)); + requestType = in.readOptionalEnum(RequestType.class); + prefilter = in.readOptionalNamedWriteable(QueryBuilder.class); + output = in.readString(); + } + + @Override + protected void doWriteTo(StreamOutput out) throws IOException { + out.writeString(field); + out.writeGeoPoint(origin); + out.writeInt(limit); + out.writeOptionalEnum(mode); + out.writeOptionalString(country == null ? null : country.getValue()); + out.writeOptionalEnum(requestType); + out.writeOptionalNamedWriteable(prefilter); + out.writeString(output); + } + + @Override + protected void doXContent(XContentBuilder builder, Params params) throws IOException { + builder.field("field", field); + builder.field("origin", origin); + builder.field("limit", limit); + builder.field("mode", mode == null ? null : mode.getValue()); + builder.field("country", country == null ? null : country.getValue()); + builder.field("requestType", requestType == null ? null : requestType.name()); + builder.field("prefilter", prefilter); + builder.field("output", output); + } + + @Override + protected QueryBuilder doRewrite(QueryRewriteContext queryRewriteContext) throws IOException { + if (this.prefilter != null) this.prefilter = this.prefilter.rewrite(queryRewriteContext); + return super.doRewrite(queryRewriteContext); + } + + @Override + protected Query doToQuery(SearchExecutionContext context) throws IOException { + MappedFieldType originMapping = context.getFieldType(field); + if (!(originMapping instanceof GeoPointFieldMapper.GeoPointFieldType)) { + throw new QueryShardException(context, "field [" + field + "] is not a geo_point field"); + } + + GeoUtils.normalizePoint(origin); + if (!GeoUtils.isValidLatitude(origin.getLat())) { + throw new QueryShardException(context, "latitude invalid for origin " + origin); + } + if (!GeoUtils.isValidLongitude(origin.getLon())) { + throw new QueryShardException(context, "longitude invalid for origin " + origin); + } + + URI appUri = TraveltimePlugin.API_URI.get(context.getIndexSettings().getSettings()); + String appId = TraveltimePlugin.APP_ID.get(context.getIndexSettings().getSettings()); + String apiKey = TraveltimePlugin.API_KEY.get(context.getIndexSettings().getSettings()); + if (appId.isEmpty()) { + throw new IllegalStateException("Traveltime app id must be set in the config"); + } + if (apiKey.isEmpty()) { + throw new IllegalStateException("Traveltime api key must be set in the config"); + } + + Optional defaultMode = TraveltimePlugin.DEFAULT_MODE.get(context.getIndexSettings().getSettings()); + Optional defaultCountry = TraveltimePlugin.DEFAULT_COUNTRY.get(context.getIndexSettings().getSettings()); + Optional defaultRequestType = TraveltimePlugin.DEFAULT_REQUEST_TYPE.get(context.getIndexSettings().getSettings()); + + Coordinates originCoord = Coordinates.builder().lat(origin.lat()).lng(origin.getLon()).build(); + + TraveltimeQueryParameters params = new TraveltimeQueryParameters(field, originCoord, limit, mode, country, requestType); + if (params.getMode() == null) { + if (defaultMode.isPresent()) { + params = params.withMode(defaultMode.get()); + } else { + throw new IllegalStateException("Traveltime query requires either 'mode' field to be present or a default mode to be set in the config"); + } + } + if (params.getCountry() == null) { + if (defaultCountry.isPresent()) { + params = params.withCountry(defaultCountry.get()); + } else { + throw new IllegalStateException("Traveltime query requires either 'country' field to be present or a default country to be set in the config"); + } + } + if(params.getRequestType() == null) { + if(defaultRequestType.isPresent()) { + params = params.withRequestType(defaultRequestType.get()); + } else { + throw new IllegalStateException("Traveltime query requires either 'requestType' field to be present or a default request type to be set in the config"); + } + + } + if (params.getLimit() <= 0) { + throw new IllegalStateException("Traveltime limit must be greater than zero"); + } + + Query prefilterQuery = prefilter != null ? prefilter.toQuery(context) : null; + + return new TraveltimeSearchQuery(params, prefilterQuery, output, appUri, appId, apiKey); + } + + @Override + protected boolean doEquals(TraveltimeQueryBuilder other) { + if (!Objects.equals(this.field, other.field)) return false; + if (!Objects.equals(this.origin, other.origin)) return false; + if (!Objects.equals(this.mode, other.mode)) return false; + if (!Objects.equals(this.country, other.country)) return false; + if (!Objects.equals(this.prefilter, other.prefilter)) return false; + if (!Objects.equals(this.output, other.output)) return false; + return this.limit == other.limit; + } + + @Override + protected int doHashCode() { + final int PRIME = 59; + int result = 1; + result = result * PRIME + this.field.hashCode(); + result = result * PRIME + this.origin.hashCode(); + result = result * PRIME + Objects.hashCode(this.mode); + result = result * PRIME + Objects.hashCode(this.country); + result = result * PRIME + Objects.hashCode(this.prefilter); + result = result * PRIME + Objects.hashCode(this.output); + result = result * PRIME + this.limit; + return result; + } + + @Override + public String getWriteableName() { + return TraveltimeQueryParser.NAME; + } + + @Override + public TransportVersion getMinimalSupportedVersion() { + return TransportVersions.MINIMUM_COMPATIBLE; + } + + public static QueryBuilder parseInnerQueryBuilder(XContentParser parser) throws IOException { + return AbstractQueryBuilder.parseInnerQueryBuilder(parser); + } + + +} diff --git a/8.11/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeQueryParser.java b/8.11/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeQueryParser.java new file mode 100644 index 0000000..1705d39 --- /dev/null +++ b/8.11/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeQueryParser.java @@ -0,0 +1,64 @@ +package com.traveltime.plugin.elasticsearch.query; + +import com.traveltime.plugin.elasticsearch.util.Util; +import org.elasticsearch.common.ParsingException; +import org.elasticsearch.common.geo.GeoUtils; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryParser; +import org.elasticsearch.xcontent.ContextParser; +import org.elasticsearch.xcontent.ObjectParser; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Optional; +import java.util.function.Function; + +public class TraveltimeQueryParser implements QueryParser { + public static String NAME = "traveltime"; + private final ParseField field = new ParseField("field"); + private final ParseField origin = new ParseField("origin"); + private final ParseField limit = new ParseField("limit"); + private final ParseField mode = new ParseField("mode"); + private final ParseField country = new ParseField("country"); + private final ParseField requestType = new ParseField("requestType"); + private final ParseField prefilter = new ParseField("prefilter"); + private final ParseField output = new ParseField("output"); + + private final ContextParser prefilterParser = (p, c) -> TraveltimeQueryBuilder.parseInnerQueryBuilder(p); + + private final ObjectParser queryParser = new ObjectParser<>(NAME, TraveltimeQueryBuilder::new); + + { + queryParser.declareString(TraveltimeQueryBuilder::setField, field); + queryParser.declareField(TraveltimeQueryBuilder::setOrigin, (parser, c) -> GeoUtils.parseGeoPoint(parser), origin, ObjectParser.ValueType.VALUE_OBJECT_ARRAY); + queryParser.declareInt(TraveltimeQueryBuilder::setLimit, limit); + queryParser.declareString((qb, s) -> qb.setMode(findByNameOrError("transportation mode", s, Util::findModeByName)), mode); + queryParser.declareString((qb, s) -> qb.setCountry(findByNameOrError("country", s, Util::findCountryByName)), country); + queryParser.declareString((qb, s) -> qb.setRequestType(findByNameOrError("country", s, Util::findRequestTypeByName)), requestType); + queryParser.declareObject(TraveltimeQueryBuilder::setPrefilter, prefilterParser, prefilter); + queryParser.declareString(TraveltimeQueryBuilder::setOutput, output); + + queryParser.declareRequiredFieldSet(field.toString()); + queryParser.declareRequiredFieldSet(origin.toString()); + queryParser.declareRequiredFieldSet(limit.toString()); + } + + private static T findByNameOrError(String what, String name, Function> finder) { + Optional result = finder.apply(name); + if (result.isEmpty()) { + throw new IllegalArgumentException(String.format("Couldn't find a %s with the name %s", what, name)); + } else { + return result.get(); + } + } + + @Override + public TraveltimeQueryBuilder fromXContent(XContentParser parser) throws IOException { + try { + return queryParser.parse(parser, null); + } catch (IllegalArgumentException iae) { + throw new ParsingException(parser.getTokenLocation(), iae.getMessage(), iae); + } + } +} diff --git a/8.11/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeScorer.java b/8.11/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeScorer.java new file mode 100644 index 0000000..530f5af --- /dev/null +++ b/8.11/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeScorer.java @@ -0,0 +1,99 @@ +package com.traveltime.plugin.elasticsearch.query; + +import it.unimi.dsi.fastutil.longs.Long2IntMap; +import lombok.RequiredArgsConstructor; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.Scorer; + +import java.io.IOException; + +public class TraveltimeScorer extends Scorer { + protected final TraveltimeWeight weight; + private final Long2IntMap pointToTime; + private final TraveltimeFilteredDocs docs; + private final float boost; + + @RequiredArgsConstructor + private class TraveltimeFilteredDocs extends DocIdSetIterator { + private final TraveltimeWeight.FilteredIterator backing; + + private long currentValue = 0; + private boolean currentValueDirty = true; + private void invalidateCurrentValue() { + currentValueDirty = true; + } + private void advanceValue() throws IOException { + if(currentValueDirty) { + currentValue = backing.nextValue(); + currentValueDirty = false; + } + } + + public long nextValue() throws IOException { + advanceValue(); + return currentValue; + } + + @Override + public int docID() { + return backing.docID(); + } + + @Override + public int nextDoc() throws IOException { + int id = backing.nextDoc(); + invalidateCurrentValue(); + while (id != DocIdSetIterator.NO_MORE_DOCS && !pointToTime.containsKey(nextValue())) { + id = backing.nextDoc(); + invalidateCurrentValue(); + } + return id; + } + + @Override + public int advance(int target) throws IOException { + int id = backing.advance(target); + invalidateCurrentValue(); + if (id != DocIdSetIterator.NO_MORE_DOCS && !pointToTime.containsKey(nextValue())) { + id = nextDoc(); + } + return id; + } + + @Override + public long cost() { + return backing.cost() * 1000; + } + } + + public TraveltimeScorer(TraveltimeWeight w, Long2IntMap coordToTime, TraveltimeWeight.FilteredIterator docs, float boost) { + super(w); + this.weight = w; + this.pointToTime = coordToTime; + this.docs = new TraveltimeFilteredDocs(docs); + this.boost = boost; + } + + @Override + public DocIdSetIterator iterator() { + return docs; + } + + @Override + public float getMaxScore(int upTo) { + return 1; + } + + @Override + public float score() throws IOException { + int limit = weight.getTtQuery().getParams().getLimit(); + int tt = pointToTime.getOrDefault(docs.nextValue(), limit + 1); + return (boost * (limit - tt + 1)) / (limit + 1); + + } + + @Override + public int docID() { + return docs.docID(); + } +} diff --git a/8.11/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeSearchQuery.java b/8.11/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeSearchQuery.java new file mode 100644 index 0000000..0ee3888 --- /dev/null +++ b/8.11/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeSearchQuery.java @@ -0,0 +1,50 @@ +package com.traveltime.plugin.elasticsearch.query; + +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import org.apache.lucene.search.*; + +import java.io.IOException; +import java.net.URI; + +@AllArgsConstructor +@EqualsAndHashCode(callSuper = false) +@Getter +public class TraveltimeSearchQuery extends Query { + private final TraveltimeQueryParameters params; + private final Query prefilter; + private final String output; + private final URI appUri; + private final String appId; + private final String apiKey; + + @Override + public void visit(QueryVisitor visitor) { + if (prefilter != null) { + prefilter.visit(visitor); + } + visitor.visitLeaf(this); + } + + @Override + public String toString(String field) { + return String.format("TraveltimeSearchQuery(params = %s, prefilter = %s)", params, prefilter); + } + + @Override + public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost) throws IOException { + Weight prefilterWeight = prefilter != null ? prefilter.createWeight(searcher, scoreMode, boost) : null; + return new TraveltimeWeight(this, prefilterWeight, !output.isEmpty(), boost); + } + + @Override + public Query rewrite(IndexSearcher reader) throws IOException { + Query newPrefilter = prefilter != null ? prefilter.rewrite(reader) : null; + if (newPrefilter == prefilter) { + return super.rewrite(reader); + } else { + return new TraveltimeSearchQuery(params, newPrefilter, output, appUri, appId, apiKey); + } + } +} diff --git a/8.11/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeWeight.java b/8.11/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeWeight.java new file mode 100644 index 0000000..a990cba --- /dev/null +++ b/8.11/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeWeight.java @@ -0,0 +1,145 @@ +package com.traveltime.plugin.elasticsearch.query; + +import com.traveltime.plugin.elasticsearch.FetcherSingleton; +import com.traveltime.plugin.elasticsearch.ProtoFetcher; +import com.traveltime.plugin.elasticsearch.TraveltimeCache; +import com.traveltime.plugin.elasticsearch.util.Util; +import com.traveltime.sdk.dto.common.Coordinates; +import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap; +import it.unimi.dsi.fastutil.longs.LongArrayList; +import it.unimi.dsi.fastutil.longs.LongOpenHashSet; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.val; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.search.*; +import org.elasticsearch.SpecialPermission; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +@EqualsAndHashCode(callSuper = false) +public class TraveltimeWeight extends Weight { + @Getter + private final TraveltimeSearchQuery ttQuery; + + private final Weight prefilter; + + private final boolean hasOutput; + + private final float boost; + + private final Logger log = LogManager.getLogger(); + + @EqualsAndHashCode.Exclude + private final ProtoFetcher protoFetcher; + + public TraveltimeWeight(TraveltimeSearchQuery q, Weight prefilter, boolean hasOutput, float boost) { + super(q); + ttQuery = q; + this.prefilter = prefilter; + this.hasOutput = hasOutput; + this.boost = boost; + protoFetcher = FetcherSingleton.INSTANCE.getFetcher(q.getAppUri(), q.getAppId(), q.getApiKey(), SpecialPermission::new); + } + + @Override + public Explanation explain(LeafReaderContext context, int doc) { + return Explanation.noMatch("Cannot provide explanation for traveltime matches"); + } + + @RequiredArgsConstructor + public static class FilteredIterator { + private final SortedNumericDocValues values; + private final DocIdSetIterator filtered; + + public long nextValue() throws IOException { + return this.values.nextValue(); + } + + public int docID() { + return this.filtered.docID(); + } + + public int nextDoc() throws IOException { + return this.filtered.nextDoc(); + } + + public int advance(int target) throws IOException { + return this.filtered.advance(target); + } + + public long cost() { + return this.filtered.cost(); + } + } + + private FilteredIterator filteredValues(LeafReaderContext context) throws IOException { + val reader = context.reader(); + val backing = reader.getSortedNumericDocValues(ttQuery.getParams().getField()); + + DocIdSetIterator finalIterator; + + if (prefilter != null) { + val preScorer = prefilter.scorer(context); + if(preScorer == null) return null; + val prefilterIterator = preScorer.iterator(); + finalIterator = ConjunctionUtils.intersectIterators(List.of(prefilterIterator, backing)); + } else { + finalIterator = backing; + } + + return new FilteredIterator(backing, finalIterator); + } + + @Override + public Scorer scorer(LeafReaderContext context) throws IOException { + val backing = filteredValues(context); + if (backing == null) return null; + + val valueArray = new LongArrayList(); + val decodedArray = new ArrayList(); + val valueSet = new LongOpenHashSet(); + + while (backing.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { + long encodedCoords = backing.nextValue(); + if(valueSet.add(encodedCoords)) { + valueArray.add(encodedCoords); + decodedArray.add(Util.decode(encodedCoords)); + } + } + + val pointToTime = new Long2IntOpenHashMap(valueArray.size()); + + val results = protoFetcher.getTimes( + ttQuery.getParams().getOrigin(), + decodedArray, + ttQuery.getParams().getLimit(), + ttQuery.getParams().getMode(), + ttQuery.getParams().getCountry(), + ttQuery.getParams().getRequestType() + ); + + for (int index = 0; index < results.size(); index++) { + if(results.get(index) >= 0) { + pointToTime.put(valueArray.getLong(index), results.get(index).intValue()); + } + } + + if(hasOutput) { + TraveltimeCache.INSTANCE.add(ttQuery.getParams(), pointToTime); + } + + return new TraveltimeScorer(this, pointToTime, filteredValues(context), boost); + } + + @Override + public boolean isCacheable(LeafReaderContext ctx) { + return true; + } +} diff --git a/8.11/src/universal/plugin-descriptor.properties b/8.11/src/universal/plugin-descriptor.properties new file mode 100644 index 0000000..4574b21 --- /dev/null +++ b/8.11/src/universal/plugin-descriptor.properties @@ -0,0 +1,6 @@ +description=Traveltime search plugin +version=PLUGIN_VERSION +name=Traveltime +classname=com.traveltime.plugin.elasticsearch.TraveltimePlugin +java.version=1.11 +elasticsearch.version=ES_VERSION diff --git a/8.12/build.gradle b/8.12/build.gradle new file mode 100644 index 0000000..8ad62b8 --- /dev/null +++ b/8.12/build.gradle @@ -0,0 +1,6 @@ +buildPlugin(this, '8.12', revisions(2)) + +compileJava { + sourceCompatibility = JavaVersion.VERSION_17 + targetCompatibility = JavaVersion.VERSION_17 +} \ No newline at end of file diff --git a/8.12/src/main/java/com/traveltime/plugin/elasticsearch/TraveltimePlugin.java b/8.12/src/main/java/com/traveltime/plugin/elasticsearch/TraveltimePlugin.java new file mode 100644 index 0000000..d0c837d --- /dev/null +++ b/8.12/src/main/java/com/traveltime/plugin/elasticsearch/TraveltimePlugin.java @@ -0,0 +1,82 @@ +package com.traveltime.plugin.elasticsearch; + + +import com.traveltime.plugin.elasticsearch.query.TraveltimeFetchPhase; +import com.traveltime.plugin.elasticsearch.query.TraveltimeQueryBuilder; +import com.traveltime.plugin.elasticsearch.query.TraveltimeQueryParser; +import com.traveltime.plugin.elasticsearch.util.Util; +import com.traveltime.sdk.dto.requests.proto.Country; +import com.traveltime.sdk.dto.requests.proto.RequestType; +import com.traveltime.sdk.dto.requests.proto.Transportation; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.env.Environment; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.SearchPlugin; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.search.fetch.FetchSubPhase; +import org.elasticsearch.telemetry.TelemetryProvider; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.watcher.ResourceWatcherService; +import org.elasticsearch.xcontent.NamedXContentRegistry; + +import java.net.URI; +import java.time.Duration; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.function.Supplier; + +public class TraveltimePlugin extends Plugin implements SearchPlugin { + public static final Setting APP_ID = Setting.simpleString("traveltime.app.id", Setting.Property.NodeScope); + public static final Setting API_KEY = Setting.simpleString("traveltime.api.key", Setting.Property.NodeScope, Setting.Property.Filtered); + public static final Setting> DEFAULT_MODE = new Setting<>("traveltime.default.mode", s -> "", Util::findModeByName, Setting.Property.NodeScope); + public static final Setting> DEFAULT_COUNTRY = new Setting<>("traveltime.default.country", s -> "", Util::findCountryByName, Setting.Property.NodeScope); + + public static final Setting> DEFAULT_REQUEST_TYPE = new Setting<>("traveltime.default.request_type", s -> RequestType.ONE_TO_MANY.name(), Util::findRequestTypeByName, Setting.Property.NodeScope); + public static final Setting API_URI = new Setting<>("traveltime.api.uri", s -> "https://proto.api.traveltimeapp.com/api/v2/", URI::create, Setting.Property.NodeScope); + + private static final Setting CACHE_CLEANUP_INTERVAL = Setting.intSetting("traveltime.cache.cleanup.interval", 120, 0, Setting.Property.NodeScope); + private static final Setting CACHE_EXPIRY = Setting.intSetting("traveltime.cache.expiry", 60, 0, Setting.Property.NodeScope); + private static final Setting CACHE_SIZE = Setting.intSetting("traveltime.cache.size", 50, 0, Setting.Property.NodeScope); + + private void cleanUpAndReschedule(ThreadPool threadPool, TimeValue cleanupSeconds) { + TraveltimeCache.INSTANCE.cleanUp(); + threadPool.scheduleUnlessShuttingDown(cleanupSeconds, threadPool.generic(), () -> cleanUpAndReschedule(threadPool, cleanupSeconds)); + } + + @Override + public Collection createComponents(PluginServices pluginServices) { + TimeValue cleanupSeconds = TimeValue.timeValueSeconds(CACHE_CLEANUP_INTERVAL.get(pluginServices.environment().settings())); + Duration cacheExpiry = Duration.ofSeconds(CACHE_EXPIRY.get(pluginServices.environment().settings())); + Integer cacheSize = CACHE_SIZE.get(pluginServices.environment().settings()); + + TraveltimeCache.INSTANCE.setUp(cacheSize, cacheExpiry); + cleanUpAndReschedule(pluginServices.threadPool(), cleanupSeconds); + + return super.createComponents(pluginServices); + } + + @Override + public List> getSettings() { + return List.of(APP_ID, API_KEY, DEFAULT_MODE, DEFAULT_COUNTRY, DEFAULT_REQUEST_TYPE, API_URI, CACHE_SIZE, CACHE_EXPIRY, CACHE_CLEANUP_INTERVAL); + } + + @Override + public List> getQueries() { + return List.of(new QuerySpec<>(TraveltimeQueryParser.NAME, TraveltimeQueryBuilder::new, new TraveltimeQueryParser())); + } + + @Override + public List getFetchSubPhases(FetchPhaseConstructionContext context) { + return List.of(new TraveltimeFetchPhase()); + } +} diff --git a/8.12/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeFetchPhase.java b/8.12/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeFetchPhase.java new file mode 100644 index 0000000..019a235 --- /dev/null +++ b/8.12/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeFetchPhase.java @@ -0,0 +1,77 @@ +package com.traveltime.plugin.elasticsearch.query; + +import com.traveltime.plugin.elasticsearch.TraveltimeCache; +import lombok.val; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.QueryVisitor; +import org.elasticsearch.common.document.DocumentField; +import org.elasticsearch.search.fetch.FetchContext; +import org.elasticsearch.search.fetch.FetchSubPhase; +import org.elasticsearch.search.fetch.FetchSubPhaseProcessor; +import org.elasticsearch.search.fetch.StoredFieldsSpec; +import org.elasticsearch.search.fetch.subphase.FieldAndFormat; +import org.elasticsearch.search.fetch.subphase.FieldFetcher; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +public class TraveltimeFetchPhase implements FetchSubPhase { + + private static class ParamFinder extends QueryVisitor { + private final List paramList = new ArrayList<>(); + + @Override + public void visitLeaf(Query query) { + if (query instanceof TraveltimeSearchQuery) { + if (!((TraveltimeSearchQuery) query).getOutput().isEmpty()) { + paramList.add(((TraveltimeSearchQuery) query)); + } + } + } + + public TraveltimeSearchQuery getQuery() { + if (paramList.size() == 1) return paramList.get(0); + else return null; + } + } + + @Override + public FetchSubPhaseProcessor getProcessor(FetchContext fetchContext) { + Query query = fetchContext.query(); + val finder = new ParamFinder(); + query.visit(finder); + TraveltimeSearchQuery traveltimeQuery = finder.getQuery(); + if (traveltimeQuery == null) return null; + TraveltimeQueryParameters params = traveltimeQuery.getParams(); + final String output = traveltimeQuery.getOutput(); + + FieldFetcher fieldFetcher = FieldFetcher.create(fetchContext.getSearchExecutionContext(), List.of(new FieldAndFormat(params.getField(), null))); + + return new FetchSubPhaseProcessor() { + + @Override + public void setNextReader(LeafReaderContext readerContext) { + fieldFetcher.setNextReader(readerContext); + } + + @Override + public void process(HitContext hitContext) throws IOException { + val docValues = hitContext.reader().getSortedNumericDocValues(params.getField()); + docValues.advance(hitContext.docId()); + Integer tt = TraveltimeCache.INSTANCE.get(params, docValues.nextValue()); + + if (tt >= 0) { + hitContext.hit().setDocumentField(output, new DocumentField(output, List.of(tt))); + } + } + + @Override + public StoredFieldsSpec storedFieldsSpec() { + return new StoredFieldsSpec(false, false, Set.of(params.getField())); + } + }; + } +} diff --git a/8.12/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeQueryBuilder.java b/8.12/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeQueryBuilder.java new file mode 100644 index 0000000..256ac02 --- /dev/null +++ b/8.12/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeQueryBuilder.java @@ -0,0 +1,192 @@ +package com.traveltime.plugin.elasticsearch.query; + +import com.traveltime.plugin.elasticsearch.TraveltimePlugin; +import com.traveltime.plugin.elasticsearch.util.Util; +import com.traveltime.sdk.dto.common.Coordinates; +import com.traveltime.sdk.dto.requests.proto.Country; +import com.traveltime.sdk.dto.requests.proto.RequestType; +import com.traveltime.sdk.dto.requests.proto.Transportation; +import lombok.NonNull; +import lombok.Setter; +import org.apache.lucene.search.Query; +import org.elasticsearch.TransportVersion; +import org.elasticsearch.TransportVersions; +import org.elasticsearch.common.geo.GeoPoint; +import org.elasticsearch.common.geo.GeoUtils; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.mapper.GeoPointFieldMapper; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.query.*; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; +import java.net.URI; +import java.util.Objects; +import java.util.Optional; + +@Setter +public class TraveltimeQueryBuilder extends AbstractQueryBuilder { + @NonNull + private String field; + @NonNull + private GeoPoint origin; + private int limit; + private Transportation.Modes mode; + private Country country; + private RequestType requestType; + private QueryBuilder prefilter; + @NonNull + private String output = ""; + + public TraveltimeQueryBuilder() { + } + + public TraveltimeQueryBuilder(StreamInput in) throws IOException { + super(in); + field = in.readString(); + origin = in.readGeoPoint(); + limit = in.readInt(); + mode = in.readOptionalEnum(Transportation.Modes.class); + String c = in.readOptionalString(); + if(c != null) country = Util.findCountryByName(c).orElseGet(() -> new Country.Custom(c)); + requestType = in.readOptionalEnum(RequestType.class); + prefilter = in.readOptionalNamedWriteable(QueryBuilder.class); + output = in.readString(); + } + + @Override + protected void doWriteTo(StreamOutput out) throws IOException { + out.writeString(field); + out.writeGeoPoint(origin); + out.writeInt(limit); + out.writeOptionalEnum(mode); + out.writeOptionalString(country == null ? null : country.getValue()); + out.writeOptionalEnum(requestType); + out.writeOptionalNamedWriteable(prefilter); + out.writeString(output); + } + + @Override + protected void doXContent(XContentBuilder builder, Params params) throws IOException { + builder.field("field", field); + builder.field("origin", origin); + builder.field("limit", limit); + builder.field("mode", mode == null ? null : mode.getValue()); + builder.field("country", country == null ? null : country.getValue()); + builder.field("requestType", requestType == null ? null : requestType.name()); + builder.field("prefilter", prefilter); + builder.field("output", output); + } + + @Override + protected QueryBuilder doRewrite(QueryRewriteContext queryRewriteContext) throws IOException { + if (this.prefilter != null) this.prefilter = this.prefilter.rewrite(queryRewriteContext); + return super.doRewrite(queryRewriteContext); + } + + @Override + protected Query doToQuery(SearchExecutionContext context) throws IOException { + MappedFieldType originMapping = context.getFieldType(field); + if (!(originMapping instanceof GeoPointFieldMapper.GeoPointFieldType)) { + throw new QueryShardException(context, "field [" + field + "] is not a geo_point field"); + } + + GeoUtils.normalizePoint(origin); + if (!GeoUtils.isValidLatitude(origin.getLat())) { + throw new QueryShardException(context, "latitude invalid for origin " + origin); + } + if (!GeoUtils.isValidLongitude(origin.getLon())) { + throw new QueryShardException(context, "longitude invalid for origin " + origin); + } + + URI appUri = TraveltimePlugin.API_URI.get(context.getIndexSettings().getSettings()); + String appId = TraveltimePlugin.APP_ID.get(context.getIndexSettings().getSettings()); + String apiKey = TraveltimePlugin.API_KEY.get(context.getIndexSettings().getSettings()); + if (appId.isEmpty()) { + throw new IllegalStateException("Traveltime app id must be set in the config"); + } + if (apiKey.isEmpty()) { + throw new IllegalStateException("Traveltime api key must be set in the config"); + } + + Optional defaultMode = TraveltimePlugin.DEFAULT_MODE.get(context.getIndexSettings().getSettings()); + Optional defaultCountry = TraveltimePlugin.DEFAULT_COUNTRY.get(context.getIndexSettings().getSettings()); + Optional defaultRequestType = TraveltimePlugin.DEFAULT_REQUEST_TYPE.get(context.getIndexSettings().getSettings()); + + Coordinates originCoord = Coordinates.builder().lat(origin.lat()).lng(origin.getLon()).build(); + + TraveltimeQueryParameters params = new TraveltimeQueryParameters(field, originCoord, limit, mode, country, requestType); + if (params.getMode() == null) { + if (defaultMode.isPresent()) { + params = params.withMode(defaultMode.get()); + } else { + throw new IllegalStateException("Traveltime query requires either 'mode' field to be present or a default mode to be set in the config"); + } + } + if (params.getCountry() == null) { + if (defaultCountry.isPresent()) { + params = params.withCountry(defaultCountry.get()); + } else { + throw new IllegalStateException("Traveltime query requires either 'country' field to be present or a default country to be set in the config"); + } + } + if(params.getRequestType() == null) { + if(defaultRequestType.isPresent()) { + params = params.withRequestType(defaultRequestType.get()); + } else { + throw new IllegalStateException("Traveltime query requires either 'requestType' field to be present or a default request type to be set in the config"); + } + + } + if (params.getLimit() <= 0) { + throw new IllegalStateException("Traveltime limit must be greater than zero"); + } + + Query prefilterQuery = prefilter != null ? prefilter.toQuery(context) : null; + + return new TraveltimeSearchQuery(params, prefilterQuery, output, appUri, appId, apiKey); + } + + @Override + protected boolean doEquals(TraveltimeQueryBuilder other) { + if (!Objects.equals(this.field, other.field)) return false; + if (!Objects.equals(this.origin, other.origin)) return false; + if (!Objects.equals(this.mode, other.mode)) return false; + if (!Objects.equals(this.country, other.country)) return false; + if (!Objects.equals(this.prefilter, other.prefilter)) return false; + if (!Objects.equals(this.output, other.output)) return false; + return this.limit == other.limit; + } + + @Override + protected int doHashCode() { + final int PRIME = 59; + int result = 1; + result = result * PRIME + this.field.hashCode(); + result = result * PRIME + this.origin.hashCode(); + result = result * PRIME + Objects.hashCode(this.mode); + result = result * PRIME + Objects.hashCode(this.country); + result = result * PRIME + Objects.hashCode(this.prefilter); + result = result * PRIME + Objects.hashCode(this.output); + result = result * PRIME + this.limit; + return result; + } + + @Override + public String getWriteableName() { + return TraveltimeQueryParser.NAME; + } + + @Override + public TransportVersion getMinimalSupportedVersion() { + return TransportVersions.MINIMUM_COMPATIBLE; + } + + public static QueryBuilder parseInnerQueryBuilder(XContentParser parser) throws IOException { + return AbstractQueryBuilder.parseInnerQueryBuilder(parser); + } + + +} diff --git a/8.12/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeQueryParser.java b/8.12/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeQueryParser.java new file mode 100644 index 0000000..1705d39 --- /dev/null +++ b/8.12/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeQueryParser.java @@ -0,0 +1,64 @@ +package com.traveltime.plugin.elasticsearch.query; + +import com.traveltime.plugin.elasticsearch.util.Util; +import org.elasticsearch.common.ParsingException; +import org.elasticsearch.common.geo.GeoUtils; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryParser; +import org.elasticsearch.xcontent.ContextParser; +import org.elasticsearch.xcontent.ObjectParser; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Optional; +import java.util.function.Function; + +public class TraveltimeQueryParser implements QueryParser { + public static String NAME = "traveltime"; + private final ParseField field = new ParseField("field"); + private final ParseField origin = new ParseField("origin"); + private final ParseField limit = new ParseField("limit"); + private final ParseField mode = new ParseField("mode"); + private final ParseField country = new ParseField("country"); + private final ParseField requestType = new ParseField("requestType"); + private final ParseField prefilter = new ParseField("prefilter"); + private final ParseField output = new ParseField("output"); + + private final ContextParser prefilterParser = (p, c) -> TraveltimeQueryBuilder.parseInnerQueryBuilder(p); + + private final ObjectParser queryParser = new ObjectParser<>(NAME, TraveltimeQueryBuilder::new); + + { + queryParser.declareString(TraveltimeQueryBuilder::setField, field); + queryParser.declareField(TraveltimeQueryBuilder::setOrigin, (parser, c) -> GeoUtils.parseGeoPoint(parser), origin, ObjectParser.ValueType.VALUE_OBJECT_ARRAY); + queryParser.declareInt(TraveltimeQueryBuilder::setLimit, limit); + queryParser.declareString((qb, s) -> qb.setMode(findByNameOrError("transportation mode", s, Util::findModeByName)), mode); + queryParser.declareString((qb, s) -> qb.setCountry(findByNameOrError("country", s, Util::findCountryByName)), country); + queryParser.declareString((qb, s) -> qb.setRequestType(findByNameOrError("country", s, Util::findRequestTypeByName)), requestType); + queryParser.declareObject(TraveltimeQueryBuilder::setPrefilter, prefilterParser, prefilter); + queryParser.declareString(TraveltimeQueryBuilder::setOutput, output); + + queryParser.declareRequiredFieldSet(field.toString()); + queryParser.declareRequiredFieldSet(origin.toString()); + queryParser.declareRequiredFieldSet(limit.toString()); + } + + private static T findByNameOrError(String what, String name, Function> finder) { + Optional result = finder.apply(name); + if (result.isEmpty()) { + throw new IllegalArgumentException(String.format("Couldn't find a %s with the name %s", what, name)); + } else { + return result.get(); + } + } + + @Override + public TraveltimeQueryBuilder fromXContent(XContentParser parser) throws IOException { + try { + return queryParser.parse(parser, null); + } catch (IllegalArgumentException iae) { + throw new ParsingException(parser.getTokenLocation(), iae.getMessage(), iae); + } + } +} diff --git a/8.12/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeScorer.java b/8.12/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeScorer.java new file mode 100644 index 0000000..530f5af --- /dev/null +++ b/8.12/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeScorer.java @@ -0,0 +1,99 @@ +package com.traveltime.plugin.elasticsearch.query; + +import it.unimi.dsi.fastutil.longs.Long2IntMap; +import lombok.RequiredArgsConstructor; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.Scorer; + +import java.io.IOException; + +public class TraveltimeScorer extends Scorer { + protected final TraveltimeWeight weight; + private final Long2IntMap pointToTime; + private final TraveltimeFilteredDocs docs; + private final float boost; + + @RequiredArgsConstructor + private class TraveltimeFilteredDocs extends DocIdSetIterator { + private final TraveltimeWeight.FilteredIterator backing; + + private long currentValue = 0; + private boolean currentValueDirty = true; + private void invalidateCurrentValue() { + currentValueDirty = true; + } + private void advanceValue() throws IOException { + if(currentValueDirty) { + currentValue = backing.nextValue(); + currentValueDirty = false; + } + } + + public long nextValue() throws IOException { + advanceValue(); + return currentValue; + } + + @Override + public int docID() { + return backing.docID(); + } + + @Override + public int nextDoc() throws IOException { + int id = backing.nextDoc(); + invalidateCurrentValue(); + while (id != DocIdSetIterator.NO_MORE_DOCS && !pointToTime.containsKey(nextValue())) { + id = backing.nextDoc(); + invalidateCurrentValue(); + } + return id; + } + + @Override + public int advance(int target) throws IOException { + int id = backing.advance(target); + invalidateCurrentValue(); + if (id != DocIdSetIterator.NO_MORE_DOCS && !pointToTime.containsKey(nextValue())) { + id = nextDoc(); + } + return id; + } + + @Override + public long cost() { + return backing.cost() * 1000; + } + } + + public TraveltimeScorer(TraveltimeWeight w, Long2IntMap coordToTime, TraveltimeWeight.FilteredIterator docs, float boost) { + super(w); + this.weight = w; + this.pointToTime = coordToTime; + this.docs = new TraveltimeFilteredDocs(docs); + this.boost = boost; + } + + @Override + public DocIdSetIterator iterator() { + return docs; + } + + @Override + public float getMaxScore(int upTo) { + return 1; + } + + @Override + public float score() throws IOException { + int limit = weight.getTtQuery().getParams().getLimit(); + int tt = pointToTime.getOrDefault(docs.nextValue(), limit + 1); + return (boost * (limit - tt + 1)) / (limit + 1); + + } + + @Override + public int docID() { + return docs.docID(); + } +} diff --git a/8.12/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeSearchQuery.java b/8.12/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeSearchQuery.java new file mode 100644 index 0000000..0ee3888 --- /dev/null +++ b/8.12/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeSearchQuery.java @@ -0,0 +1,50 @@ +package com.traveltime.plugin.elasticsearch.query; + +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import org.apache.lucene.search.*; + +import java.io.IOException; +import java.net.URI; + +@AllArgsConstructor +@EqualsAndHashCode(callSuper = false) +@Getter +public class TraveltimeSearchQuery extends Query { + private final TraveltimeQueryParameters params; + private final Query prefilter; + private final String output; + private final URI appUri; + private final String appId; + private final String apiKey; + + @Override + public void visit(QueryVisitor visitor) { + if (prefilter != null) { + prefilter.visit(visitor); + } + visitor.visitLeaf(this); + } + + @Override + public String toString(String field) { + return String.format("TraveltimeSearchQuery(params = %s, prefilter = %s)", params, prefilter); + } + + @Override + public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost) throws IOException { + Weight prefilterWeight = prefilter != null ? prefilter.createWeight(searcher, scoreMode, boost) : null; + return new TraveltimeWeight(this, prefilterWeight, !output.isEmpty(), boost); + } + + @Override + public Query rewrite(IndexSearcher reader) throws IOException { + Query newPrefilter = prefilter != null ? prefilter.rewrite(reader) : null; + if (newPrefilter == prefilter) { + return super.rewrite(reader); + } else { + return new TraveltimeSearchQuery(params, newPrefilter, output, appUri, appId, apiKey); + } + } +} diff --git a/8.12/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeWeight.java b/8.12/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeWeight.java new file mode 100644 index 0000000..a990cba --- /dev/null +++ b/8.12/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeWeight.java @@ -0,0 +1,145 @@ +package com.traveltime.plugin.elasticsearch.query; + +import com.traveltime.plugin.elasticsearch.FetcherSingleton; +import com.traveltime.plugin.elasticsearch.ProtoFetcher; +import com.traveltime.plugin.elasticsearch.TraveltimeCache; +import com.traveltime.plugin.elasticsearch.util.Util; +import com.traveltime.sdk.dto.common.Coordinates; +import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap; +import it.unimi.dsi.fastutil.longs.LongArrayList; +import it.unimi.dsi.fastutil.longs.LongOpenHashSet; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.val; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.search.*; +import org.elasticsearch.SpecialPermission; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +@EqualsAndHashCode(callSuper = false) +public class TraveltimeWeight extends Weight { + @Getter + private final TraveltimeSearchQuery ttQuery; + + private final Weight prefilter; + + private final boolean hasOutput; + + private final float boost; + + private final Logger log = LogManager.getLogger(); + + @EqualsAndHashCode.Exclude + private final ProtoFetcher protoFetcher; + + public TraveltimeWeight(TraveltimeSearchQuery q, Weight prefilter, boolean hasOutput, float boost) { + super(q); + ttQuery = q; + this.prefilter = prefilter; + this.hasOutput = hasOutput; + this.boost = boost; + protoFetcher = FetcherSingleton.INSTANCE.getFetcher(q.getAppUri(), q.getAppId(), q.getApiKey(), SpecialPermission::new); + } + + @Override + public Explanation explain(LeafReaderContext context, int doc) { + return Explanation.noMatch("Cannot provide explanation for traveltime matches"); + } + + @RequiredArgsConstructor + public static class FilteredIterator { + private final SortedNumericDocValues values; + private final DocIdSetIterator filtered; + + public long nextValue() throws IOException { + return this.values.nextValue(); + } + + public int docID() { + return this.filtered.docID(); + } + + public int nextDoc() throws IOException { + return this.filtered.nextDoc(); + } + + public int advance(int target) throws IOException { + return this.filtered.advance(target); + } + + public long cost() { + return this.filtered.cost(); + } + } + + private FilteredIterator filteredValues(LeafReaderContext context) throws IOException { + val reader = context.reader(); + val backing = reader.getSortedNumericDocValues(ttQuery.getParams().getField()); + + DocIdSetIterator finalIterator; + + if (prefilter != null) { + val preScorer = prefilter.scorer(context); + if(preScorer == null) return null; + val prefilterIterator = preScorer.iterator(); + finalIterator = ConjunctionUtils.intersectIterators(List.of(prefilterIterator, backing)); + } else { + finalIterator = backing; + } + + return new FilteredIterator(backing, finalIterator); + } + + @Override + public Scorer scorer(LeafReaderContext context) throws IOException { + val backing = filteredValues(context); + if (backing == null) return null; + + val valueArray = new LongArrayList(); + val decodedArray = new ArrayList(); + val valueSet = new LongOpenHashSet(); + + while (backing.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { + long encodedCoords = backing.nextValue(); + if(valueSet.add(encodedCoords)) { + valueArray.add(encodedCoords); + decodedArray.add(Util.decode(encodedCoords)); + } + } + + val pointToTime = new Long2IntOpenHashMap(valueArray.size()); + + val results = protoFetcher.getTimes( + ttQuery.getParams().getOrigin(), + decodedArray, + ttQuery.getParams().getLimit(), + ttQuery.getParams().getMode(), + ttQuery.getParams().getCountry(), + ttQuery.getParams().getRequestType() + ); + + for (int index = 0; index < results.size(); index++) { + if(results.get(index) >= 0) { + pointToTime.put(valueArray.getLong(index), results.get(index).intValue()); + } + } + + if(hasOutput) { + TraveltimeCache.INSTANCE.add(ttQuery.getParams(), pointToTime); + } + + return new TraveltimeScorer(this, pointToTime, filteredValues(context), boost); + } + + @Override + public boolean isCacheable(LeafReaderContext ctx) { + return true; + } +} diff --git a/8.12/src/universal/plugin-descriptor.properties b/8.12/src/universal/plugin-descriptor.properties new file mode 100644 index 0000000..4574b21 --- /dev/null +++ b/8.12/src/universal/plugin-descriptor.properties @@ -0,0 +1,6 @@ +description=Traveltime search plugin +version=PLUGIN_VERSION +name=Traveltime +classname=com.traveltime.plugin.elasticsearch.TraveltimePlugin +java.version=1.11 +elasticsearch.version=ES_VERSION diff --git a/8.13/build.gradle b/8.13/build.gradle new file mode 100644 index 0000000..c4e69c1 --- /dev/null +++ b/8.13/build.gradle @@ -0,0 +1,6 @@ +buildPlugin(this, '8.13', revisions(0)) + +compileJava { + sourceCompatibility = JavaVersion.VERSION_17 + targetCompatibility = JavaVersion.VERSION_17 +} \ No newline at end of file diff --git a/8.13/src/main/java/com/traveltime/plugin/elasticsearch/TraveltimePlugin.java b/8.13/src/main/java/com/traveltime/plugin/elasticsearch/TraveltimePlugin.java new file mode 100644 index 0000000..d0c837d --- /dev/null +++ b/8.13/src/main/java/com/traveltime/plugin/elasticsearch/TraveltimePlugin.java @@ -0,0 +1,82 @@ +package com.traveltime.plugin.elasticsearch; + + +import com.traveltime.plugin.elasticsearch.query.TraveltimeFetchPhase; +import com.traveltime.plugin.elasticsearch.query.TraveltimeQueryBuilder; +import com.traveltime.plugin.elasticsearch.query.TraveltimeQueryParser; +import com.traveltime.plugin.elasticsearch.util.Util; +import com.traveltime.sdk.dto.requests.proto.Country; +import com.traveltime.sdk.dto.requests.proto.RequestType; +import com.traveltime.sdk.dto.requests.proto.Transportation; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.env.Environment; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.SearchPlugin; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.search.fetch.FetchSubPhase; +import org.elasticsearch.telemetry.TelemetryProvider; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.watcher.ResourceWatcherService; +import org.elasticsearch.xcontent.NamedXContentRegistry; + +import java.net.URI; +import java.time.Duration; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.function.Supplier; + +public class TraveltimePlugin extends Plugin implements SearchPlugin { + public static final Setting APP_ID = Setting.simpleString("traveltime.app.id", Setting.Property.NodeScope); + public static final Setting API_KEY = Setting.simpleString("traveltime.api.key", Setting.Property.NodeScope, Setting.Property.Filtered); + public static final Setting> DEFAULT_MODE = new Setting<>("traveltime.default.mode", s -> "", Util::findModeByName, Setting.Property.NodeScope); + public static final Setting> DEFAULT_COUNTRY = new Setting<>("traveltime.default.country", s -> "", Util::findCountryByName, Setting.Property.NodeScope); + + public static final Setting> DEFAULT_REQUEST_TYPE = new Setting<>("traveltime.default.request_type", s -> RequestType.ONE_TO_MANY.name(), Util::findRequestTypeByName, Setting.Property.NodeScope); + public static final Setting API_URI = new Setting<>("traveltime.api.uri", s -> "https://proto.api.traveltimeapp.com/api/v2/", URI::create, Setting.Property.NodeScope); + + private static final Setting CACHE_CLEANUP_INTERVAL = Setting.intSetting("traveltime.cache.cleanup.interval", 120, 0, Setting.Property.NodeScope); + private static final Setting CACHE_EXPIRY = Setting.intSetting("traveltime.cache.expiry", 60, 0, Setting.Property.NodeScope); + private static final Setting CACHE_SIZE = Setting.intSetting("traveltime.cache.size", 50, 0, Setting.Property.NodeScope); + + private void cleanUpAndReschedule(ThreadPool threadPool, TimeValue cleanupSeconds) { + TraveltimeCache.INSTANCE.cleanUp(); + threadPool.scheduleUnlessShuttingDown(cleanupSeconds, threadPool.generic(), () -> cleanUpAndReschedule(threadPool, cleanupSeconds)); + } + + @Override + public Collection createComponents(PluginServices pluginServices) { + TimeValue cleanupSeconds = TimeValue.timeValueSeconds(CACHE_CLEANUP_INTERVAL.get(pluginServices.environment().settings())); + Duration cacheExpiry = Duration.ofSeconds(CACHE_EXPIRY.get(pluginServices.environment().settings())); + Integer cacheSize = CACHE_SIZE.get(pluginServices.environment().settings()); + + TraveltimeCache.INSTANCE.setUp(cacheSize, cacheExpiry); + cleanUpAndReschedule(pluginServices.threadPool(), cleanupSeconds); + + return super.createComponents(pluginServices); + } + + @Override + public List> getSettings() { + return List.of(APP_ID, API_KEY, DEFAULT_MODE, DEFAULT_COUNTRY, DEFAULT_REQUEST_TYPE, API_URI, CACHE_SIZE, CACHE_EXPIRY, CACHE_CLEANUP_INTERVAL); + } + + @Override + public List> getQueries() { + return List.of(new QuerySpec<>(TraveltimeQueryParser.NAME, TraveltimeQueryBuilder::new, new TraveltimeQueryParser())); + } + + @Override + public List getFetchSubPhases(FetchPhaseConstructionContext context) { + return List.of(new TraveltimeFetchPhase()); + } +} diff --git a/8.13/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeFetchPhase.java b/8.13/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeFetchPhase.java new file mode 100644 index 0000000..019a235 --- /dev/null +++ b/8.13/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeFetchPhase.java @@ -0,0 +1,77 @@ +package com.traveltime.plugin.elasticsearch.query; + +import com.traveltime.plugin.elasticsearch.TraveltimeCache; +import lombok.val; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.QueryVisitor; +import org.elasticsearch.common.document.DocumentField; +import org.elasticsearch.search.fetch.FetchContext; +import org.elasticsearch.search.fetch.FetchSubPhase; +import org.elasticsearch.search.fetch.FetchSubPhaseProcessor; +import org.elasticsearch.search.fetch.StoredFieldsSpec; +import org.elasticsearch.search.fetch.subphase.FieldAndFormat; +import org.elasticsearch.search.fetch.subphase.FieldFetcher; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +public class TraveltimeFetchPhase implements FetchSubPhase { + + private static class ParamFinder extends QueryVisitor { + private final List paramList = new ArrayList<>(); + + @Override + public void visitLeaf(Query query) { + if (query instanceof TraveltimeSearchQuery) { + if (!((TraveltimeSearchQuery) query).getOutput().isEmpty()) { + paramList.add(((TraveltimeSearchQuery) query)); + } + } + } + + public TraveltimeSearchQuery getQuery() { + if (paramList.size() == 1) return paramList.get(0); + else return null; + } + } + + @Override + public FetchSubPhaseProcessor getProcessor(FetchContext fetchContext) { + Query query = fetchContext.query(); + val finder = new ParamFinder(); + query.visit(finder); + TraveltimeSearchQuery traveltimeQuery = finder.getQuery(); + if (traveltimeQuery == null) return null; + TraveltimeQueryParameters params = traveltimeQuery.getParams(); + final String output = traveltimeQuery.getOutput(); + + FieldFetcher fieldFetcher = FieldFetcher.create(fetchContext.getSearchExecutionContext(), List.of(new FieldAndFormat(params.getField(), null))); + + return new FetchSubPhaseProcessor() { + + @Override + public void setNextReader(LeafReaderContext readerContext) { + fieldFetcher.setNextReader(readerContext); + } + + @Override + public void process(HitContext hitContext) throws IOException { + val docValues = hitContext.reader().getSortedNumericDocValues(params.getField()); + docValues.advance(hitContext.docId()); + Integer tt = TraveltimeCache.INSTANCE.get(params, docValues.nextValue()); + + if (tt >= 0) { + hitContext.hit().setDocumentField(output, new DocumentField(output, List.of(tt))); + } + } + + @Override + public StoredFieldsSpec storedFieldsSpec() { + return new StoredFieldsSpec(false, false, Set.of(params.getField())); + } + }; + } +} diff --git a/8.13/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeQueryBuilder.java b/8.13/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeQueryBuilder.java new file mode 100644 index 0000000..256ac02 --- /dev/null +++ b/8.13/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeQueryBuilder.java @@ -0,0 +1,192 @@ +package com.traveltime.plugin.elasticsearch.query; + +import com.traveltime.plugin.elasticsearch.TraveltimePlugin; +import com.traveltime.plugin.elasticsearch.util.Util; +import com.traveltime.sdk.dto.common.Coordinates; +import com.traveltime.sdk.dto.requests.proto.Country; +import com.traveltime.sdk.dto.requests.proto.RequestType; +import com.traveltime.sdk.dto.requests.proto.Transportation; +import lombok.NonNull; +import lombok.Setter; +import org.apache.lucene.search.Query; +import org.elasticsearch.TransportVersion; +import org.elasticsearch.TransportVersions; +import org.elasticsearch.common.geo.GeoPoint; +import org.elasticsearch.common.geo.GeoUtils; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.mapper.GeoPointFieldMapper; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.query.*; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; +import java.net.URI; +import java.util.Objects; +import java.util.Optional; + +@Setter +public class TraveltimeQueryBuilder extends AbstractQueryBuilder { + @NonNull + private String field; + @NonNull + private GeoPoint origin; + private int limit; + private Transportation.Modes mode; + private Country country; + private RequestType requestType; + private QueryBuilder prefilter; + @NonNull + private String output = ""; + + public TraveltimeQueryBuilder() { + } + + public TraveltimeQueryBuilder(StreamInput in) throws IOException { + super(in); + field = in.readString(); + origin = in.readGeoPoint(); + limit = in.readInt(); + mode = in.readOptionalEnum(Transportation.Modes.class); + String c = in.readOptionalString(); + if(c != null) country = Util.findCountryByName(c).orElseGet(() -> new Country.Custom(c)); + requestType = in.readOptionalEnum(RequestType.class); + prefilter = in.readOptionalNamedWriteable(QueryBuilder.class); + output = in.readString(); + } + + @Override + protected void doWriteTo(StreamOutput out) throws IOException { + out.writeString(field); + out.writeGeoPoint(origin); + out.writeInt(limit); + out.writeOptionalEnum(mode); + out.writeOptionalString(country == null ? null : country.getValue()); + out.writeOptionalEnum(requestType); + out.writeOptionalNamedWriteable(prefilter); + out.writeString(output); + } + + @Override + protected void doXContent(XContentBuilder builder, Params params) throws IOException { + builder.field("field", field); + builder.field("origin", origin); + builder.field("limit", limit); + builder.field("mode", mode == null ? null : mode.getValue()); + builder.field("country", country == null ? null : country.getValue()); + builder.field("requestType", requestType == null ? null : requestType.name()); + builder.field("prefilter", prefilter); + builder.field("output", output); + } + + @Override + protected QueryBuilder doRewrite(QueryRewriteContext queryRewriteContext) throws IOException { + if (this.prefilter != null) this.prefilter = this.prefilter.rewrite(queryRewriteContext); + return super.doRewrite(queryRewriteContext); + } + + @Override + protected Query doToQuery(SearchExecutionContext context) throws IOException { + MappedFieldType originMapping = context.getFieldType(field); + if (!(originMapping instanceof GeoPointFieldMapper.GeoPointFieldType)) { + throw new QueryShardException(context, "field [" + field + "] is not a geo_point field"); + } + + GeoUtils.normalizePoint(origin); + if (!GeoUtils.isValidLatitude(origin.getLat())) { + throw new QueryShardException(context, "latitude invalid for origin " + origin); + } + if (!GeoUtils.isValidLongitude(origin.getLon())) { + throw new QueryShardException(context, "longitude invalid for origin " + origin); + } + + URI appUri = TraveltimePlugin.API_URI.get(context.getIndexSettings().getSettings()); + String appId = TraveltimePlugin.APP_ID.get(context.getIndexSettings().getSettings()); + String apiKey = TraveltimePlugin.API_KEY.get(context.getIndexSettings().getSettings()); + if (appId.isEmpty()) { + throw new IllegalStateException("Traveltime app id must be set in the config"); + } + if (apiKey.isEmpty()) { + throw new IllegalStateException("Traveltime api key must be set in the config"); + } + + Optional defaultMode = TraveltimePlugin.DEFAULT_MODE.get(context.getIndexSettings().getSettings()); + Optional defaultCountry = TraveltimePlugin.DEFAULT_COUNTRY.get(context.getIndexSettings().getSettings()); + Optional defaultRequestType = TraveltimePlugin.DEFAULT_REQUEST_TYPE.get(context.getIndexSettings().getSettings()); + + Coordinates originCoord = Coordinates.builder().lat(origin.lat()).lng(origin.getLon()).build(); + + TraveltimeQueryParameters params = new TraveltimeQueryParameters(field, originCoord, limit, mode, country, requestType); + if (params.getMode() == null) { + if (defaultMode.isPresent()) { + params = params.withMode(defaultMode.get()); + } else { + throw new IllegalStateException("Traveltime query requires either 'mode' field to be present or a default mode to be set in the config"); + } + } + if (params.getCountry() == null) { + if (defaultCountry.isPresent()) { + params = params.withCountry(defaultCountry.get()); + } else { + throw new IllegalStateException("Traveltime query requires either 'country' field to be present or a default country to be set in the config"); + } + } + if(params.getRequestType() == null) { + if(defaultRequestType.isPresent()) { + params = params.withRequestType(defaultRequestType.get()); + } else { + throw new IllegalStateException("Traveltime query requires either 'requestType' field to be present or a default request type to be set in the config"); + } + + } + if (params.getLimit() <= 0) { + throw new IllegalStateException("Traveltime limit must be greater than zero"); + } + + Query prefilterQuery = prefilter != null ? prefilter.toQuery(context) : null; + + return new TraveltimeSearchQuery(params, prefilterQuery, output, appUri, appId, apiKey); + } + + @Override + protected boolean doEquals(TraveltimeQueryBuilder other) { + if (!Objects.equals(this.field, other.field)) return false; + if (!Objects.equals(this.origin, other.origin)) return false; + if (!Objects.equals(this.mode, other.mode)) return false; + if (!Objects.equals(this.country, other.country)) return false; + if (!Objects.equals(this.prefilter, other.prefilter)) return false; + if (!Objects.equals(this.output, other.output)) return false; + return this.limit == other.limit; + } + + @Override + protected int doHashCode() { + final int PRIME = 59; + int result = 1; + result = result * PRIME + this.field.hashCode(); + result = result * PRIME + this.origin.hashCode(); + result = result * PRIME + Objects.hashCode(this.mode); + result = result * PRIME + Objects.hashCode(this.country); + result = result * PRIME + Objects.hashCode(this.prefilter); + result = result * PRIME + Objects.hashCode(this.output); + result = result * PRIME + this.limit; + return result; + } + + @Override + public String getWriteableName() { + return TraveltimeQueryParser.NAME; + } + + @Override + public TransportVersion getMinimalSupportedVersion() { + return TransportVersions.MINIMUM_COMPATIBLE; + } + + public static QueryBuilder parseInnerQueryBuilder(XContentParser parser) throws IOException { + return AbstractQueryBuilder.parseInnerQueryBuilder(parser); + } + + +} diff --git a/8.13/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeQueryParser.java b/8.13/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeQueryParser.java new file mode 100644 index 0000000..1705d39 --- /dev/null +++ b/8.13/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeQueryParser.java @@ -0,0 +1,64 @@ +package com.traveltime.plugin.elasticsearch.query; + +import com.traveltime.plugin.elasticsearch.util.Util; +import org.elasticsearch.common.ParsingException; +import org.elasticsearch.common.geo.GeoUtils; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryParser; +import org.elasticsearch.xcontent.ContextParser; +import org.elasticsearch.xcontent.ObjectParser; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Optional; +import java.util.function.Function; + +public class TraveltimeQueryParser implements QueryParser { + public static String NAME = "traveltime"; + private final ParseField field = new ParseField("field"); + private final ParseField origin = new ParseField("origin"); + private final ParseField limit = new ParseField("limit"); + private final ParseField mode = new ParseField("mode"); + private final ParseField country = new ParseField("country"); + private final ParseField requestType = new ParseField("requestType"); + private final ParseField prefilter = new ParseField("prefilter"); + private final ParseField output = new ParseField("output"); + + private final ContextParser prefilterParser = (p, c) -> TraveltimeQueryBuilder.parseInnerQueryBuilder(p); + + private final ObjectParser queryParser = new ObjectParser<>(NAME, TraveltimeQueryBuilder::new); + + { + queryParser.declareString(TraveltimeQueryBuilder::setField, field); + queryParser.declareField(TraveltimeQueryBuilder::setOrigin, (parser, c) -> GeoUtils.parseGeoPoint(parser), origin, ObjectParser.ValueType.VALUE_OBJECT_ARRAY); + queryParser.declareInt(TraveltimeQueryBuilder::setLimit, limit); + queryParser.declareString((qb, s) -> qb.setMode(findByNameOrError("transportation mode", s, Util::findModeByName)), mode); + queryParser.declareString((qb, s) -> qb.setCountry(findByNameOrError("country", s, Util::findCountryByName)), country); + queryParser.declareString((qb, s) -> qb.setRequestType(findByNameOrError("country", s, Util::findRequestTypeByName)), requestType); + queryParser.declareObject(TraveltimeQueryBuilder::setPrefilter, prefilterParser, prefilter); + queryParser.declareString(TraveltimeQueryBuilder::setOutput, output); + + queryParser.declareRequiredFieldSet(field.toString()); + queryParser.declareRequiredFieldSet(origin.toString()); + queryParser.declareRequiredFieldSet(limit.toString()); + } + + private static T findByNameOrError(String what, String name, Function> finder) { + Optional result = finder.apply(name); + if (result.isEmpty()) { + throw new IllegalArgumentException(String.format("Couldn't find a %s with the name %s", what, name)); + } else { + return result.get(); + } + } + + @Override + public TraveltimeQueryBuilder fromXContent(XContentParser parser) throws IOException { + try { + return queryParser.parse(parser, null); + } catch (IllegalArgumentException iae) { + throw new ParsingException(parser.getTokenLocation(), iae.getMessage(), iae); + } + } +} diff --git a/8.13/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeScorer.java b/8.13/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeScorer.java new file mode 100644 index 0000000..530f5af --- /dev/null +++ b/8.13/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeScorer.java @@ -0,0 +1,99 @@ +package com.traveltime.plugin.elasticsearch.query; + +import it.unimi.dsi.fastutil.longs.Long2IntMap; +import lombok.RequiredArgsConstructor; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.Scorer; + +import java.io.IOException; + +public class TraveltimeScorer extends Scorer { + protected final TraveltimeWeight weight; + private final Long2IntMap pointToTime; + private final TraveltimeFilteredDocs docs; + private final float boost; + + @RequiredArgsConstructor + private class TraveltimeFilteredDocs extends DocIdSetIterator { + private final TraveltimeWeight.FilteredIterator backing; + + private long currentValue = 0; + private boolean currentValueDirty = true; + private void invalidateCurrentValue() { + currentValueDirty = true; + } + private void advanceValue() throws IOException { + if(currentValueDirty) { + currentValue = backing.nextValue(); + currentValueDirty = false; + } + } + + public long nextValue() throws IOException { + advanceValue(); + return currentValue; + } + + @Override + public int docID() { + return backing.docID(); + } + + @Override + public int nextDoc() throws IOException { + int id = backing.nextDoc(); + invalidateCurrentValue(); + while (id != DocIdSetIterator.NO_MORE_DOCS && !pointToTime.containsKey(nextValue())) { + id = backing.nextDoc(); + invalidateCurrentValue(); + } + return id; + } + + @Override + public int advance(int target) throws IOException { + int id = backing.advance(target); + invalidateCurrentValue(); + if (id != DocIdSetIterator.NO_MORE_DOCS && !pointToTime.containsKey(nextValue())) { + id = nextDoc(); + } + return id; + } + + @Override + public long cost() { + return backing.cost() * 1000; + } + } + + public TraveltimeScorer(TraveltimeWeight w, Long2IntMap coordToTime, TraveltimeWeight.FilteredIterator docs, float boost) { + super(w); + this.weight = w; + this.pointToTime = coordToTime; + this.docs = new TraveltimeFilteredDocs(docs); + this.boost = boost; + } + + @Override + public DocIdSetIterator iterator() { + return docs; + } + + @Override + public float getMaxScore(int upTo) { + return 1; + } + + @Override + public float score() throws IOException { + int limit = weight.getTtQuery().getParams().getLimit(); + int tt = pointToTime.getOrDefault(docs.nextValue(), limit + 1); + return (boost * (limit - tt + 1)) / (limit + 1); + + } + + @Override + public int docID() { + return docs.docID(); + } +} diff --git a/8.13/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeSearchQuery.java b/8.13/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeSearchQuery.java new file mode 100644 index 0000000..0ee3888 --- /dev/null +++ b/8.13/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeSearchQuery.java @@ -0,0 +1,50 @@ +package com.traveltime.plugin.elasticsearch.query; + +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import org.apache.lucene.search.*; + +import java.io.IOException; +import java.net.URI; + +@AllArgsConstructor +@EqualsAndHashCode(callSuper = false) +@Getter +public class TraveltimeSearchQuery extends Query { + private final TraveltimeQueryParameters params; + private final Query prefilter; + private final String output; + private final URI appUri; + private final String appId; + private final String apiKey; + + @Override + public void visit(QueryVisitor visitor) { + if (prefilter != null) { + prefilter.visit(visitor); + } + visitor.visitLeaf(this); + } + + @Override + public String toString(String field) { + return String.format("TraveltimeSearchQuery(params = %s, prefilter = %s)", params, prefilter); + } + + @Override + public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost) throws IOException { + Weight prefilterWeight = prefilter != null ? prefilter.createWeight(searcher, scoreMode, boost) : null; + return new TraveltimeWeight(this, prefilterWeight, !output.isEmpty(), boost); + } + + @Override + public Query rewrite(IndexSearcher reader) throws IOException { + Query newPrefilter = prefilter != null ? prefilter.rewrite(reader) : null; + if (newPrefilter == prefilter) { + return super.rewrite(reader); + } else { + return new TraveltimeSearchQuery(params, newPrefilter, output, appUri, appId, apiKey); + } + } +} diff --git a/8.13/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeWeight.java b/8.13/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeWeight.java new file mode 100644 index 0000000..a990cba --- /dev/null +++ b/8.13/src/main/java/com/traveltime/plugin/elasticsearch/query/TraveltimeWeight.java @@ -0,0 +1,145 @@ +package com.traveltime.plugin.elasticsearch.query; + +import com.traveltime.plugin.elasticsearch.FetcherSingleton; +import com.traveltime.plugin.elasticsearch.ProtoFetcher; +import com.traveltime.plugin.elasticsearch.TraveltimeCache; +import com.traveltime.plugin.elasticsearch.util.Util; +import com.traveltime.sdk.dto.common.Coordinates; +import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap; +import it.unimi.dsi.fastutil.longs.LongArrayList; +import it.unimi.dsi.fastutil.longs.LongOpenHashSet; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.val; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.search.*; +import org.elasticsearch.SpecialPermission; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +@EqualsAndHashCode(callSuper = false) +public class TraveltimeWeight extends Weight { + @Getter + private final TraveltimeSearchQuery ttQuery; + + private final Weight prefilter; + + private final boolean hasOutput; + + private final float boost; + + private final Logger log = LogManager.getLogger(); + + @EqualsAndHashCode.Exclude + private final ProtoFetcher protoFetcher; + + public TraveltimeWeight(TraveltimeSearchQuery q, Weight prefilter, boolean hasOutput, float boost) { + super(q); + ttQuery = q; + this.prefilter = prefilter; + this.hasOutput = hasOutput; + this.boost = boost; + protoFetcher = FetcherSingleton.INSTANCE.getFetcher(q.getAppUri(), q.getAppId(), q.getApiKey(), SpecialPermission::new); + } + + @Override + public Explanation explain(LeafReaderContext context, int doc) { + return Explanation.noMatch("Cannot provide explanation for traveltime matches"); + } + + @RequiredArgsConstructor + public static class FilteredIterator { + private final SortedNumericDocValues values; + private final DocIdSetIterator filtered; + + public long nextValue() throws IOException { + return this.values.nextValue(); + } + + public int docID() { + return this.filtered.docID(); + } + + public int nextDoc() throws IOException { + return this.filtered.nextDoc(); + } + + public int advance(int target) throws IOException { + return this.filtered.advance(target); + } + + public long cost() { + return this.filtered.cost(); + } + } + + private FilteredIterator filteredValues(LeafReaderContext context) throws IOException { + val reader = context.reader(); + val backing = reader.getSortedNumericDocValues(ttQuery.getParams().getField()); + + DocIdSetIterator finalIterator; + + if (prefilter != null) { + val preScorer = prefilter.scorer(context); + if(preScorer == null) return null; + val prefilterIterator = preScorer.iterator(); + finalIterator = ConjunctionUtils.intersectIterators(List.of(prefilterIterator, backing)); + } else { + finalIterator = backing; + } + + return new FilteredIterator(backing, finalIterator); + } + + @Override + public Scorer scorer(LeafReaderContext context) throws IOException { + val backing = filteredValues(context); + if (backing == null) return null; + + val valueArray = new LongArrayList(); + val decodedArray = new ArrayList(); + val valueSet = new LongOpenHashSet(); + + while (backing.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { + long encodedCoords = backing.nextValue(); + if(valueSet.add(encodedCoords)) { + valueArray.add(encodedCoords); + decodedArray.add(Util.decode(encodedCoords)); + } + } + + val pointToTime = new Long2IntOpenHashMap(valueArray.size()); + + val results = protoFetcher.getTimes( + ttQuery.getParams().getOrigin(), + decodedArray, + ttQuery.getParams().getLimit(), + ttQuery.getParams().getMode(), + ttQuery.getParams().getCountry(), + ttQuery.getParams().getRequestType() + ); + + for (int index = 0; index < results.size(); index++) { + if(results.get(index) >= 0) { + pointToTime.put(valueArray.getLong(index), results.get(index).intValue()); + } + } + + if(hasOutput) { + TraveltimeCache.INSTANCE.add(ttQuery.getParams(), pointToTime); + } + + return new TraveltimeScorer(this, pointToTime, filteredValues(context), boost); + } + + @Override + public boolean isCacheable(LeafReaderContext ctx) { + return true; + } +} diff --git a/8.13/src/universal/plugin-descriptor.properties b/8.13/src/universal/plugin-descriptor.properties new file mode 100644 index 0000000..4574b21 --- /dev/null +++ b/8.13/src/universal/plugin-descriptor.properties @@ -0,0 +1,6 @@ +description=Traveltime search plugin +version=PLUGIN_VERSION +name=Traveltime +classname=com.traveltime.plugin.elasticsearch.TraveltimePlugin +java.version=1.11 +elasticsearch.version=ES_VERSION diff --git a/common/src/universal/run-tests.sh b/common/src/universal/run-tests.sh index e51307e..68cab71 100755 --- a/common/src/universal/run-tests.sh +++ b/common/src/universal/run-tests.sh @@ -10,6 +10,7 @@ docker run -d \ -e "traveltime.app.id=id" \ -e "traveltime.api.key=key" \ -e "xpack.security.enabled=false" \ + -e "ES_JAVA_OPTS=-Xms750m -Xmx750m" \ --rm \ --name $IMAGE_NAME \ $IMAGE_NAME diff --git a/settings.gradle b/settings.gradle index 21cb78e..7dd26f1 100644 --- a/settings.gradle +++ b/settings.gradle @@ -20,3 +20,6 @@ include '8.7' include '8.8' include '8.9' include '8.10' +include '8.11' +include '8.12' +include '8.13'