all: remove gob and SSE rpc endpoints
We now only consume zoekt via gRPC at Sourcegraph, and I doubt anyone
uses the old endpoints.

This requires one update in Sourcegraph: use SenderFunc from the main
zoekt package rather than from the now-deleted stream package.
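
For reference, the Sourcegraph-side change should roughly be a package
swap at the call sites; this sketch uses a placeholder collect function
rather than the real call site:

    // Before: the adapter came from the now-deleted stream package.
    sender := stream.SenderFunc(func(r *zoekt.SearchResult) { collect(r) })
    // After: the same adapter now lives in the main zoekt package.
    sender := zoekt.SenderFunc(func(r *zoekt.SearchResult) { collect(r) })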

Test Plan: go test
keegancsmith committed Apr 18, 2024
1 parent 74e75ef commit dc59e85
Showing 18 changed files with 156 additions and 1,047 deletions.
9 changes: 9 additions & 0 deletions api.go
@@ -1032,6 +1032,15 @@ type Sender interface {
	Send(*SearchResult)
}

// SenderFunc is an adapter to allow the use of ordinary functions as Sender.
// If f is a function with the appropriate signature, SenderFunc(f) is a Sender
// that calls f.
type SenderFunc func(result *SearchResult)

func (f SenderFunc) Send(result *SearchResult) {
	f(result)
}

// Streamer adds the method StreamSearch to the Searcher interface.
type Streamer interface {
	Searcher
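
A minimal usage sketch of the new adapter, assuming the existing
zoekt.Streamer.StreamSearch signature; the collectAll helper is
illustrative and not part of this commit:

func collectAll(ctx context.Context, s zoekt.Streamer, q query.Q, opts *zoekt.SearchOptions) ([]*zoekt.SearchResult, error) {
	var collected []*zoekt.SearchResult
	// SenderFunc lets an ordinary closure satisfy the Sender interface.
	err := s.StreamSearch(ctx, q, opts, zoekt.SenderFunc(func(r *zoekt.SearchResult) {
		collected = append(collected, r)
	}))
	return collected, err
}
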
61 changes: 61 additions & 0 deletions cmd/zoekt-webserver/grpc/server/sampling.go
@@ -0,0 +1,61 @@
package server

import (
	"math"

	"github.com/sourcegraph/zoekt"
)

// newSamplingSender returns a zoekt.Sender that samples stats-only events to
// avoid sending many empty stats events over the wire.
func newSamplingSender(next zoekt.Sender) *samplingSender {
	return &samplingSender{next: next}
}

type samplingSender struct {
	next     zoekt.Sender
	agg      zoekt.SearchResult
	aggCount int
}

func (s *samplingSender) Send(event *zoekt.SearchResult) {
	// We don't want to send events over the wire if they don't contain file
	// matches. Hence, when an event has no results, we aggregate its stats and
	// only send them out at regular intervals.
	if len(event.Files) == 0 {
		s.aggCount++

		s.agg.Stats.Add(event.Stats)
		s.agg.Progress = event.Progress

		if s.aggCount%100 == 0 && !s.agg.Stats.Zero() {
			s.next.Send(&s.agg)
			s.agg = zoekt.SearchResult{}
		}

		return
	}

	// If we have aggregated stats, we merge them with the new event before
	// sending it. We drop agg.Progress, because we assume that event.Progress
	// reflects the latest status.
	if !s.agg.Stats.Zero() {
		event.Stats.Add(s.agg.Stats)
		s.agg = zoekt.SearchResult{}
	}

	s.next.Send(event)
}

// Flush sends any aggregated stats that we haven't sent yet.
func (s *samplingSender) Flush() {
	if !s.agg.Stats.Zero() {
		s.next.Send(&zoekt.SearchResult{
			Stats: s.agg.Stats,
			Progress: zoekt.Progress{
				Priority:           math.Inf(-1),
				MaxPendingPriority: math.Inf(-1),
			},
		})
	}
}
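
A sketch of how the sampling sender is intended to be driven (mirroring
how the gRPC server below appears to use it): wrap the downstream sender,
stream into the wrapper, and call Flush once the search finishes so any
aggregated stats are not lost. The streamWithSampling helper is
illustrative only:

func streamWithSampling(ctx context.Context, s zoekt.Streamer, q query.Q, opts *zoekt.SearchOptions, downstream zoekt.Sender) error {
	sampler := newSamplingSender(downstream)
	err := s.StreamSearch(ctx, q, opts, sampler)
	if err == nil {
		// Flush only on success; the aggregated stats describe a completed search.
		sampler.Flush()
	}
	return err
}
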
72 changes: 72 additions & 0 deletions cmd/zoekt-webserver/grpc/server/sampling_test.go
@@ -0,0 +1,72 @@
package server

import (
	"testing"

	"github.com/sourcegraph/zoekt"
)

func TestSamplingStream(t *testing.T) {
	nonZeroStats := zoekt.Stats{
		ContentBytesLoaded: 10,
	}
	filesEvent := &zoekt.SearchResult{
		Files: make([]zoekt.FileMatch, 10),
		Stats: nonZeroStats,
	}
	fileEvents := func(n int) []*zoekt.SearchResult {
		res := make([]*zoekt.SearchResult, n)
		for i := 0; i < n; i++ {
			res[i] = filesEvent
		}
		return res
	}
	statsEvent := &zoekt.SearchResult{
		Stats: nonZeroStats,
	}
	statsEvents := func(n int) []*zoekt.SearchResult {
		res := make([]*zoekt.SearchResult, n)
		for i := 0; i < n; i++ {
			res[i] = statsEvent
		}
		return res
	}
	cases := []struct {
		events           []*zoekt.SearchResult
		beforeFlushCount int
		afterFlushCount  int
	}{
		// These test cases assume that the sampler only forwards every 100th
		// stats-only event. If the sampling logic changes, these tests lose
		// their value.
		{nil, 0, 0},
		{fileEvents(1), 1, 1},
		{fileEvents(2), 2, 2},
		{fileEvents(200), 200, 200},
		{append(fileEvents(1), statsEvents(1)...), 1, 2},
		{append(fileEvents(1), statsEvents(2)...), 1, 2},
		{append(fileEvents(1), statsEvents(99)...), 1, 2},
		{append(fileEvents(1), statsEvents(100)...), 2, 2},
		{statsEvents(500), 5, 5},
		{statsEvents(501), 5, 6},
	}

	for _, tc := range cases {
		count := 0
		ss := newSamplingSender(zoekt.SenderFunc(func(*zoekt.SearchResult) {
			count += 1
		}))

		for _, event := range tc.events {
			ss.Send(event)
		}
		if count != tc.beforeFlushCount {
			t.Fatalf("expected %d events, got %d", tc.beforeFlushCount, count)
		}
		ss.Flush()

		if count != tc.afterFlushCount {
			t.Fatalf("expected %d events, got %d", tc.afterFlushCount, count)
		}
	}
}
5 changes: 2 additions & 3 deletions cmd/zoekt-webserver/grpc/server/server.go
@@ -11,7 +11,6 @@ import (

"github.com/sourcegraph/zoekt"
"github.com/sourcegraph/zoekt/query"
"github.com/sourcegraph/zoekt/stream"
)

func NewServer(s zoekt.Streamer) *Server {
@@ -48,7 +47,7 @@ func (s *Server) StreamSearch(req *proto.StreamSearchRequest, ss proto.Webserver
}

	sender := gRPCChunkSender(ss)
	sampler := stream.NewSamplingSender(sender)
	sampler := newSamplingSender(sender)

	err = s.streamer.StreamSearch(ss.Context(), q, zoekt.SearchOptionsFromProto(request.GetOpts()), sampler)
	if err == nil {
@@ -125,5 +124,5 @@ func gRPCChunkSender(ss proto.WebserverService_StreamSearchServer) zoekt.Sender
		_ = chunk.SendAll(sendFunc, result.GetFiles()...)
	}

	return stream.SenderFunc(f)
	return zoekt.SenderFunc(f)
}
3 changes: 1 addition & 2 deletions cmd/zoekt-webserver/main.go
@@ -57,7 +57,6 @@ import (
"github.com/sourcegraph/zoekt/internal/tracer"
"github.com/sourcegraph/zoekt/query"
"github.com/sourcegraph/zoekt/shards"
"github.com/sourcegraph/zoekt/stream"
"github.com/sourcegraph/zoekt/trace"
"github.com/sourcegraph/zoekt/web"

@@ -554,7 +553,7 @@ func (s *loggedSearcher) StreamSearch(
	var stats zoekt.Stats

	metricSearchRequestsTotal.Inc()
	err := s.Streamer.StreamSearch(ctx, q, opts, stream.SenderFunc(func(event *zoekt.SearchResult) {
	err := s.Streamer.StreamSearch(ctx, q, opts, zoekt.SenderFunc(func(event *zoekt.SearchResult) {
		stats.Add(event.Stats)
		sender.Send(event)
	}))
64 changes: 0 additions & 64 deletions query/query.go
@@ -15,8 +15,6 @@
package query

import (
"bytes"
"encoding/gob"
"encoding/json"
"fmt"
"log"
@@ -25,7 +23,6 @@ import (
"sort"
"strconv"
"strings"
"sync"

"github.com/RoaringBitmap/roaring"
"github.com/grafana/regexp"
@@ -39,17 +36,6 @@ type Q interface {
	String() string
}

// RPCUnwrap processes q to remove RPC specific elements from q. This is
// needed because gob isn't flexible enough for us. This should be called by
// RPC servers at the client/server boundary so that q works with the rest of
// zoekt.
func RPCUnwrap(q Q) Q {
	if cache, ok := q.(*GobCache); ok {
		return cache.Q
	}
	return q
}

// RawConfig filters repositories based on their encoded RawConfig map.
type RawConfig uint64

@@ -462,56 +448,6 @@ func (q *Regexp) setCase(k string) {
}
}

// GobCache exists so we only pay the cost of marshalling a query once when we
// aggregate it out over all the replicas.
//
// Our query and eval layer do not support GobCache. Instead, at the gob
// boundaries (RPC and Streaming) we check if the Q is a GobCache and unwrap
// it.
//
// "I wish we could get rid of this code soon enough" - tomas
type GobCache struct {
	Q

	once sync.Once
	data []byte
	err  error
}

// GobEncode implements gob.GobEncoder.
func (q *GobCache) GobEncode() ([]byte, error) {
	q.once.Do(func() {
		var buf bytes.Buffer
		enc := gob.NewEncoder(&buf)
		q.err = enc.Encode(&gobWrapper{
			WrappedQ: q.Q,
		})
		q.data = buf.Bytes()
	})
	return q.data, q.err
}

// GobDecode implements gob.GobDecoder.
func (q *GobCache) GobDecode(data []byte) error {
	dec := gob.NewDecoder(bytes.NewBuffer(data))
	var w gobWrapper
	err := dec.Decode(&w)
	if err != nil {
		return err
	}
	q.Q = w.WrappedQ
	return nil
}

// gobWrapper is needed so the gob decoder works.
type gobWrapper struct {
	WrappedQ Q
}

func (q *GobCache) String() string {
return fmt.Sprintf("GobCache(%s)", q.Q)
}

// Or is matched when any of its children is matched.
type Or struct {
	Children []Q
1 change: 0 additions & 1 deletion query/query_proto.go
@@ -50,7 +50,6 @@ func QToProto(q Q) *proto.Q {
return &proto.Q{Query: &proto.Q_Boost{Boost: v.ToProto()}}
default:
// The following nodes do not have a proto representation:
// - GobCache: only needed for Gob encoding
// - caseQ: only used internally, not by the RPC layer
panic(fmt.Sprintf("unknown query node %T", v))
}
71 changes: 0 additions & 71 deletions rpc/internal/srv/srv.go

This file was deleted.
