Merge branch 'termfreqfreq' into bitmapfrequency
mkavanagh committed Sep 14, 2020
2 parents 04c2716 + 459c140 commit 5a1b172
Showing 12 changed files with 536 additions and 14 deletions.
@@ -69,6 +69,7 @@
import org.apache.solr.search.facet.StddevAgg;
import org.apache.solr.search.facet.SumAgg;
import org.apache.solr.search.facet.SumsqAgg;
import org.apache.solr.search.facet.TermFrequencyOfFrequenciesAgg;
import org.apache.solr.search.facet.TopDocsAgg;
import org.apache.solr.search.facet.UniqueAgg;
import org.apache.solr.search.facet.UniqueBlockAgg;
@@ -1071,6 +1072,8 @@ public ValueSource parse(FunctionQParser fp) throws SyntaxError {

addParser("agg_bitmapfreqfreq64", new FrequencyOfFrequenciesAgg64.Parser());

addParser("agg_termfreqfreq", new TermFrequencyOfFrequenciesAgg.Parser());

addParser("childfield", new ChildFieldValueSourceParser());
}

@@ -0,0 +1,43 @@
package org.apache.solr.search.facet;

import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;

import org.apache.solr.common.util.SimpleOrderedMap;

/**
 * Counts occurrences of string values, reporting at most {@code limit} of the
 * most frequent values when serialized.
 */
public class TermFrequencyCounter {
private final Map<String, Integer> counters;
private final int limit;

public TermFrequencyCounter(int limit) {
this.counters = new HashMap<>();
this.limit = limit;
}

public Map<String, Integer> getCounters() {
return this.counters;
}

public void add(String value) {
counters.merge(value, 1, Integer::sum);
}

public SimpleOrderedMap<Object> serialize() {
SimpleOrderedMap<Object> serialized = new SimpleOrderedMap<>();

counters.entrySet()
.stream()
.sorted(Map.Entry.comparingByValue(Comparator.reverseOrder())) // highest counts first
.limit(limit)
.forEach(entry -> serialized.add(entry.getKey(), entry.getValue()));

return serialized;
}

/** Folds a serialized counter (for example, one received from another shard) into this one. */
public TermFrequencyCounter merge(SimpleOrderedMap<Integer> serialized) {
serialized.forEach((value, freq) -> counters.merge(value, freq, Integer::sum));

return this;
}
}
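
A brief usage sketch (the values and the limit of 10 are hypothetical; the unchecked cast mirrors the one in `Merger.merge(...)` below): a counter built on one shard can be serialized and folded into another.

import org.apache.solr.common.util.SimpleOrderedMap;

public class TermFrequencyCounterSketch {
@SuppressWarnings("unchecked")
public static void main(String[] args) {
TermFrequencyCounter shardA = new TermFrequencyCounter(10);
shardA.add("red");
shardA.add("red");
shardA.add("blue");

TermFrequencyCounter merged = new TermFrequencyCounter(10);
merged.add("red");

// In a distributed request the serialized counts arrive from each shard
// as a SimpleOrderedMap<Integer>, which merge() sums into this counter.
merged.merge((SimpleOrderedMap<Integer>) (SimpleOrderedMap<?>) shardA.serialize());

// Prints the combined counts, e.g. red=3, blue=1 (iteration order unspecified).
System.out.println(merged.getCounters());
}
}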
@@ -0,0 +1,74 @@
package org.apache.solr.search.facet;

import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.lucene.queries.function.ValueSource;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.search.FunctionQParser;
import org.apache.solr.search.SyntaxError;
import org.apache.solr.search.ValueSourceParser;

public class TermFrequencyOfFrequenciesAgg extends SimpleAggValueSource {
private final int termLimit;

public TermFrequencyOfFrequenciesAgg(ValueSource vs, int termLimit) {
super("termfreqfreq", vs);

this.termLimit = termLimit;
}

@Override
public SlotAcc createSlotAcc(FacetContext fcontext, int numDocs, int numSlots) {
return new TermFrequencySlotAcc(getArg(), fcontext, numSlots, termLimit);
}

@Override
public FacetMerger createFacetMerger(Object prototype) {
return new Merger(termLimit);
}

public static class Parser extends ValueSourceParser {
@Override
public ValueSource parse(FunctionQParser fp) throws SyntaxError {
ValueSource vs = fp.parseValueSource();

int termLimit = Integer.MAX_VALUE;
if (fp.hasMoreArguments()) {
termLimit = fp.parseInt();
}

return new TermFrequencyOfFrequenciesAgg(vs, termLimit);
}
}

private static class Merger extends FacetMerger {
private final TermFrequencyCounter result;

public Merger(int termLimit) {
this.result = new TermFrequencyCounter(termLimit);
}

@Override
@SuppressWarnings("unchecked")
public void merge(Object facetResult, Context mcontext) {
if (facetResult instanceof SimpleOrderedMap) {
result.merge((SimpleOrderedMap<Integer>) facetResult);
}
}

@Override
public void finish(Context mcontext) {
// never called
}

@Override
public Object getMergedResult() {
// Invert the merged term->count map into a count->numberOfTerms map:
// the "frequency of frequencies" histogram.
Map<Integer, Integer> map = new LinkedHashMap<>();

result.getCounters()
.forEach((value, freq) -> map.merge(freq, 1, Integer::sum));

return map;
}
}
}
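
Since the parser registration above follows Solr's `agg_` prefix convention, this aggregation should be invocable as `termfreqfreq` in a JSON Facet request — a sketch (the collection, field name, and term limit here are hypothetical):

curl http://localhost:8983/solr/collection1/query -d 'q=*:*&rows=0&json.facet={
"term_histogram" : "termfreqfreq(category_s, 100)"
}'

The merged result maps each observed term frequency to the number of distinct terms that occurred that many times.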
@@ -0,0 +1,52 @@
package org.apache.solr.search.facet;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.function.IntFunction;

import org.apache.lucene.queries.function.ValueSource;

public class TermFrequencySlotAcc extends FuncSlotAcc {
private TermFrequencyCounter[] result;
private final int termLimit;

public TermFrequencySlotAcc(ValueSource values, FacetContext fcontext, int numSlots, int termLimit) {
super(values, fcontext, numSlots);

this.result = new TermFrequencyCounter[numSlots];
this.termLimit = termLimit;
}

@Override
public void collect(int doc, int slot, IntFunction<SlotContext> slotContext) throws IOException {
if (result[slot] == null) {
result[slot] = new TermFrequencyCounter(termLimit);
}
result[slot].add(values.strVal(doc));
}

@Override
public int compare(int slotA, int slotB) {
throw new UnsupportedOperationException();
}

@Override
public Object getValue(int slotNum) {
if (result[slotNum] != null) {
// The serialized per-slot counts become the facetResult that
// TermFrequencyOfFrequenciesAgg.Merger combines on the coordinator.
return result[slotNum].serialize();
} else {
return Collections.emptyList();
}
}

@Override
public void reset() {
Arrays.fill(result, null);
}

@Override
public void resize(Resizer resizer) {
result = resizer.resize(result, null);
}
}
@@ -40,9 +40,6 @@
* the chain and prints them on finish(). At the Debug (FINE) level, a message
* will be logged for each command prior to the next stage in the chain.
* </p>
* <p>
* If the Log level is not &gt;= INFO the processor will not be created or added to the chain.
* </p>
*
* @since solr 1.3
*/
@@ -62,7 +59,7 @@ public void init( final NamedList args ) {

@Override
public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
return log.isInfoEnabled() ? new LogUpdateProcessor(req, rsp, this, next) : null;
return new LogUpdateProcessor(req, rsp, this, next);
}

static class LogUpdateProcessor extends UpdateRequestProcessor {
@@ -185,6 +182,8 @@ public void finish() throws IOException {

if (log.isInfoEnabled()) {
log.info(getLogStringAndClearRspToLog());
} else {
rsp.getToLog().clear();
}

if (log.isWarnEnabled() && slowUpdateThresholdMillis >= 0) {
44 changes: 43 additions & 1 deletion solr/solr-ref-guide/src/stream-decorator-reference.adoc
@@ -595,6 +595,45 @@ while(true) {
daemonStream.close();
----

== delete-stream

The `delete` function wraps another function and uses the `id` and `\_version_` values found in the tuples to send them to a SolrCloud collection as <<uploading-data-with-index-handlers.adoc#delete-operations,Delete By Id>> commands.

This is similar to the `<<#update,update()>>` function described below.

=== delete-stream Parameters

* `destinationCollection`: (Mandatory) The collection from which the documents will be deleted.
* `batchSize`: (Mandatory) The indexing batch size.
* `pruneVersionField`: (Optional, defaults to `false`) Whether to prune `\_version_` values from tuples.
* `StreamExpression`: (Mandatory)

=== delete-stream Syntax

[source,text]
----
delete(collection1
batchSize=500,
search(collection1,
q=old_data:true,
qt="/export",
fl="id",
sort="a_f asc, a_i asc"))
----

The example above consumes the tuples returned by the `search` function against `collection1` and converts the `id` value of each document found into a delete request against the same `collection1`.

[NOTE]
====
Unlike the `update()` function, `delete()` defaults to `pruneVersionField=false`, preserving any `\_version_` values found in the inner stream when converting the tuples to "Delete By ID" requests. By default, this ensures the stream will not delete any documents that were updated _after_ the `search(...)` was executed but _before_ the `delete(...)` processed that tuple (leveraging <<updating-parts-of-documents.adoc#optimistic-concurrency,Optimistic concurrency>> constraints).
Users who wish to ignore concurrent updates and delete all matched documents should set `pruneVersionField=true` (or ensure that the inner stream tuples do not include any `\_version_` values).
Users who anticipate concurrent updates, and wish to "skip" any failed deletes, should instead consider configuring the {solr-javadocs}/solr-core/org/apache/solr/update/processor/TolerantUpdateProcessorFactory.html[`TolerantUpdateProcessorFactory`].
====
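
For example, a sketch of the expression above with `pruneVersionField=true` set explicitly, so that all matched documents are deleted regardless of concurrent updates (note that with `fl="id"` the inner tuples carry no `\_version_` values anyway, so the flag here simply makes the intent explicit):

[source,text]
----
delete(collection1,
       batchSize=500,
       pruneVersionField=true,
       search(collection1,
              q=old_data:true,
              qt="/export",
              fl="id",
              sort="a_f asc, a_i asc"))
----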


== eval

The `eval` function allows for use cases where new streaming expressions are generated on the fly and then evaluated.
@@ -1273,12 +1312,13 @@ unique(

== update

The `update` function wraps another function and sends the tuples to a SolrCloud collection for indexing.
The `update` function wraps another function and sends the tuples to a SolrCloud collection for indexing as Documents.

=== update Parameters

* `destinationCollection`: (Mandatory) The collection where the tuples will be indexed.
* `batchSize`: (Mandatory) The indexing batch size.
* `pruneVersionField`: (Optional, defaults to `true`) Whether to prune `\_version_` values from tuples.
* `StreamExpression`: (Mandatory)

=== update Syntax
@@ -1296,3 +1336,5 @@ The `update` function wraps another function and sends the tuples to a SolrCloud collection for indexing as Documents.
----

The example above sends the tuples returned by the `search` function to the `destinationCollection` to be indexed.

Wrapping `search(...)` as shown in this example is the most common usage of this decorator: reading documents from a collection as tuples, processing or modifying them in some way, and then adding them back to a new collection. For this reason, `pruneVersionField=true` is the default behavior, stripping any `\_version_` values found in the inner stream when converting the tuples to Solr documents, to prevent unexpected errors from <<updating-parts-of-documents.adoc#optimistic-concurrency,Optimistic concurrency>> constraints.
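
For example, a sketch of that read-modify-write pattern (the collection and field names here are hypothetical):

[source,text]
----
update(newCollection,
       batchSize=250,
       select(
         search(oldCollection,
                q=*:*,
                qt="/export",
                fl="id,body_t",
                sort="id asc"),
         id,
         body_t as text_t))
----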
@@ -40,6 +40,7 @@ public static void register(StreamFactory streamFactory) {
.withFunctionName("facet", FacetStream.class)
.withFunctionName("update", UpdateStream.class)
.withFunctionName("jdbc", JDBCStream.class)
.withFunctionName("delete", DeleteStream.class)
.withFunctionName("topic", TopicStream.class)
.withFunctionName("commit", CommitStream.class)
.withFunctionName("random", RandomStream.class)
@@ -336,4 +337,4 @@ public static void register(StreamFactory streamFactory) {
.withFunctionName("if", IfThenElseEvaluator.class)
.withFunctionName("convert", ConversionEvaluator.class);
}
}
}