Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed FTP notifications #166

Merged
merged 1 commit into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion internal/cmd/server/ftp.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ func FTPEvent(e *ftpx.Event) *models.Event {

return &models.Event{
Protocol: models.ProtoFTP,
RW: e.Log,
R: e.R,
W: e.W,
RW: e.RW,
Meta: structs.Map(meta),
RemoteAddr: e.RemoteAddr.String(),
ReceivedAt: e.ReceivedAt,
Expand Down
2 changes: 1 addition & 1 deletion pkg/ftpx/ftpx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestMain(m *testing.M) {
}
}),
ftpx.OnClose(func(e *ftpx.Event) {
notifier.Notify(e.RemoteAddr, e.Log, map[string]interface{}{})
notifier.Notify(e.RemoteAddr, e.RW, map[string]interface{}{})
}),
}

Expand Down
76 changes: 31 additions & 45 deletions pkg/ftpx/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ package ftpx

import (
"bufio"
"bytes"
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"net"
"strings"
"time"

"github.com/russtone/sonar/pkg/netx"
)

var (
Expand Down Expand Up @@ -38,8 +38,11 @@ type Event struct {
// RemoteAddre is remote IP address.
RemoteAddr net.Addr

// Log is a full session log.
Log []byte
// RW is a full session log.
RW []byte

R []byte
W []byte

// Data stores args passed to the corresponding FTP commands during a session.
Data Data
Expand All @@ -66,65 +69,48 @@ type session struct {
onClose func(*Event)

// conn is a current TCP connection.
conn net.Conn

// r is a connection reader.
r *bufio.Reader

// w is a connection writer.
w *bufio.Writer
conn *netx.LoggingConn

// scanner is a connection reader scanner.
scanner *bufio.Scanner

// rw is a session log.
log *bytes.Buffer

// state is a current state of session.
state int

// data stores args passed to the corresponding FTP commands during
// a session.
data Data
}

// handleConn creates new FTP session and handles connection with it.
func handleConn(ctx context.Context, conn net.Conn, opts options) error {
var buf bytes.Buffer

r := bufio.NewReader(io.TeeReader(conn, &buf))
w := bufio.NewWriter(io.MultiWriter(conn, &buf))
scanner := bufio.NewScanner(r)
newConn := netx.NewLoggingConn(conn)

sess := &session{
messages: opts.messages,
onClose: opts.onClose,
conn: conn,
r: r,
w: w,
scanner: scanner,
log: &buf,
conn: newConn,
scanner: bufio.NewScanner(newConn),
}

start := time.Now()

newConn.OnClose = func() {
_, secure := sess.conn.Conn.(*tls.Conn)

sess.onClose(&Event{
RemoteAddr: sess.conn.RemoteAddr(),
RW: sess.conn.RW.Bytes(),
R: sess.conn.R.Bytes(),
W: sess.conn.W.Bytes(),
Data: sess.data,
Secure: secure,
ReceivedAt: start,
})
}

return sess.start(ctx)
}

func (s *session) start(ctx context.Context) error {
start := time.Now()

if s.onClose != nil {
defer func() {
_, secure := s.conn.(*tls.Conn)

s.onClose(&Event{
RemoteAddr: s.conn.RemoteAddr(),
Log: s.log.Bytes(),
Data: s.data,
Secure: secure,
ReceivedAt: start,
})
}()
}
defer s.conn.Close()

if err := s.greet(); err != nil {
return err
Expand All @@ -133,7 +119,7 @@ func (s *session) start(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
return ctx.Err()

default:
if !s.scanner.Scan() {
Expand Down Expand Up @@ -196,10 +182,10 @@ func (s *session) parseCmd(line string) (string, string) {
}

func (s *session) writeLine(line string) error {
if _, err := s.w.WriteString(line + "\r\n"); err != nil {
if _, err := s.conn.Write([]byte(line + "\r\n")); err != nil {
return err
}
return s.w.Flush()
return nil
}

func (s *session) greet() error {
Expand Down
6 changes: 3 additions & 3 deletions pkg/netx/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func (l *LoggingListener) Accept() (net.Conn, error) {
return nil, err
}

return NewLoggingCon(conn), nil
return NewLoggingConn(conn), nil
}

// LoggingConn wraps net.Conn to save conversation log.
Expand All @@ -42,8 +42,8 @@ type LoggingConn struct {
OnClose func()
}

// NewLoggingCon wraps net.Conn and adds logging.
func NewLoggingCon(conn net.Conn) *LoggingConn {
// NewLoggingConn wraps net.Conn and adds logging.
func NewLoggingConn(conn net.Conn) *LoggingConn {
c := &LoggingConn{
Conn: conn,
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/smtpx/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ type session struct {
// handleConn creates new SMTP session and handles connection with it.
func handleConn(ctx context.Context, conn net.Conn, opts options) error {

newConn := netx.NewLoggingCon(conn)
newConn := netx.NewLoggingConn(conn)

sess := &session{
messages: opts.messages,
Expand Down Expand Up @@ -148,7 +148,7 @@ func (s *session) start(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
return ctx.Err()

default:
if !s.scanner.Scan() {
Expand Down Expand Up @@ -310,7 +310,7 @@ func (s *session) handleStartTLS(args string) error {
return err
}

newConn := netx.NewLoggingCon(net.Conn(conn))
newConn := netx.NewLoggingConn(net.Conn(conn))

newConn.RW.Write(s.conn.RW.Bytes())
newConn.R.Write(s.conn.R.Bytes())
Expand Down
Loading