-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.go
100 lines (91 loc) · 1.63 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package main
import (
"flag"
"fmt"
"log"
"os"
"time"
consume "github.com/log-shiper/consume"
"github.com/log-shiper/g"
"github.com/log-shiper/httpserver"
"github.com/log-shiper/produce"
"github.com/log-shiper/tool"
)
var (
h bool
f string
t string
b string
a string
p string
)
type LogProcess struct {
read Reader
write Writer
ch chan string
}
type Reader interface {
Read(ch chan string)
}
type Writer interface {
Write(ch chan string)
}
func usage() {
fmt.Fprintf(os.Stderr, `Usage: log-shiper [-f filename] [-t topic] [-b brokers] [-a ip] [-p port] -[h]`)
flag.PrintDefaults()
os.Exit(-1)
}
func init() {
flag.BoolVar(&h, "h", false, "this help")
flag.StringVar(&a, "a", "127.0.0.1", "log agent ip address")
flag.StringVar(&b, "b", "", "kafka broker address")
flag.StringVar(&f, "f", "", "log file name")
flag.StringVar(&p, "p", "", "log agent port")
flag.StringVar(&t, "t", "", "topic name")
flag.Usage = usage
}
func main() {
flag.Parse()
if h {
flag.Usage()
}
arg := map[string]string{
"broker": b,
"file": f,
"ip": a,
"topic": t,
"port": p,
}
n, ret := tool.Argument(arg)
if !ret {
logMsg := fmt.Sprintf("%s is null", n)
log.Print(logMsg)
flag.Usage()
}
r := &produce.ReadFromFile{
Path: f,
}
w := &consume.WriteToKafka{
Brokers: b,
Topic: t,
MsgKey: g.MsgKey{
Addr: a,
Port: p,
},
}
c := make(chan string, 1000)
lp := &LogProcess{
read: r,
write: w,
ch: c,
}
go lp.read.Read(lp.ch)
for i := 1; i <= 5; i++ {
go lp.write.Write(lp.ch)
}
m := httpserver.Monitor{
StartTime: time.Now(),
Data: g.SystemInfo{},
}
m.Start(lp.ch, p)
}