Skip to content

Commit

Permalink
Release v1.1.0 (#3)
Browse files Browse the repository at this point in the history
Add dependency caching in Docker.
Add Dockerfile linter hadoling.
Add pre-commit.
Use cron instead of spamming on delivery server.
Add a test that checks if the connection is restored in case of interruption.
  • Loading branch information
F0rzend authored Jun 13, 2022
1 parent 0f16b8f commit 857e086
Show file tree
Hide file tree
Showing 12 changed files with 266 additions and 71 deletions.
7 changes: 7 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
.dockerignore
.gitignore
.idea/
.github/
docker-compose.yml
Dockerfile
README.md
2 changes: 2 additions & 0 deletions .hadolint.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ignored:
- DL3007
30 changes: 30 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
fail_fast: false

repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.3.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: mixed-line-ending
args: [ '--fix=lf' ]
- id: check-yaml
- id: check-json
- id: check-added-large-files
- id: pretty-format-json
args: [ '--autofix', '--no-sort-keys' ]

- repo: https://github.com/golangci/golangci-lint
rev: v1.46.2
hooks:
- id: golangci-lint

- repo: https://github.com/hadolint/hadolint
rev: v2.10.0
hooks:
- id: hadolint

- repo: https://github.com/segmentio/golines
rev: v0.10.0
hooks:
- id: golines
18 changes: 9 additions & 9 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@ ARG GO_VERSION=1.18

FROM golang:${GO_VERSION}-alpine as builder

WORKDIR /go/src/
WORKDIR /build

COPY go.mod go.sum ./
RUN go mod download

COPY . .

ENV CGO_ENABLED=0
ENV GO_OSARCH="linux/amd64"

RUN go build -o /go/bin/binary main.go
RUN go build -ldflags "-s -w" -o ./app .

FROM gcr.io/distroless/base
FROM gcr.io/distroless/base:latest

COPY --from=builder /go/bin/binary /go/bin/binary
COPY --from=builder /build/app /app

ENV OUTPUT_DIRECTORY=/tmp/output
VOLUME /tmp/output
ENV OUTPUT_DIRECTORY=/tmp/records

CMD ["/go/bin/binary"]
CMD ["/app"]
24 changes: 13 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,34 @@ sites, and it will successfully detect file type and download
content.

In an endless loop with a break on delay
(to avoid creating a heavy load on the target server),
(to avoid creating a heavy load on the target server),
the application will wait for a `OK 200` response, after which it will copy the response
body into the generated file.

## Features

* File type detection (without losing bytes)
* Waiting for broadcast availability
* Waiting for broadcast availability
* Delay for avoiding creating a heavy load on the target server
* Automatic generation of human-readable file names
* Unlimited number of files (Limited by disk space)
* Unit tests

## Configuration

You can use `dumper.yml` file or **environment variables** for
You can use `dumper.yml` file or **environment variables** for
configuration.

| yml | env | default | description |
|------------------|------------------|------------|--------------------------------------------------------------|
| source_url | SOURCE_URL | | Address from where to download information |
| file_prefix | FILE_PREFIX | | Prefix for files |
| file_date_format | FILE_DATE_FORMAT | 02_01_2006 | Date format for files |
| output_directory | OUTPUT_DIRECTORY | . | Directory where to save files |
| timeout | TIMEOUT | 10 | Delay between sending requests if response status is not 200 |
| log_level | LOG_LEVEL | info | Log level |
| yml | env | default | description |
|------------------|------------------|------------|-------------------------------------------------------------|
| source_url | SOURCE_URL | | Address from where to download information |
| file_prefix | FILE_PREFIX | | Prefix for files |
| schedule | SCHEDULE | | Schedule for downloading information in cron format |
| duration | DURATION | | Duration of the dump |
| delay | DELAY | 5s | Delay between request in case of interruption of the stream |
| file_date_format | FILE_DATE_FORMAT | 02_01_2006 | Date format for files |
| output_directory | OUTPUT_DIRECTORY | . | Directory where to save files |
| log_level | LOG_LEVEL | info | Log level |

## Run with [docker compose](https://docs.docker.com/compose/)

Expand Down
7 changes: 6 additions & 1 deletion copier/file_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,10 @@ func (f *DatedFileBuilder) GetFileName(ext string) string {
}

func (f *DatedFileBuilder) GetOutput(ext string) (io.WriteCloser, error) {
return os.Create(f.GetFileName(ext))
filename := f.GetFileName(ext)
return os.OpenFile(
filename,
os.O_CREATE|os.O_WRONLY,
fs.ModePerm,
)
}
83 changes: 83 additions & 0 deletions copier/runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package copier

import (
"fmt"
"github.com/robfig/cron/v3"
"github.com/rs/zerolog/log"
"time"
)

const (
readableTimeLayout = "02.01.2006 15:04:05"
)

type Runner struct {
copier *StreamCopier
}

func NewRunner(copier *StreamCopier) *Runner {
return &Runner{
copier: copier,
}
}

func (r *Runner) ScheduleRecording(
cronString string,
duration time.Duration,
url string,
outputFunc GetOutputFunc,
delay time.Duration,
) error {
c := cron.New()
entryID, err := c.AddFunc(cronString, func() { r.record(url, outputFunc, duration, delay) })
if err != nil {
return err
}

c.Start()

entry := getEntryByID(c, entryID)
if entry == nil {
return fmt.Errorf("entry with id %v not found", entryID)
}

r.copier.logger.Info().
Str("url", url).
Str("delay", delay.String()).
Str("next_call", entry.Next.Format(readableTimeLayout)).
Str("duration", duration.String()).
Msg("Starting listening")

return nil
}

func (r *Runner) record(
url string,
outputFunc GetOutputFunc,
duration time.Duration,
delay time.Duration,
) {
start := time.Now()
finish := start.Add(duration)
for {
log.Debug().Msg("RUN")
if time.Now().After(finish) {
return
}
if err := r.copier.CopyStream(url, outputFunc); err != nil && err != ErrStreamClosed {
r.copier.logger.Error().Err(err).Msg("Error copying stream")
return
}
time.Sleep(delay)
}
}

func getEntryByID(c *cron.Cron, id cron.EntryID) *cron.Entry {
for _, entry := range c.Entries() {
if entry.ID == id {
return &entry
}
}

return nil
}
17 changes: 3 additions & 14 deletions copier/stream_copier.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"io"
"net/http"
"os"
"time"
)

const (
Expand Down Expand Up @@ -58,6 +57,8 @@ func (d *StreamCopier) CopyStream(url string, getOutput GetOutputFunc) error {
return ErrStreamClosed
}

log.Info().Msg("recording started")

buf := bufio.NewReader(resp.Body)
fileHeader, err := buf.Peek(fileHeaderSize)
if err != nil && err != io.EOF {
Expand All @@ -83,18 +84,6 @@ func (d *StreamCopier) CopyStream(url string, getOutput GetOutputFunc) error {
bytesCopied, err := io.Copy(output, buf)
log.Debug().Int64("bytes_copied", bytesCopied).Msg("copied bytes")

log.Info().Msg("recording finished")
return err
}

func (d *StreamCopier) ListenAndCopy(
url string,
getOutput GetOutputFunc,
delay time.Duration,
) error {
for {
if err := d.CopyStream(url, getOutput); err != nil && err != ErrStreamClosed {
return err
}
time.Sleep(delay)
}
}
107 changes: 81 additions & 26 deletions copier/stream_copier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"io"
"log"
"net/http"
"net/http/httptest"
"testing"
Expand All @@ -20,41 +21,95 @@ func (c *closableBuffer) Close() error {
return nil
}

func TestStreamCopier_CopyStream_Success(t *testing.T) {
func TestStreamCopier(t *testing.T) {
t.Parallel()

serverOutput := []byte("Hello World!")
testCases := []struct {
name string
handler http.HandlerFunc
err error
output []byte
}{
{
name: "success",
handler: handlerSuccess,
output: []byte("Hello World!"),
err: nil,
},
{
name: "not found",
handler: handlerNotFound,
output: []byte{},
err: ErrStreamClosed,
},
{
name: "with interrupt",
handler: getHandlerWithInterrupt(),
output: []byte("12"),
err: nil,
},
}

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, err := w.Write(serverOutput)
assert.NoError(t, err)
}))
defer server.Close()
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

copier := NewStreamCopier(http.DefaultClient, testLogger)
server := httptest.NewServer(tc.handler)
defer server.Close()

output := new(closableBuffer)
copier := NewStreamCopier(http.DefaultClient, testLogger)
output := new(closableBuffer)

err := copier.CopyStream(server.URL, func(_ string) (io.WriteCloser, error) {
return output, nil
})
assert.NoError(t, err)
assert.Equal(t, serverOutput, output.Bytes())
err := copier.CopyStream(server.URL, func(_ string) (io.WriteCloser, error) {
return output, nil
})
assert.Equal(t, tc.err, err)
})
}
}

func TestStreamCopier_CopyStream_WithStreamClosed(t *testing.T) {
t.Parallel()

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
}))
defer server.Close()
func handlerSuccess(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte("Hello World!"))
if err != nil {
log.Println(err)
}
}

copier := NewStreamCopier(http.DefaultClient, testLogger)
func handlerNotFound(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusNotFound)
}

err := copier.CopyStream(server.URL, func(_ string) (io.WriteCloser, error) {
return new(closableBuffer), nil
})
func getHandlerWithInterrupt() http.HandlerFunc {
responses := []struct {
status int
body []byte
}{
{
status: http.StatusOK,
body: []byte("1"),
},
{
status: http.StatusNotFound,
},
{
status: http.StatusOK,
body: []byte("2"),
},
}
return func(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
http.NotFound(w, r)
return
}

assert.ErrorIs(t, err, ErrStreamClosed)
for _, response := range responses {
w.WriteHeader(response.status)
_, err := w.Write(response.body)
log.Println(err)
flusher.Flush()
}
}
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ require (
github.com/gabriel-vasile/mimetype v1.4.0
github.com/google/uuid v1.3.0
github.com/ilyakaznacheev/cleanenv v1.2.6
github.com/robfig/cron/v3 v3.0.1
github.com/rs/zerolog v1.27.0
github.com/stretchr/testify v1.7.2
)

require (
github.com/BurntSushi/toml v1.1.0 // indirect
github.com/davecgh/go-spew v1.1.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/joho/godotenv v1.4.0 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
Expand Down
Loading

0 comments on commit 857e086

Please sign in to comment.