Skip to content

Commit

Permalink
Fix duplicate logs case (#8)
Browse files Browse the repository at this point in the history
* Add support for log type

* Add support for log type

* Add support for log type

* Fix duplicate logs case

* remove temp.txt
  • Loading branch information
yotamloe authored Sep 29, 2021
1 parent 60d08ae commit 158e86d
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 98 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
*.orig
gtoggl/gtoggl
test

*.txt
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm

## Directory-based project format
Expand Down
8 changes: 1 addition & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,9 @@ module github.com/logzio/logzio-go
go 1.15

require (
github.com/StackExchange/wmi v1.2.0 // indirect
github.com/beeker1121/goque v2.1.0+incompatible
github.com/go-ole/go-ole v1.2.5 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/shirou/gopsutil v0.0.0-20190323131628-2cbc9195c892 // indirect
github.com/shirou/gopsutil/v3 v3.21.6
github.com/syndtr/goleveldb v1.0.0 // indirect
github.com/tidwall/gjson v1.8.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/tidwall/sjson v1.1.7
go.uber.org/atomic v1.9.0
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect
)
5 changes: 1 addition & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 h1:fLjPD/aNc3UIOA6tDi6QXUemppXK3P9BI7mr2hd6gx8=
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/StackExchange/wmi v1.2.0 h1:noJEYkMQVlFCEAc+2ma5YyRhlfjcWfZqk5sBRYozdyM=
github.com/StackExchange/wmi v1.2.0/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/beeker1121/goque v2.0.1+incompatible h1:5nJHPMqQLxUvGFc8m/NW2QzxKyc0zICmqs/JUsmEjwE=
github.com/beeker1121/goque v2.0.1+incompatible/go.mod h1:L6dOWBhDOnxUVQsb0wkLve0VCnt2xJW/MI8pdRX4ANw=
github.com/beeker1121/goque v2.1.0+incompatible h1:m5pZ5b8nqzojS2DF2ioZphFYQUqGYsDORq6uefUItPM=
Expand All @@ -20,6 +16,7 @@ github.com/go-ole/go-ole v1.2.5 h1:t4MGB5xEDZvXI+0rMjjsfBsD7yAgp/s9ZDkL1JndXwY=
github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
Expand Down
Binary file removed inMemoryQueue/inMemoryQueue.test
Binary file not shown.
101 changes: 27 additions & 74 deletions logsender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

// Utils
const (
defaultQueueSize = 40 * 1024 * 1024 // 3mb
defaultQueueSize = 40 * 1024 * 1024
)

// In memory queue tests
Expand Down Expand Up @@ -331,13 +331,13 @@ func TestLogzioSender_DequeueUpToMaxBatchSize(t *testing.T) {
for i := 0; i < 100; i++ {
l.Send(make([]byte, 33000))
}
bufSize := l.dequeueUpToMaxBatchSize()
l.dequeueUpToMaxBatchSize()
item, err := l.queue.Dequeue()
if item == nil {
t.Fatalf("Queue not suposed to bee empty")
}
if uint64(bufSize) > 3*1024*1024 {
t.Fatalf("%d > %d", bufSize, 3*1024*1024)
if uint64(len(l.buf.Bytes())) > 3*1024*1024 {
t.Fatalf("%d > %d", len(l.buf.Bytes()), 3*1024*1024)
}

l.Stop()
Expand Down Expand Up @@ -707,73 +707,26 @@ func BenchmarkLogzioSenderInmemory(b *testing.B) {
}
}

////E2E test
//func TestLogzioSender_E2E(t *testing.T) {
// l, err := New("",
// SetInMemoryQueue(true),
// SetDrainDuration(time.Second*5),
// SetDebug(os.Stderr),
// )
// if err != nil {
// panic(err)
// }
// randomString := fmt.Sprint(rand.Int())
// msg := fmt.Sprintf("{ \"%s\": \"%s\"}", "message", randomString)
// l.debugLog("Sending 500 logs...\n")
// for i := 0; i < 500; i++ {
// err := l.Send([]byte(msg))
// if err != nil {
// panic(err)
// }
// }
// <-time.After(l.drainDuration)
//
// apiQuery := `{
// "query": {
// "bool": {
// "must": [{
// "query_string": {
// "query": ""
// }
// },
// {
// "range": {
// "@timestamp": {
// "gte": "now-5m",
// "lte": "now"
// }
// }
// }
// ]
// }
// },
// "size": 1000,
// "from": 0
// }`
//
// url := "https://api.logz.io/v1/search"
// queryString := fmt.Sprintf("message:%s", randomString)
// query, _ := sjson.Set(apiQuery, "query.bool.must.0.query_string.query", queryString)
// var jsonStr = []byte(query)
//
// l.debugLog("Waiting 40 seconds for ingestion\n")
// time.Sleep(time.Second * 40)
//
// fmt.Println("URL:>", url)
// req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonStr))
// req.Header.Set("X-API-TOKEN", "6c94fd02-8e34-4f0c-bc00-9a42e35171fc")
// req.Header.Set("Content-Type", "application/json")
//
// client := &http.Client{}
// resp, err := client.Do(req)
// if err != nil {
// panic(err)
// }
// defer resp.Body.Close()
//
// fmt.Println("response Status:", resp.Status)
// fmt.Println("response Headers:", resp.Header)
// body, _ := ioutil.ReadAll(resp.Body)
// fmt.Println("response Body:", string(body))
// l.Stop() //logs are buffered on disk. Stop will drain the buffer
//}
//E2E test
func TestLogzioSender_E2E(t *testing.T) {
l, err := New("",
SetInMemoryQueue(true),
SetUrl("https://listener.logz.io:8071"),
SetDrainDuration(time.Second*5),
SetinMemoryCapacity(300*1024*1024),
SetlogCountLimit(2000000),
SetDebug(os.Stderr),
)
if err != nil {
panic(err)
}
msg := `{"traceID":"0000000000000001","operationName":"o3","spanID":"2a3ad4a54c048830","references":[],"startTime":1632401226891238,"startTimeMillis":1632401226891,"duration":0,"logs":[],"process":{"serviceName":"testService","tags":[]},"type":"jaegerSpan"}`
for i := 0; i < 10000; i++ {
err := l.Send([]byte(msg))
if err != nil {
panic(err)
}
}
time.Sleep(time.Second * 40)
l.Stop() //logs are buffered on disk. Stop will drain the buffer
}
26 changes: 13 additions & 13 deletions logziosender.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ func (l *LogzioSender) makeHttpRequest(data bytes.Buffer, attempt int, c bool) i
if c {
req.Header.Add("Content-Encoding", "gzip")
}
l.debugLog("logziosender.go: Sending bulk of %v bytes\n", l.buf.Len())
resp, err := l.httpClient.Do(req)
if err != nil {
//l.debugLog("logziosender.go: Error sending logs to %s %s\n", l.url, err)
Expand All @@ -303,6 +304,7 @@ func (l *LogzioSender) makeHttpRequest(data bytes.Buffer, attempt int, c bool) i
if err != nil {
l.debugLog("Error reading response body: %v", err)
}
l.debugLog("logziosender.go: Response status code: %v \n", statusCode)
if statusCode == 200 {
l.droppedLogs = 0
}
Expand Down Expand Up @@ -354,18 +356,18 @@ func (l *LogzioSender) shouldRetry(statusCode int) bool {
func (l *LogzioSender) Drain() {
if l.draining.Load() {
l.debugLog("logziosender.go: Already draining\n")
return
}
l.mux.Lock()
l.debugLog("logziosender.go: draining queue\n")
defer l.mux.Unlock()
l.draining.Toggle()
defer l.draining.Toggle()

l.buf.Reset()
var reDrain bool = true
var reDrain = true
for l.queue.Length() > 0 && reDrain {
bufSize := l.dequeueUpToMaxBatchSize()
if bufSize > 0 {
l.buf.Reset()
l.dequeueUpToMaxBatchSize()
if len(l.buf.Bytes()) > 0 {
backOff := sendSleepingBackoff
toBackOff := false
for attempt := 0; attempt < sendRetries; attempt++ {
Expand All @@ -391,32 +393,30 @@ func (l *LogzioSender) Drain() {

}

func (l *LogzioSender) dequeueUpToMaxBatchSize() int {
func (l *LogzioSender) dequeueUpToMaxBatchSize() {
var (
bufSize int
err error
err error
)
for bufSize < maxSize && err == nil {
for l.buf.Len() < maxSize && err == nil {
item, err := l.queue.Dequeue()
if err != nil {
l.debugLog("queue state: %s\n", err)
}
if item != nil {
// NewLine is appended tp item.Value
if len(item.Value)+bufSize+1 > maxSize {
if len(item.Value)+l.buf.Len()+1 >= maxSize {
l.queue.Enqueue(item.Value)
break
}
bufSize += len(item.Value)
l.debugLog("logziosender.go: Adding item with size %d (total buffSize: %d)\n", len(item.Value), bufSize)
_, err := l.buf.Write(append(item.Value, '\n'))
//l.debugLog("logziosender.go: Adding item with size %d (total buffSize: %d)\n", len(item.Value), l.buf.Len())
if err != nil {
l.errorLog("error writing to buffer %s", err)
}
} else {
break
}
}
return bufSize
}

// Sync drains the queue
Expand Down

0 comments on commit 158e86d

Please sign in to comment.