Skip to content

Commit

Permalink
Merge pull request #25 from unix1/clean-up-files-after-processing
Browse files Browse the repository at this point in the history
Clean up files after processing
  • Loading branch information
Ths2-9Y-LqJt6 authored Sep 5, 2018
2 parents 8266c30 + 054ba4d commit 80ee33e
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 12 deletions.
2 changes: 2 additions & 0 deletions DNSAuth/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
)

type Config struct {
CleanupAction string `cfg:"cleanup-action; none; /(none|move|delete)/; Action to take after processing a file"`
CleanupDir string `cfg:"cleanup-dir; required; path; Path to move processed files when cleanup-action=move"`
CustomerDB string `cfg:"customer-db; required; "`
CustomerRefresh int `cfg:"customer-refresh; 24; "`
InfluxDB string `cfg:"influx-db; required; "`
Expand Down
7 changes: 7 additions & 0 deletions DNSAuth/dnsauth.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,10 @@ influx-db = "http://127.0.0.1:8086/write?db=authdns"

# The directory DNSAuth should watch for new log files coming in.
watch-dir = "./"

# Action to take after processing a file; one of none, move, or delete.
cleanup-action = "none"

# Path to move processed files when cleanup-action = "move".
# Must not be a sub-directory of watch-dir; no trailing slash.
cleanup-dir = "/tmp"
53 changes: 41 additions & 12 deletions DNSAuth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"bytes"
"compress/gzip"
"flag"
"fmt"
"log"
"net"
"os"
Expand Down Expand Up @@ -40,7 +41,7 @@ func main() {

flag.Parse()

log.Println("Loading config file...")
log.Printf("Loading config file %s...\n", *confpath)
config, err := LoadConfig(*confpath)
if err != nil {
log.Fatalln("FAILED: ", err)
Expand Down Expand Up @@ -85,7 +86,7 @@ func main() {
if strings.HasSuffix(path, ".dmp.gz") {

if _, found := files[path]; !found {
go aggreagate(path, limiter)
go aggregate(path, limiter, config)
limiter <- true
}
newFiles[path] = true
Expand Down Expand Up @@ -114,13 +115,13 @@ func main() {
}
}

func aggreagate(filepath string, limiter chan bool) {
func aggregate(filePath string, limiter chan bool, config *Config) {

starttime := time.Now()

defer func() { <-limiter }()

fileHandle, err := os.Open(filepath)
fileHandle, err := os.Open(filePath)
if err != nil {
log.Println(err)
return
Expand All @@ -129,21 +130,21 @@ func aggreagate(filepath string, limiter chan bool) {

reader, err := gzip.NewReader(fileHandle)
if err != nil {
log.Println(filepath, ": ", err)
log.Println(filePath, ": ", err)
return
}
defer reader.Close()

index := strings.LastIndex(filepath, "mon-") + len("mon-")
mon := filepath[index : index+2]
pop := filepath[index+3 : index+6]
index := strings.LastIndex(filePath, "mon-") + len("mon-")
mon := filePath[index : index+2]
pop := filePath[index+3 : index+6]

index = strings.LastIndex(filepath, "net_") + len("net_")
timestamp := filepath[index : index+16]
index = strings.LastIndex(filePath, "net_") + len("net_")
timestamp := filePath[index : index+16]

date, err := time.Parse(LAYOUT, timestamp)
if err != nil {
log.Println(filepath, ": ", err)
log.Println(filePath, ": ", err)
return
}

Expand All @@ -160,7 +161,7 @@ func aggreagate(filepath string, limiter chan bool) {

fields := strings.Fields(line)
if len(fields) != 9 {
log.Println("Issue unformatting line:", line, " for dump ", filepath)
log.Println("Issue unformatting line:", line, " for dump ", filePath)
continue
}
buffer.WriteString(line)
Expand Down Expand Up @@ -190,6 +191,10 @@ func aggreagate(filepath string, limiter chan bool) {
handleQuery(date.Truncate(time.Minute), pop, line)

}
err = cleanupFile(filePath, config)
if err != nil {
log.Printf("Failed to clean up %s. Reason: %s", filePath, err)
}
proctime := time.Since(starttime)
log.Printf("Processed dump [mon-%s-%s](%s - %s): %d lines in (%s) seconds!\n",
mon, pop, initialdate, date, cpt, proctime)
Expand Down Expand Up @@ -236,3 +241,27 @@ func handleQuery(time time.Time, pop, line string) {

dnsqueries.GetAt(time, fields[DIRECTION], pop, qtypestr, rcodestr, name, zone, protocol, version, originAs, prefix).Inc()
}

func cleanupFile(filePath string, config *Config) error {
var err error
switch config.CleanupAction {
case "move":
err = cleanupFileMove(filePath, config.CleanupDir)
case "delete":
err = cleanupFileDelete(filePath)
case "none":
default:
err = fmt.Errorf("Invalid config setting for cleanup action: %s", config.CleanupAction)
}
return err
}

func cleanupFileMove(filePath string, destDir string) error {
log.Printf("Moving file %s to %s\n", filePath, destDir)
return os.Rename(filePath, destDir+"/"+filepath.Base(filePath))
}

func cleanupFileDelete(filePath string) error {
log.Printf("Removing file %s\n", filePath)
return os.Remove(filePath)
}
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ influx-db = "http://127.0.0.1:8086/write?db=authdns"
# The directory DNSAuth should watch for new log files coming in.
watch-dir = "./"
# Action to take after processing a file; one of none, move, or delete.
cleanup-action = "none"
# Path to move processed files when cleanup-action = "move".
# Must not be a sub-directory of watch-dir; no trailing slash.
cleanup-dir = "/tmp"
```

DNSAuth ships with this file as displayed above. During the set up steps below, you'll copy it to have a local copy which you can customize if needed.
Expand Down

0 comments on commit 80ee33e

Please sign in to comment.