Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #15

Open
wants to merge 40 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
ab9cc94
Batches in verbose selector
elshize Apr 18, 2017
3c97437
Batches in verbose selector & mapping complex
elshize Apr 18, 2017
a2c8cd0
Complex query IDs fix
elshize Apr 18, 2017
6691c3f
Complex query IDs fix
elshize Apr 18, 2017
e182e9a
Complex query IDs fix
elshize Apr 18, 2017
44aab34
Complex query IDs fix
elshize Apr 18, 2017
5d8b81a
copy impacts script
elshize Apr 19, 2017
80cde77
copy impacts script
elshize Apr 19, 2017
16a0de1
copy impacts script
elshize Apr 19, 2017
ea42635
Logging in LabelResults
elshize Apr 19, 2017
17014da
Logging in LabelResults
elshize Apr 19, 2017
68e774b
Typo fix
elshize Apr 19, 2017
1afc1d5
Typo fix
elshize Apr 19, 2017
d1e2328
Option to use posting costs instead of uniform costs
elshize Apr 26, 2017
6ad2c4e
Merge branch 'master' into fix
elshize Apr 26, 2017
46bb5da
Add param to set CR coefficient
elshize Apr 29, 2017
f42fbd8
Complex Precision
elshize Apr 29, 2017
1e72e86
Complex Precision
elshize Apr 29, 2017
91d560c
Complex Precision
elshize Apr 29, 2017
780d5d3
Complex Precision
elshize May 1, 2017
1653030
Remove maxTop
elshize May 7, 2017
17b0d7c
Remove maxTop
elshize May 7, 2017
55e4ef2
Remove maxTop
elshize May 7, 2017
888644f
script: duplicate buckets for impacts
elshize May 8, 2017
5ab0756
script: duplicate buckets for impacts
elshize May 8, 2017
ecc6e03
script: duplicate buckets for impacts
elshize May 8, 2017
d0a0db8
script: duplicate buckets for impacts
elshize May 8, 2017
d90aa10
script: duplicate buckets for impacts
elshize May 8, 2017
5bcfaf6
script: duplicate buckets for impacts
elshize May 10, 2017
159517a
Report the number of selected shards
elshize May 17, 2017
9b69fd7
Report overhead postings total/relative
elshize May 17, 2017
b635cec
Report overhead postings total/relative
elshize May 17, 2017
70a58e6
Report overhead postings total/relative
elshize May 17, 2017
8e47fdf
Report overhead postings total/relative
elshize May 17, 2017
4afc536
Report overhead postings total/relative
elshize May 17, 2017
63e8446
Fix qid bug
elshize May 23, 2017
6cad482
Produce relevant document data for experiments
elshize Nov 17, 2017
9fe1735
Select by original rank instead of score
elshize Dec 30, 2017
e769fbd
Select by original rank instead of score
elshize Dec 30, 2017
7d46650
Select by original rank instead of score
elshize Dec 30, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion scripts/complex.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ def m(id):

data = pd.read_csv(args.input, sep=' ')
data.columns = ['query', 'gdocid', 'score', 'cscore']
data['gdocid'] = data['gdocid'].map(lambda docid: map[docid])
data['ldocid'] = data['gdocid']
data = data.sort_values(by=['query', 'cscore'], ascending=[True, False])
data['rank'] = data.groupby('query').cumcount()
data['rank'] = data.groupby('query').cumcount().astype(np.int32)
data['query'] = data['query'].rank(method='dense').subtract(1).astype(np.int32)

write('{}.complexresults'.format(args.output), data, compression='SNAPPY', write_index=False)
20 changes: 20 additions & 0 deletions scripts/copy-impacts.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/bin/bash
# Export per-shard impact files from a single input using pdsql.
#
# Usage: copy-impacts.sh <input> <output-prefix> <shard-count> <column>
#   input       source file queried by pdsql (exposed as table df0)
#   output      prefix for per-shard output files: <output>#<shard>.impacts
#   shardCount  number of shards to iterate over (0 .. shardCount-1)
#   column      column of df0 to export as the "impact" value

input=$1
output=$2
shardCount=$3
column=$4

# Validate that all four positional arguments were supplied.
if [ -z "${input}" ]; then echo "You have to define input file (1)."; exit 1; fi;
if [ -z "${output}" ]; then echo "You have to define output file prefix (2)."; exit 1; fi;
if [ -z "${shardCount}" ]; then echo "You have to define shard count (3)."; exit 1; fi;
if [ -z "${column}" ]; then echo "You have to define column name (4)."; exit 1; fi;

# One pdsql invocation per shard: select that shard's rows, emit a constant
# bucket 0, and rename ${column} to "impact", typing key columns as int32.
# FIX: quote ${input} and the output path so file names with spaces or glob
# characters are passed to pdsql as single arguments.
for ((shard = 0; shard < shardCount; shard++))
do
    pdsql \
        "${input}" \
        -q "select query, shard, 0 as bucket, ${column} as impact from df0 where shard=${shard}" \
        -o "${output}#${shard}.impacts" \
        -d query=int32 shard=int32 bucket=int32
done
22 changes: 22 additions & 0 deletions scripts/duplicate-buckets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import argparse
import numpy as np
import pandas as pd
from fastparquet import ParquetFile, write

# Replicate bucket 0 of each shard's impact file into `buckets` identical
# buckets, scaling the impact values by --decay-factor.
parser = argparse.ArgumentParser(description='Duplicate bucket 0 n times, producing n buckets', prog='duplicate-buckets')
parser.add_argument('input_prefix')
parser.add_argument('output_prefix')
parser.add_argument('shards', type=int)
parser.add_argument('buckets', type=int)
# BUG FIX: `double` is not a Python builtin, so `type=double` raised a
# NameError as soon as the script was loaded; argparse expects a callable
# such as `float` here.
parser.add_argument('--decay-factor', '-f', type=float, default=1.0)
args = parser.parse_args()


for shard in range(args.shards):
    # Read this shard's impacts and make one deep copy per target bucket.
    input_df = ParquetFile("{}#{}.impacts".format(args.input_prefix, shard)).to_pandas()
    output_dfs = [input_df.copy(deep=True) for bucket in range(args.buckets)]
    for bucket, df in enumerate(output_dfs):
        # Assign the new bucket id (int32 to match the other impact files)
        # and scale the impacts.
        df['bucket'] = bucket
        df['bucket'] = df['bucket'].astype(np.int32)
        # NOTE(review): the same factor is applied to every bucket (not
        # factor**bucket) — confirm this flat scaling is the intended decay.
        df['impact'] = df['impact'].multiply(args.decay_factor).astype(np.double)
    # All buckets for the shard are concatenated into one output file.
    write("{}#{}.impacts".format(args.output_prefix, shard), pd.concat(output_dfs), compression='SNAPPY', write_index=False)
17 changes: 17 additions & 0 deletions scripts/produce-relevant.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import argparse
import pandas as pd
import fastparquet as fp

# CLI: join query-relevant document titles with a title->docid mapping and
# write the resulting (query ID, document ID) pairs as a parquet file.
arg_parser = argparse.ArgumentParser(description='Produce a parquet file with relevant documents for a given index.', prog='produce-relevant')
arg_parser.add_argument('relevant_titles', help='A file that maps query ID to relevant documents titles.')
arg_parser.add_argument('idmapping', help='A file that maps titles to the document IDs in the index.')
arg_parser.add_argument('output', help='The output file with mapping from query ID to document ID.')
cli_args = arg_parser.parse_args()


# Load both parquet inputs into data frames.
titles_df = fp.ParquetFile(cli_args.relevant_titles).to_pandas()
mapping_df = fp.ParquetFile(cli_args.idmapping).to_pandas()

# Inner-join on title, then drop the title column so only IDs remain.
joined = pd.merge(titles_df, mapping_df, on='title', sort=True)
joined = joined.drop(columns=['title'])
fp.write(cli_args.output, joined, compression='SNAPPY')
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,17 @@ object LabelResults extends LazyLogging {

val relevanceFilename = s"${features.basename}.relevance"
val relevantResults = if (new File(relevanceFilename).exists()) spark.read.parquet(relevanceFilename)
else Seq.empty[(Int, Long)].toDF("query", "gdocid")
else {
logger.warn(s"no relevant documents found: $relevanceFilename")
Seq.empty[(Int, Long)].toDF("query", "gdocid")
}

val complexFilename = s"${features.basename}.complexresutls"
val complexFilename = s"${features.basename}.complexresults"
val complexResults = if (new File(complexFilename).exists()) spark.read.parquet(complexFilename)
else Seq.empty[(Int, Long, Int)].toDF("query", "gdocid", "rank")
else {
logger.warn(s"no complex results found: $complexFilename")
Seq.empty[(Int, Long, Int)].toDF("query", "gdocid", "rank")
}

for (shard <- 0 until properties.shardCount) {

Expand Down
36 changes: 18 additions & 18 deletions src/main/scala/edu/nyu/tandon/search/selective/Run.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,28 @@ import edu.nyu.tandon.search.stat.TPaired
object Run {

val Programs = Seq[(String, Array[String] => Unit)](
(BucketizeResults.CommandName, BucketizeResults.main),
(ExportSelectedToTrec.CommandName, ExportSelectedToTrec.main),
//(BucketizeResults.CommandName, BucketizeResults.main),
//(ExportSelectedToTrec.CommandName, ExportSelectedToTrec.main),
(ResolvePayoffs.CommandName, ResolvePayoffs.main),
(ShardSelector.CommandName, ShardSelector.main),
(LearnPayoffs.CommandName, LearnPayoffs.main),
(TrainCosts.CommandName, TrainCosts.main),
(PredictPayoffs.CommandName, PredictPayoffs.main),
(PredictCosts.CommandName, PredictCosts.main),
(Overlap.CommandName, Overlap.main),
//(ShardSelector.CommandName, ShardSelector.main),
//(LearnPayoffs.CommandName, LearnPayoffs.main),
//(TrainCosts.CommandName, TrainCosts.main),
//(PredictPayoffs.CommandName, PredictPayoffs.main),
//(PredictCosts.CommandName, PredictCosts.main),
//(Overlap.CommandName, Overlap.main),
(Time2Cost.CommandName, Time2Cost.main),
(Selection2Time.CommandName, Selection2Time.main),
//(Selection2Time.CommandName, Selection2Time.main),
(Penalize.CommandName, Penalize.main),
(PrecisionOptimizer.CommandName, PrecisionOptimizer.main),
(Titles2Map.CommandName, Titles2Map.main),
(BudgetOptimizer.CommandName, BudgetOptimizer.main),
(ClairvoyantSelector.CommandName, ClairvoyantSelector.main),
(SmartSelector.CommandName, SmartSelector.main),
(TPaired.CommandName, TPaired.main),
(Precision.CommandName, Precision.main),
//(PrecisionOptimizer.CommandName, PrecisionOptimizer.main),
//(Titles2Map.CommandName, Titles2Map.main),
//(BudgetOptimizer.CommandName, BudgetOptimizer.main),
//(ClairvoyantSelector.CommandName, ClairvoyantSelector.main),
//(SmartSelector.CommandName, SmartSelector.main),
//(TPaired.CommandName, TPaired.main),
//(Precision.CommandName, Precision.main),
(VerboseSelector.CommandName, VerboseSelector.main),
(Status.CommandName, Status.main),
(QRels2Parquet.CommandName, QRels2Parquet.main),
//(Status.CommandName, Status.main),
//(QRels2Parquet.CommandName, QRels2Parquet.main),
(LabelResults.CommandName, LabelResults.main)
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package edu.nyu.tandon.search.selective.verbose

import java.io.{BufferedWriter, File, FileWriter}
import java.io.{BufferedWriter, FileWriter}

import com.typesafe.scalalogging.LazyLogging
import edu.nyu.tandon.search.selective.data.Properties
import edu.nyu.tandon.search.selective.data.features.Features
import edu.nyu.tandon.search.selective.verbose.VerboseSelector.scoreOrdering
import org.apache.spark.sql.functions.when
import org.apache.spark.sql.types.StructType
import edu.nyu.tandon.search.selective.verbose.VerboseSelector.baseRankOrdering
import org.apache.spark.sql.{Row, SparkSession}
import scopt.OptionParser

Expand All @@ -18,11 +16,11 @@ import scala.collection.mutable
* @author [email protected]
*/
class VerboseSelector(val shards: Seq[Shard],
top: mutable.PriorityQueue[Result] = new mutable.PriorityQueue[Result]()(scoreOrdering),
top: mutable.PriorityQueue[Result] = new mutable.PriorityQueue[Result]()(baseRankOrdering),
val lastSelectedShard: Int = -1,
val cost: Double = 0,
val postings: Long = 0,
maxTop: Int = 500,
val selectedShards: Int = 0,
scale: Int = 4) {

def topShards(n: Int): VerboseSelector = {
Expand All @@ -36,7 +34,7 @@ class VerboseSelector(val shards: Seq[Shard],
lastSelectedShard,
cost,
postings,
maxTop,
selectedShards,
scale
)
}
Expand All @@ -52,7 +50,7 @@ class VerboseSelector(val shards: Seq[Shard],

/* update queue */
top.enqueue(selected.results: _*)
top.enqueue(top.dequeueAll.take(maxTop): _*)
top.enqueue(top.dequeueAll.take(500): _*)

val selectedShardId = selected.shardId
Some(
Expand All @@ -63,7 +61,9 @@ class VerboseSelector(val shards: Seq[Shard],
top,
selectedShardId,
cost + selected.cost,
postings + selected.postings
postings + selected.postings,
if (shards(selectedShardId).numSelected == 0) selectedShards + 1
else selectedShards
)
)
}
Expand All @@ -72,8 +72,9 @@ class VerboseSelector(val shards: Seq[Shard],
def round(x: Double): Double = BigDecimal(x).setScale(scale, BigDecimal.RoundingMode.HALF_UP).toDouble

def precisionAt(k: Int): Double = round(top.clone().dequeueAll.take(k).count(_.relevant).toDouble / k)
def overlapAt(k: Int): Double = round(top.clone().dequeueAll.take(k).count(_.originalRank <= k).toDouble / k)
def complexRecall(k: Int): Double = round(top.clone().dequeueAll.count(_.complexRank <= k).toDouble / k)
def overlapAt(k: Int): Double = round(top.clone().dequeueAll.take(k).count(_.originalRank < k).toDouble / k)
def complexRecall(k: Int): Double = round(top.clone().dequeueAll.count(_.complexRank < k).toDouble / k)
def complexPrecisionAt(k: Int): Double = round(top.clone().dequeueAll.sortBy(_.complexRank).take(k).count(_.relevant).toDouble / k)

def numRelevantInLastSelected(): Int = {
assert(lastSelectedShard >= 0 && lastSelectedShard < shards.length, "no last selection to report")
Expand All @@ -92,15 +93,19 @@ class VerboseSelector(val shards: Seq[Shard],
lazy val totalPostings: Long = shards.map(_.postings).sum
lazy val postingsRelative: Double = round(postings.toDouble / totalPostings.toDouble)

def postings(overhead: Long): Long = postings + overhead * selectedShards
def postingsRelative(overhead: Long): Double = round(postings(overhead).toDouble / (totalPostings + shards.length.toLong * overhead).toDouble)

}

object VerboseSelector extends LazyLogging {

val CommandName = "verbose-select"

val scoreOrdering: Ordering[Result] = Ordering.by((result: Result) => result.score)
//val scoreOrdering: Ordering[Result] = Ordering.by((result: Result) => result.score)
val baseRankOrdering: Ordering[Result] = Ordering.by((result: Result) => -result.originalRank)

def selectors(basename: String, shardPenalty: Double, from: Int, to: Int): Iterator[VerboseSelector] = {
def selectors(basename: String, shardPenalty: Double, from: Int, to: Int, usePostingCosts: Boolean): Iterator[VerboseSelector] = {
val properties = Properties.get(basename)
val features = Features.get(properties)
val spark = SparkSession.builder().master("local").getOrCreate()
Expand Down Expand Up @@ -134,12 +139,6 @@ object VerboseSelector extends LazyLogging {
}))
}

//val costs =
// if (new File(s"basename#0.cost").exists())
// Some(for (shard <- 0 until properties.shardCount) yield
// spark.read.parquet(s"$basename#$shard.cost"))
// else None

val postingCosts = for (shard <- 0 until properties.shardCount) yield
spark.read.parquet(s"${features.basename}#$shard.postingcost-${properties.bucketCount}")
.select($"query", $"bucket", $"postingcost")
Expand Down Expand Up @@ -202,8 +201,10 @@ object VerboseSelector extends LazyLogging {
}
case None => 0.0
},
cost = 1.0 / properties.bucketCount,
qPostingCosts(bucket))
cost =
if (usePostingCosts) qPostingCosts(bucket)
else 1.0 / properties.bucketCount,
postings = qPostingCosts(bucket))
}
Shard(shard, buckets.toList)
}
Expand All @@ -212,52 +213,60 @@ object VerboseSelector extends LazyLogging {
}
}

def printHeader(precisions: Seq[Int], overlaps: Seq[Int], complexRecalls: Seq[Int])(writer: BufferedWriter): Unit = {
def printHeader(precisions: Seq[Int], overlaps: Seq[Int], complexRecalls: Seq[Int], complexPrecisions: Seq[Int], overheads: Seq[Long])(writer: BufferedWriter): Unit = {
writer.write(Seq(
"qid",
"step",
"cost",
"postings",
"postings_relative",
overheads.map(o => s"postings_o$o").mkString(","),
overheads.map(o => s"postings_relative_o$o").mkString(","),
precisions.map(p => s"P@$p").mkString(","),
overlaps.map(o => s"O@$o").mkString(","),
complexRecalls.map(c => s"$c-CR").mkString(","),
complexPrecisions.map(c => s"CP@$c").mkString(","),
"last_shard",
"last_bucket",
"last_cost",
"last_postings",
"last_impact",
"last#relevant",
overlaps.map(o => s"last#top_$o").mkString(",")
overlaps.map(o => s"last#top_$o").mkString(","),
"num_shards_selected"
).mkString(","))
writer.newLine()
writer.flush()
}

def processSelector(precisions: Seq[Int], overlaps: Seq[Int], complexRecalls: Seq[Int], maxShards: Int)
def processSelector(precisions: Seq[Int], overlaps: Seq[Int], complexRecalls: Seq[Int], complexPrecisions: Seq[Int], overheads: Seq[Long], maxShards: Int)
(qid: Int, selector: VerboseSelector, writer: BufferedWriter): Unit = {

@tailrec
def process(selector: VerboseSelector, step: Int = 1): Unit = {

logger.info(s"Selected [shard=${selector.lastSelectedShard}, bucket=${selector.lastSelectedBucket}, cost=${selector.lastSelectedCost}]")
//logger.info(s"Selected [shard=${selector.lastSelectedShard}, bucket=${selector.lastSelectedBucket}, cost=${selector.lastSelectedCost}]")

writer.write(Seq(
qid,
step,
selector.cost,
selector.postings,
selector.postingsRelative,
overheads.map(selector.postings(_)).mkString(","),
overheads.map(selector.postingsRelative(_)).mkString(","),
precisions.map(selector.precisionAt).mkString(","),
overlaps.map(selector.overlapAt).mkString(","),
complexRecalls.map(selector.complexRecall).mkString(","),
complexPrecisions.map(selector.complexPrecisionAt).mkString(","),
selector.lastSelectedShard,
selector.lastSelectedBucket,
selector.lastSelectedCost,
selector.lastSelectedPostings,
selector.lastSelectedImpact,
selector.numRelevantInLastSelected(),
overlaps.map(selector.numTopInLastSelected).mkString(",")
overlaps.map(selector.numTopInLastSelected).mkString(","),
selector.selectedShards
).mkString(","))

writer.newLine()
Expand All @@ -279,9 +288,12 @@ object VerboseSelector extends LazyLogging {
precisions: Seq[Int] = Seq(10, 30),
overlaps: Seq[Int] = Seq(10, 30),
complexRecalls: Seq[Int] = Seq(10, 30),
complexPrecisions: Seq[Int] = Seq(10, 30),
overheads: Seq[Long] = Seq(10000, 50000, 100000, 500000, 1000000),
maxShards: Int = Int.MaxValue,
shardPenalty: Double = 0.0,
batchSize: Int = 50)
batchSize: Int = 200,
usePostingCosts: Boolean = false)

val parser = new OptionParser[Config](CommandName) {

Expand All @@ -299,9 +311,13 @@ object VerboseSelector extends LazyLogging {
.text("k for which to compute O@k")

opt[Seq[Int]]('c', "complex-recalls")
.action((x, c) => c.copy(overlaps = x))
.action((x, c) => c.copy(complexRecalls = x))
.text("k for which to compute k-CR")

opt[Seq[Int]]('C', "complex-precisions")
.action((x, c) => c.copy(complexPrecisions = x))
.text("k for which to compute CP@k")

opt[Double]('P', "penalty")
.action((x, c) => c.copy(shardPenalty = x))
.text("shard penalty")
Expand All @@ -314,6 +330,10 @@ object VerboseSelector extends LazyLogging {
.action((x, c) => c.copy(batchSize = x))
.text("how many queries to run at once in memory")

opt[Boolean]('u', "use-posting-costs")
.action((x, c) => c.copy(usePostingCosts = x))
.text("use posting costs instead of fixed uniform costs")

}

parser.parse(args, Config()) match {
Expand All @@ -332,17 +352,16 @@ object VerboseSelector extends LazyLogging {
.map(a => (a.head, a.last + 1))

val writer = new BufferedWriter(new FileWriter(s"${config.basename}.verbose"))
printHeader(config.precisions, config.overlaps, config.complexRecalls, config.complexPrecisions, config.overheads)(writer)

for ((from, to) <- queries) {

logger.info("creating selectors")
val selectorsForQueries = selectors(config.basename, config.shardPenalty, from, to)

printHeader(config.precisions, config.overlaps, config.complexRecalls)(writer)
logger.info(s"processing batch [$from, $to]")
val selectorsForQueries = selectors(config.basename, config.shardPenalty, from, to, config.usePostingCosts)

for ((selector, idx) <- selectorsForQueries.zipWithIndex) {
logger.info(s"processing query $idx")
processSelector(config.precisions, config.overlaps, config.complexRecalls, config.maxShards)(idx, selector, writer)
logger.info(s"processing query ${idx + from}")
processSelector(config.precisions, config.overlaps, config.complexRecalls, config.complexPrecisions, config.overheads, config.maxShards)(idx + from, selector, writer)
}
}

Expand Down
Loading