Elephant Bird Lucene: Querying Indexes

For a general overview of Elephant-Bird-Lucene, see Elephant Bird Lucene.

The first step is to create a LuceneIndexInputFormat. This is where you specify how your Lucene indexes are searched and how the results are transformed into MapReduce values.

Here's an example that retrieves all the tweets that match the given queries from the indexes created on the Creating Indexes page.

package com.example;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.lucene.analysis.core.SimpleAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.queryparser.classic.ParseException;
import org.apache.lucene.queryparser.classic.QueryParser;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.Version;

import com.twitter.elephantbird.mapreduce.input.LuceneIndexCollectAllRecordReader;
import com.twitter.elephantbird.mapreduce.input.LuceneIndexInputFormat;

/**
 * Retrieves all the hits from each index visited
 */
public class TweetIndexInputFormat extends LuceneIndexInputFormat<TweetWritable> {
  private QueryParser parser = new QueryParser(Version.LUCENE_40,
                                               TweetIndexOutputFormat.TWEET_TEXT_FIELD,
                                               new SimpleAnalyzer(Version.LUCENE_40));

  @Override
  public RecordReader<IntWritable, TweetWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {

    // LuceneIndexCollectAllRecordReader does some of the work for you if you want to collect every
    // hit. However, if you want full control over searching, return a new LuceneIndexRecordReader
    // which will provide you with a Query and a Searcher, and you can search however you'd like
    return new LuceneIndexCollectAllRecordReader<TweetWritable>() {
      // specify how to turn a serialized query into a Query object
      @Override
      protected Query deserializeQuery(String serializedString) throws IOException {
        try {
          return parser.parse(serializedString);
        } catch (ParseException e) {
          throw new IOException(e);
        }
      }

      // specify how to turn a Document into a map reduce value
      @Override
      protected TweetWritable docToValue(Document document) {
        TweetWritable tweet = new TweetWritable();
        tweet.setText(document.get(TweetIndexOutputFormat.TWEET_TEXT_FIELD));
        tweet.setUserId(Long.valueOf(document.get(TweetIndexOutputFormat.USER_ID_FIELD)));
        return tweet;
      }
    };
  }
}
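
Each call to the mapper then receives one search hit. Judging from the record reader's signature above, the IntWritable key identifies which of the configured queries (see the configuration section below) produced the hit. Here is a hypothetical pass-through mapper as an illustration; TweetHitsMapper is our name, not part of Elephant Bird:

package com.example;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Hypothetical mapper: re-emits each matching tweet, keyed by the query that found it.
 */
public class TweetHitsMapper extends Mapper<IntWritable, TweetWritable, IntWritable, TweetWritable> {
  @Override
  protected void map(IntWritable queryId, TweetWritable tweet, Context context)
      throws IOException, InterruptedException {
    context.write(queryId, tweet);
  }
}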

Here's an example that uses a custom Collector to carry out the search, giving you full control over how each index is searched. This snippet assumes the same enclosing class as above, plus imports for java.util.List, org.apache.lucene.search.IndexSearcher, org.apache.lucene.search.Collector, org.apache.lucene.search.Scorer, org.apache.lucene.index.AtomicReaderContext, and com.twitter.elephantbird.mapreduce.input.LuceneIndexRecordReader:

  @Override
  public RecordReader<IntWritable, TweetWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    return new LuceneIndexRecordReader<TweetWritable>() {

      @Override
      protected Query deserializeQuery(String serializedString) throws IOException {
        try {
          return parser.parse(serializedString);
        } catch (ParseException e) {
          throw new IOException(e);
        }
      }

      @Override
      protected List<TweetWritable> search(IndexSearcher searcher, Query query) throws IOException {
        MyCollector myCollector = new MyCollector();
        searcher.search(query, myCollector);
        return myCollector.getTweets();
      }
    };
  }

  private static class MyCollector extends Collector {

    @Override
    public void setScorer(Scorer scorer) throws IOException {
      ...
    }

    @Override
    public void collect(int doc) throws IOException {
      ...
    }

    @Override
    public void setNextReader(AtomicReaderContext context) throws IOException {
      ...
    }

    @Override
    public boolean acceptsDocsOutOfOrder() {
      ...
    }

    public List<TweetWritable> getTweets() {
      ...
    }
  }
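
The method bodies above are elided because they depend on what you want to collect. As an illustration, here is one way they might be filled in: a minimal sketch of our own (not from Elephant Bird) that loads every hit into a TweetWritable via the current segment's AtomicReader. A real collector would likely load only the fields it needs:

  private static class MyCollector extends Collector {
    private final List<TweetWritable> tweets = new ArrayList<TweetWritable>();
    private AtomicReader reader;

    @Override
    public void setScorer(Scorer scorer) throws IOException {
      // scores are irrelevant here: we keep every hit
    }

    @Override
    public void collect(int doc) throws IOException {
      // load the document for this (per-segment) doc id and convert it
      Document document = reader.document(doc);
      TweetWritable tweet = new TweetWritable();
      tweet.setText(document.get(TweetIndexOutputFormat.TWEET_TEXT_FIELD));
      tweet.setUserId(Long.valueOf(document.get(TweetIndexOutputFormat.USER_ID_FIELD)));
      tweets.add(tweet);
    }

    @Override
    public void setNextReader(AtomicReaderContext context) throws IOException {
      // remember the reader for the segment we are about to visit
      reader = context.reader();
    }

    @Override
    public boolean acceptsDocsOutOfOrder() {
      return true; // hit order doesn't matter when collecting everything
    }

    public List<TweetWritable> getTweets() {
      return tweets;
    }
  }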

Now that you have a LuceneIndexInputFormat, it can be used in a MapReduce job, but it will require some configuration (a driver sketch follows this list):

  • You will have to call LuceneIndexInputFormat.setQueries(List<String> queries, Configuration conf) to set the list of queries to run over the indexes. These query strings are the ones passed to deserializeQuery above, so you can parse them with a QueryParser as in the example, or serialize and deserialize your queries some other way.

  • You will have to call LuceneIndexInputFormat.setInputPaths(List<Path> paths, Configuration conf) to set which indexes to search. These paths are searched recursively for directories containing Lucene indexes. By default this looks for directories that resemble the output of LuceneIndexOutputFormat (e.g. directories whose names begin with 'index-'), but you may override getIndexDirPathFilter(Configuration conf) to change that behavior.

  • You may call LuceneIndexInputFormat.setMaxCombineSplitSizeBytes(long size, Configuration conf) to control how many indexes are combined into a single split (and therefore processed by a single mapper).
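
Putting that together, the driver setup might look like the following sketch. The input path, query strings, and TweetQueryJob class name are made-up placeholders, and the setter signatures are the ones described in the list above:

package com.example;

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;

import com.twitter.elephantbird.mapreduce.input.LuceneIndexInputFormat;

public class TweetQueryJob {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // query strings in classic QueryParser syntax, matching deserializeQuery above
    LuceneIndexInputFormat.setQueries(
        Arrays.asList("elephant", "lucene AND hadoop"), conf);

    // directories searched recursively for 'index-*' directories (hypothetical path)
    LuceneIndexInputFormat.setInputPaths(
        Arrays.asList(new Path("/data/tweet-indexes")), conf);

    // optional: pack indexes into splits of at most ~512 MB each
    LuceneIndexInputFormat.setMaxCombineSplitSizeBytes(512L * 1024 * 1024, conf);

    Job job = new Job(conf, "query-tweet-indexes");
    job.setJarByClass(TweetQueryJob.class);
    job.setInputFormatClass(TweetIndexInputFormat.class);
    // ... set mapper, output format, output path, etc., then:
    job.waitForCompletion(true);
  }
}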