diff --git a/src/main/scala/com/cloudant/clouseau/ClouseauTypeFactory.scala b/src/main/scala/com/cloudant/clouseau/ClouseauTypeFactory.scala index 4508fa4a..75cc1bb7 100644 --- a/src/main/scala/com/cloudant/clouseau/ClouseauTypeFactory.scala +++ b/src/main/scala/com/cloudant/clouseau/ClouseauTypeFactory.scala @@ -41,6 +41,7 @@ case class UpdateDocMsg(id: String, doc: Document) case class DeleteDocMsg(id: String) case class CommitMsg(seq: Long) case class SetUpdateSeqMsg(seq: Long) +case class SetPurgeSeqMsg(seq: Long) object ClouseauTypeFactory extends TypeFactory { @@ -89,6 +90,8 @@ object ClouseauTypeFactory extends TypeFactory { Some(CommitMsg(toLong(reader.readTerm))) case ('set_update_seq, 2) => Some(SetUpdateSeqMsg(toLong(reader.readTerm))) + case ('set_purge_seq, 2) => + Some(SetPurgeSeqMsg(toLong(reader.readTerm))) case _ => None } diff --git a/src/main/scala/com/cloudant/clouseau/IndexService.scala b/src/main/scala/com/cloudant/clouseau/IndexService.scala index be6e590b..ba3424ab 100644 --- a/src/main/scala/com/cloudant/clouseau/IndexService.scala +++ b/src/main/scala/com/cloudant/clouseau/IndexService.scala @@ -70,6 +70,8 @@ class IndexService(ctx: ServiceContext[IndexServiceArgs]) extends Service(ctx) w var reader = DirectoryReader.open(ctx.args.writer, true) var updateSeq = getCommittedSeq var pendingSeq = updateSeq + var purgeSeq = getCommittedPurgeSeq + var pendingPurgeSeq = purgeSeq var committing = false var forceRefresh = false @@ -97,6 +99,8 @@ class IndexService(ctx: ServiceContext[IndexServiceArgs]) extends Service(ctx) w group2(request) case 'get_update_seq => ('ok, updateSeq) + case 'get_purge_seq => + ('ok, purgeSeq) case UpdateDocMsg(id: String, doc: Document) => logger.debug("Updating %s".format(id)) updateTimer.time { @@ -117,6 +121,10 @@ class IndexService(ctx: ServiceContext[IndexServiceArgs]) extends Service(ctx) w pendingSeq = newSeq logger.debug("Pending sequence is now %d".format(newSeq)) 'ok + case SetPurgeSeqMsg(newPurgeSeq: Long) => + pendingPurgeSeq = newPurgeSeq + logger.debug("purge sequence is now %d".format(newPurgeSeq)) + 'ok case 'info => ('ok, getInfo) } @@ -148,12 +156,13 @@ class IndexService(ctx: ServiceContext[IndexServiceArgs]) extends Service(ctx) w } exit('deleted) case 'maybe_commit => - commit(pendingSeq) - case ('committed, newSeq: Long) => - updateSeq = newSeq + commit(pendingSeq, pendingPurgeSeq) + case ('committed, newUpdateSeq: Long, newPurgeSeq: Long) => + updateSeq = newUpdateSeq + purgeSeq = newPurgeSeq forceRefresh = true committing = false - logger.info("Committed sequence %d".format(newSeq)) + logger.debug("Committed update sequence %d and purge sequence %d".format(newUpdateSeq, newPurgeSeq)) case 'commit_failed => committing = false } @@ -175,18 +184,19 @@ class IndexService(ctx: ServiceContext[IndexServiceArgs]) extends Service(ctx) w } } - private def commit(newSeq: Long) { - if (!committing && newSeq > updateSeq) { + private def commit(newUpdateSeq: Long, newPurgeSeq: Long) { + if (!committing && (newUpdateSeq > updateSeq || newPurgeSeq > purgeSeq)) { committing = true val index = self node.spawn((_) => { ctx.args.writer.setCommitData(ctx.args.writer.getCommitData + - ("update_seq" -> newSeq.toString)) + ("update_seq" -> newUpdateSeq.toString) + + ("purge_seq" -> newPurgeSeq.toString)) try { commitTimer.time { ctx.args.writer.commit() } - index ! ('committed, newSeq) + index ! ('committed, newUpdateSeq, newPurgeSeq) } catch { case e: AlreadyClosedException => logger.error("Commit failed to closed writer", e) @@ -569,7 +579,8 @@ class IndexService(ctx: ServiceContext[IndexServiceArgs]) extends Service(ctx) w ('doc_count, reader.numDocs), ('doc_del_count, reader.numDeletedDocs), ('pending_seq, pendingSeq), - ('committed_seq, getCommittedSeq) + ('committed_seq, getCommittedSeq), + ('purge_seq, purgeSeq) ) } @@ -590,6 +601,16 @@ class IndexService(ctx: ServiceContext[IndexServiceArgs]) extends Service(ctx) w } } + private def getCommittedPurgeSeq = { + val commitData = ctx.args.writer.getCommitData + commitData.get("purge_seq") match { + case null => + 0L + case seq => + seq.toLong + } + } + private def parseSort(v: Any): Sort = v match { case 'relevance => Sort.RELEVANCE