Skip to content

Commit

Permalink
v1.0.5 (#11)
Browse files Browse the repository at this point in the history
* Add info logs + update dep (goleveldb)

* Change `token` query parameter to optional for generic use

* Add tests for url setting

* revert goleveldb version update

* update Author git link
  • Loading branch information
yotamloe authored May 12, 2022
1 parent e116f01 commit 60f9e47
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 47 deletions.
25 changes: 18 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,15 @@ func main() {
## Data compression
All bulks are compressed with gzip by default to disable compressing initialize the client with `SetCompress(false)`:
```go
logzio.New(token, SetCompress(false),
logzio.New(token,
SetCompress(false),
)
```

## Tests

```shell
$ go test -v

```


Expand All @@ -142,9 +142,12 @@ $ go test -v

## Authors

* **Douglas Chimento** - [dougEfresh][me]
* **Douglas Chimento** - [dougEfresh](https://github.com/dougEfresh)
* **Ido Halevi** - [idohalevi](https://github.com/idohalevi)

## Maintainers
* **Yotam Loewenbach** - [yotamloe](https://github.com/yotamloe)


## License

Expand All @@ -155,10 +158,18 @@ This project is licensed under the Apache License - see the [LICENSE](LICENSE) f
* [logzio-java-sender](https://github.com/logzio/logzio-java-sender)


## Changelog
## Changelog
- v1.0.5
- Change `token` query parameter to optional for generic use
- Changed logging levels
- v1.0.4
- Update gopsutil version (v3.21.6 -> v3.22.3)
- v1.0.3
- Adjust buffer clearance
- Changed logging format
- v1.0.2
- Update dependencies
- v1.0.1
- Add gzip compression
- Add option for in Memory queue

- v1.0.2
- Update dependencies

55 changes: 36 additions & 19 deletions logsender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,37 @@ const (
defaultQueueSize = 40 * 1024 * 1024
)

func TestLogzioSender_SetUrl(t *testing.T) {
l, err := New(
"",
SetDebug(os.Stderr),
SetUrl("http://localhost:12345"),
SetInMemoryQueue(true),
SetinMemoryCapacity(500),
SetDrainDuration(time.Minute),
)
if err != nil {
t.Fatal(err)
}
if l.url != "http://localhost:12345" {
t.Fatalf("url should be http://localhost:12345, actual: %s", l.url)
}
l2, err := New(
"token",
SetDebug(os.Stderr),
SetUrl("http://localhost:12345"),
SetInMemoryQueue(true),
SetinMemoryCapacity(500),
SetDrainDuration(time.Minute),
)
if err != nil {
t.Fatal(err)
}
if l2.url != "http://localhost:12345/?token=token" {
t.Fatalf("url should be http://localhost:12345/?token=token, actual: %s", l.url)
}
}

// In memory queue tests
func TestLogzioSender_inMemoryRetries(t *testing.T) {
var sent = make([]byte, 1024)
Expand Down Expand Up @@ -93,9 +124,7 @@ func TestLogzioSender_InMemoryCapacityLimit(t *testing.T) {

func TestLogzioSender_InMemorySend(t *testing.T) {
var sent = make([]byte, 1024)
var sentToken string
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
sentToken = r.URL.Query().Get("token")
w.WriteHeader(http.StatusOK)
r.Body.Read(sent)
}))
Expand All @@ -117,9 +146,6 @@ func TestLogzioSender_InMemorySend(t *testing.T) {
}
l.Drain()
time.Sleep(200 * time.Millisecond)
if sentToken != "fake-token" {
t.Fatalf("token not sent %s", sentToken)
}
item, err := l.queue.Dequeue()
if item != nil {
t.Fatalf("Unexpect item in the queue - %s", string(item.Value))
Expand All @@ -129,9 +155,7 @@ func TestLogzioSender_InMemorySend(t *testing.T) {

func TestLogzioSender_InMemoryDrain(t *testing.T) {
var sent = make([]byte, 1024)
var sentToken string
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
sentToken = r.URL.Query().Get("token")
w.WriteHeader(http.StatusOK)
r.Body.Read(sent)
}))
Expand All @@ -152,9 +176,6 @@ func TestLogzioSender_InMemoryDrain(t *testing.T) {
}
l.Drain()
time.Sleep(time.Second * 10)
if sentToken != "fake-token" {
t.Fatalf("token not sent %s", sentToken)
}
item, err := l.queue.Dequeue()
if item != nil {
t.Fatalf("Unexpect item in the queue - %s", string(item.Value))
Expand Down Expand Up @@ -378,9 +399,7 @@ func TestLogzioSender_Retries(t *testing.T) {

func TestLogzioSender_Send(t *testing.T) {
var sent = make([]byte, 1024)
var sentToken string
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
sentToken = r.URL.Query().Get("token")
w.WriteHeader(http.StatusOK)
r.Body.Read(sent)
}))
Expand All @@ -402,9 +421,7 @@ func TestLogzioSender_Send(t *testing.T) {
if sentMsg != "blah\n" {
t.Fatalf("%s != %s ", sent, sentMsg)
}
if sentToken != "fake-token" {
t.Fatalf("token not sent %s", sentToken)
}

}

func TestLogzioSender_DelayStart(t *testing.T) {
Expand Down Expand Up @@ -614,7 +631,7 @@ func TestLogzioSender_CountDropped(t *testing.T) {
if l.droppedLogs != 3 {
t.Fatalf("items should have been dropped")
}
l.diskThreshold = 95
l.diskThreshold = 98
l.Send([]byte("blah"))
l.Send([]byte("blah"))
l.Drain()
Expand Down Expand Up @@ -718,13 +735,13 @@ func TestLogzioSender_E2E(t *testing.T) {
SetDebug(os.Stderr),
)
if err != nil {
panic(err)
t.Fatal(err)
}
msg := `{"traceID":"0000000000000001","operationName":"o3","spanID":"2a3ad4a54c048830","references":[],"startTime":1632401226891238,"startTimeMillis":1632401226891,"duration":0,"logs":[],"process":{"serviceName":"testService","tags":[]},"type":"jaegerSpan"}`
for i := 0; i < 10000; i++ {
err := l.Send([]byte(msg))
err = l.Send([]byte(msg))
if err != nil {
panic(err)
t.Fatal(err)
}
}
time.Sleep(time.Second * 40)
Expand Down
48 changes: 27 additions & 21 deletions logziosender.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,11 @@ func SetTempDirectory(dir string) SenderOptionFunc {
// SetUrl set the url which maybe different from the defaultUrl
func SetUrl(url string) SenderOptionFunc {
return func(l *LogzioSender) error {
l.url = fmt.Sprintf("%s/?token=%s", url, l.token)
l.debugLog("logziosender.go: Setting url to %s\n", l.url)
l.url = url
if l.token != "" {
l.url = fmt.Sprintf("%s/?token=%s", url, l.token)
}
l.debugLog("sender: Setting url to %s\n", l.url)
return nil
}
}
Expand Down Expand Up @@ -219,14 +222,14 @@ func (l *LogzioSender) isEnoughDiskSpace() bool {
if l.checkDiskSpace {
diskStat, err := disk.Usage(l.dir)
if err != nil {
l.debugLog("logziosender.go: failed to get disk usage: %v\n", err)
l.debugLog("sender: failed to get disk usage: %v\n", err)
l.checkDiskSpace = false
return false
}

usage := float32(diskStat.UsedPercent)
if usage > l.diskThreshold {
l.debugLog("Logz.io: Dropping logs, as FS used space on %s is %g percent,"+
l.debugLog("sender: Dropping logs, as FS used space on %s is %g percent,"+
" and the drop threshold is %g percent\n",
l.dir, usage, l.diskThreshold)
l.droppedLogs++
Expand All @@ -242,7 +245,7 @@ func (l *LogzioSender) isEnoughDiskSpace() bool {
func (l *LogzioSender) isEnoughMemory(dataSize uint64) bool {
usage := l.queue.Length()
if usage+dataSize >= l.inMemoryCapacity {
l.debugLog("Logz.io: Dropping logs, the max capacity is %d and %d is requested, Request size: %d\n", l.inMemoryCapacity, usage+dataSize, dataSize)
l.infoLog("sender: Dropping logs, the max capacity is %d and %d is requested, Request size: %d\n", l.inMemoryCapacity, usage+dataSize, dataSize)
l.droppedLogs++
return false
} else {
Expand Down Expand Up @@ -291,20 +294,20 @@ func (l *LogzioSender) makeHttpRequest(data bytes.Buffer, attempt int, c bool) i
if c {
req.Header.Add("Content-Encoding", "gzip")
}
l.debugLog("logziosender.go: Sending bulk of %v bytes\n", l.buf.Len())
l.infoLog("sender: Sending bulk of %v bytes\n", l.buf.Len())
resp, err := l.httpClient.Do(req)
if err != nil {
//l.debugLog("logziosender.go: Error sending logs to %s %s\n", l.url, err)
l.infoLog("sender: Error sending logs to %s %s\n", l.url, err)
return httpError
}

defer resp.Body.Close()
statusCode := resp.StatusCode
_, err = ioutil.ReadAll(resp.Body)
if err != nil {
l.debugLog("Error reading response body: %v", err)
l.infoLog("sender: Error reading response body: %v", err)
}
l.debugLog("logziosender.go: Response status code: %v \n", statusCode)
l.infoLog("sender: Response status code: %v \n", statusCode)
if statusCode == 200 {
l.droppedLogs = 0
}
Expand Down Expand Up @@ -335,16 +338,16 @@ func (l *LogzioSender) shouldRetry(statusCode int) bool {
retry := true
switch statusCode {
case http.StatusBadRequest:
l.debugLog("Got HTTP %d bad request, skip retry\n", statusCode)
l.infoLog("sender: Got HTTP %d bad request, skip retry\n", statusCode)
retry = false
case http.StatusNotFound:
l.debugLog("Got HTTP %d not found, skip retry\n", statusCode)
l.infoLog("sender: Got HTTP %d not found, skip retry\n", statusCode)
retry = false
case http.StatusUnauthorized:
l.debugLog("Got HTTP %d unauthorized, skip retry\n", statusCode)
l.infoLog("sender: Got HTTP %d unauthorized, skip retry\n", statusCode)
retry = false
case http.StatusForbidden:
l.debugLog("Got HTTP %d forbidden, skip retry\n", statusCode)
l.infoLog("sender: Got HTTP %d forbidden, skip retry\n", statusCode)
retry = false
case http.StatusOK:
retry = false
Expand All @@ -355,11 +358,11 @@ func (l *LogzioSender) shouldRetry(statusCode int) bool {
// Drain - Send remaining logs
func (l *LogzioSender) Drain() {
if l.draining.Load() {
l.debugLog("logziosender.go: Already draining\n")
l.debugLog("sender: Already draining\n")
return
}
l.mux.Lock()
l.debugLog("logziosender.go: draining queue\n")
l.debugLog("sender: draining queue\n")
defer l.mux.Unlock()
l.draining.Toggle()
defer l.draining.Toggle()
Expand All @@ -372,7 +375,7 @@ func (l *LogzioSender) Drain() {
toBackOff := false
for attempt := 0; attempt < sendRetries; attempt++ {
if toBackOff {
l.debugLog("logziosender.go: failed to send logs, trying again in %v\n", backOff)
l.debugLog("sender: failed to send logs, trying again in %v\n", backOff)
time.Sleep(backOff)
backOff *= 2
}
Expand Down Expand Up @@ -400,7 +403,7 @@ func (l *LogzioSender) dequeueUpToMaxBatchSize() {
for l.buf.Len() < maxSize && err == nil {
item, err := l.queue.Dequeue()
if err != nil {
l.debugLog("queue state: %s\n", err)
l.debugLog("sender: queue state: %s\n", err)
}
if item != nil {
// NewLine is appended tp item.Value
Expand All @@ -409,9 +412,8 @@ func (l *LogzioSender) dequeueUpToMaxBatchSize() {
break
}
_, err := l.buf.Write(append(item.Value, '\n'))
//l.debugLog("logziosender.go: Adding item with size %d (total buffSize: %d)\n", len(item.Value), l.buf.Len())
if err != nil {
l.errorLog("error writing to buffer %s", err)
l.errorLog("sender: error writing to buffer %s", err)
}
} else {
break
Expand All @@ -426,10 +428,10 @@ func (l *LogzioSender) Sync() error {
}

func (l *LogzioSender) requeue() {
l.debugLog("logziosender.go: Requeue %s", l.buf.String())
l.debugLog("sender: Requeue %s", l.buf.String())
err := l.Send(l.buf.Bytes())
if err != nil {
l.errorLog("could not requeue logs %s", err)
l.errorLog("sender: could not requeue logs %s", err)
}
}

Expand All @@ -439,6 +441,10 @@ func (l *LogzioSender) debugLog(format string, a ...interface{}) {
}
}

func (l *LogzioSender) infoLog(format string, a ...interface{}) {
fmt.Fprintf(os.Stderr, format, a...)
}

func (l *LogzioSender) errorLog(format string, a ...interface{}) {
fmt.Fprintf(os.Stderr, format, a...)
}
Expand Down

0 comments on commit 60f9e47

Please sign in to comment.