diff --git a/Dockerfile b/Dockerfile index e80745b..60cb651 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.20 AS builder +FROM golang:1.21 AS builder WORKDIR /work @@ -6,12 +6,9 @@ COPY .git Makefile go.* *.go /work/ COPY pkg/ /work/pkg/ RUN make bin/audit-forwarder -FROM fluent/fluent-bit:1.9.10 +FROM debian:bullseye-20231030-slim -COPY --from=builder /work/bin/audit-forwarder /fluent-bit/bin/ -COPY fluent-bit.conf /fluent-bit/etc/ -COPY parsers.conf /fluent-bit/etc/ -COPY null.conf /fluent-bit/etc/add/ +COPY --from=builder /work/bin/audit-forwarder / -ENTRYPOINT ["/fluent-bit/bin/audit-forwarder"] -CMD ["/fluent-bit/bin/audit-forwarder"] +ENTRYPOINT ["/audit-forwarder"] +CMD ["/audit-forwarder"] diff --git a/fluent-bit.conf b/fluent-bit.conf deleted file mode 100644 index 42e8b91..0000000 --- a/fluent-bit.conf +++ /dev/null @@ -1,32 +0,0 @@ -[SERVICE] - Daemon Off - Log_Level ${LOG_LEVEL} - Parsers_File parsers.conf -[INPUT] - Name tail - Path ${AUDIT_LOG_PATH} - DB /audit.db - Tag audit - Parser audit - Read_from_Head On - Buffer_Chunk_Size 2MB - Buffer_Max_Size 10MB - Skip_Long_Lines On - mem_buf_limit ${MEM_BUF_LIMIT} - -[OUTPUT] - Name forward - Match audit - Host ${AUDIT_TAILER_HOST} - Port ${AUDIT_TAILER_PORT} - Require_ack_response True - Compress gzip - tls On - tls.verify On - tls.debug 2 - tls.ca_file ${TLS_CA_FILE} - tls.crt_file ${TLS_CRT_FILE} - tls.key_file ${TLS_KEY_FILE} - tls.vhost ${TLS_VHOST} - -@INCLUDE add/*.conf diff --git a/go.mod b/go.mod index 046f5b4..3801e46 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/metal-stack/audit-forwarder -go 1.20 +go 1.21 require ( github.com/go-playground/validator/v10 v10.12.0 diff --git a/main.go b/main.go index 94acf3a..0c24128 100644 --- a/main.go +++ b/main.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" "log" - "path" "time" "go.uber.org/zap" @@ -18,7 +17,6 @@ import ( "context" "os" - "os/exec" "strconv" "strings" @@ -38,21 +36,18 @@ const ( commandArgs = 
"--config=/fluent-bit/etc/fluent-bit.conf" // commandName = "sleep" // commandArgs = "3600" + proxyListenAddress = "0.0.0.0" + proxyListenPort = "9876" ) var ( - cfgFile string - logger *zap.SugaredLogger - logLevel zapcore.Level - stop context.Context - targetService *corev1.Service - certSecret *corev1.Secret - forwarderKilledChan chan struct{} - killForwarderChan chan struct{} - forwarderProcess *os.Process - secretCronID cron.EntryID - serviceCronID cron.EntryID - clusterProxy *proxy.Proxy + cfgFile string + logger *zap.SugaredLogger + logLevel zapcore.Level + stop context.Context + targetService *corev1.Service + serviceCronID cron.EntryID + clusterProxy *proxy.Proxy ) // CronLogger is used for logging within the cron function. @@ -73,32 +68,23 @@ func (c *CronLogger) Error(err error, msg string, keysAndValues ...interface{}) // this is because MarkFlagRequired from cobra does not work well with viper, see: // https://github.com/spf13/viper/issues/397 type Opts struct { - KubeCfg string - NameSpace string - ServiceName string - SecretName string - AuditLogPath string - MemBufLimit string - TLSBaseDir string - TLSCaFile string - TLSCrtFile string - TLSKeyFile string - TLSVhost string - CheckSchedule string - BackoffTimer time.Duration - LogLevel string - FluentLogLevel string - KonnectivityUDSSocket string - ProxyHost string - ProxyPort string - ProxyCaFile string - ProxyClientCrtFile string - ProxyClientKeyFile string + KubeCfg string + NameSpace string + ServiceName string + SecretName string + CheckSchedule string + BackoffTimer time.Duration + LogLevel string + ProxyHost string + ProxyPort string + ProxyCaFile string + ProxyClientCrtFile string + ProxyClientKeyFile string } var cmd = &cobra.Command{ Use: moduleName, - Short: "A program to forward audit logs to a service in the cluster. 
It looks for a matching service, then starts a forwarder program (eg fluent-bit) to pick up the log events and do the actual forwarding.", + Short: "A program to forward audit logs to a service in the cluster. It looks for a matching service, then listens for connections from the fluent-bit log forwarder and tunnels the connection through the VPN to the audittailer service.", Version: v.V.String(), Run: func(cmd *cobra.Command, args []string) { initConfig() @@ -127,18 +113,9 @@ func init() { cmd.Flags().StringP("namespace", "n", "kube-system", "the namespace of the audit-tailer service") cmd.Flags().StringP("service-name", "s", "kubernetes-audit-tailer", "the service name of the audit-tailer service") cmd.Flags().StringP("secret-name", "e", "audittailer-client", "the name of the secret containing the CA file, client certificate and key") - cmd.Flags().StringP("audit-log-path", "l", "/auditlog/audit.log", "the path to the audit-log file") - cmd.Flags().StringP("mem-buf-limit", "m", "200M", "The memory buffer limit for fluent-bit") - cmd.Flags().StringP("tls-basedir", "B", "/fluent-bit/etc/ssl", "the path to the directory where the cert and key files should be written") - cmd.Flags().StringP("tls-ca-file", "C", "ca.crt", "the filename of the CA file for checking the server (audit-tailer) certificate") - cmd.Flags().StringP("tls-crt-file", "R", "audittailer-client.crt", "the filename of the client certificate used to authenticate to the audit-tailer") - cmd.Flags().StringP("tls-key-file", "K", "audittailer-client.key", "the filename of the private key file belonging to the client certificate") - cmd.Flags().StringP("tls-vhost", "H", "kubernetes-audit-tailer", "the name of the audit-tailer, as presented in its server certificate.
This is needed so that the certificate is accepted by fluent-bit") cmd.Flags().StringP("check-schedule", "S", "*/1 * * * *", "cron schedule when to check for service changes") cmd.Flags().DurationP("backoff-timer", "b", time.Duration(10*time.Second), "Backoff time for restarting the forwarder process when it has been killed by external influences") cmd.Flags().StringP("log-level", "L", "info", "sets the application log level") - cmd.Flags().StringP("fluent-log-level", "O", "info", "sets the log level for the fluent-bit command") - cmd.Flags().StringP("konnectivity-uds-socket", "u", "", "If set, try and connect through this konnectivity UDS socket. Expected method is http-connect. Mutually exclusive with proxy-host.") cmd.Flags().StringP("proxy-host", "p", "", "If set, try and connect through this mTLS proxy at the given destination. Expected method is http-connect. Mutually exclusive with konectivity-uds-socket.") cmd.Flags().StringP("proxy-port", "P", "9443", "Port of the mTLS proxy specified with proxy-host.") cmd.Flags().String("proxy-ca-file", "/proxy/ca/ca.crt", "the path to the CA file for checking the mTLS proxy server certificate") @@ -153,27 +130,18 @@ func init() { func initOpts() (*Opts, error) { opts := &Opts{ - KubeCfg: viper.GetString("kubecfg"), - NameSpace: viper.GetString("namespace"), - ServiceName: viper.GetString("service-name"), - SecretName: viper.GetString("secret-name"), - AuditLogPath: viper.GetString("audit-log-path"), - MemBufLimit: viper.GetString("mem-buf-limit"), - TLSBaseDir: viper.GetString("tls-basedir"), - TLSCaFile: viper.GetString("tls-ca-file"), - TLSCrtFile: viper.GetString("tls-crt-file"), - TLSKeyFile: viper.GetString("tls-key-file"), - TLSVhost: viper.GetString("tls-vhost"), - CheckSchedule: viper.GetString("check-schedule"), - BackoffTimer: viper.GetDuration("backoff-timer"), - LogLevel: viper.GetString("log-level"), - FluentLogLevel: viper.GetString("fluent-log-level"), - KonnectivityUDSSocket: 
viper.GetString("konnectivity-uds-socket"), - ProxyHost: viper.GetString("proxy-host"), - ProxyPort: viper.GetString("proxy-port"), - ProxyCaFile: viper.GetString("proxy-ca-file"), - ProxyClientCrtFile: viper.GetString("proxy-client-crt-file"), - ProxyClientKeyFile: viper.GetString("proxy-client-key-file"), + KubeCfg: viper.GetString("kubecfg"), + NameSpace: viper.GetString("namespace"), + ServiceName: viper.GetString("service-name"), + SecretName: viper.GetString("secret-name"), + CheckSchedule: viper.GetString("check-schedule"), + BackoffTimer: viper.GetDuration("backoff-timer"), + LogLevel: viper.GetString("log-level"), + ProxyHost: viper.GetString("proxy-host"), + ProxyPort: viper.GetString("proxy-port"), + ProxyCaFile: viper.GetString("proxy-ca-file"), + ProxyClientCrtFile: viper.GetString("proxy-client-crt-file"), + ProxyClientKeyFile: viper.GetString("proxy-client-key-file"), } validate := validator.New() @@ -254,17 +222,6 @@ func initSignalHandlers() { func run(opts *Opts) error { logger.Debugw("Options", "opts", opts) - // initialise our synchronisation channels - forwarderKilledChan = make(chan struct{}) - killForwarderChan = make(chan struct{}, 1) - - // set up certificates directory - err := os.MkdirAll(opts.TLSBaseDir, 0750) - if err != nil { - logger.Errorw("Unable to create certificate directory", opts.TLSBaseDir, err) - return err - } - // Prepare K8s client, err := loadClient(opts.KubeCfg) if err != nil { @@ -277,17 +234,6 @@ func run(opts *Opts) error { cron.SkipIfStillRunning(&CronLogger{l: logger.Named("cron")}), )) - secretCronID, err = cronjob.AddFunc(opts.CheckSchedule, func() { - err := checkSecret(opts, client) - if err != nil { - logger.Errorw("error during secret check", "error", err) - } - - logger.Debugw("scheduling next secret check", "at", cronjob.Entry(secretCronID).Next) - }) - if err != nil { - return fmt.Errorf("could not initialize cron schedule %w", err) - } serviceCronID, err = cronjob.AddFunc(opts.CheckSchedule, func() { err 
:= checkService(opts, client) if err != nil { @@ -302,10 +248,6 @@ func run(opts *Opts) error { logger.Infow("start initial checks", "version", v.V.String()) - err = checkSecret(opts, client) - if err != nil { - logger.Errorw("error during initial secret check", "error", err) - } err = checkService(opts, client) if err != nil { logger.Errorw("error during initial service check", "error", err) @@ -376,9 +318,8 @@ func checkService(opts *Opts, client *k8s.Clientset) error { defer cancel() service, err := client.CoreV1().Services(opts.NameSpace).Get(kubectx, opts.ServiceName, metav1.GetOptions{}) if err != nil { // That means no matching service found - if targetService != nil { // This means a service was previously seen, and a forwarder should already be running. - logger.Infow("Service went away, killing forwarder") - killForwarder() + if targetService != nil { // This means a service was previously seen, and the proxy should already be running. + logger.Infow("Service went away, stopping proxy") if clusterProxy != nil { // This means there should be a running proxy, we need to stop it too. clusterProxy.DestroyProxy() clusterProxy = nil @@ -396,164 +337,28 @@ func checkService(opts *Opts, client *k8s.Clientset) error { } servicePort := strconv.Itoa(int(service.Spec.Ports[0].Port)) - if targetService != nil { // This means a service was previously seen, and a forwarder should already be running. + if targetService != nil { // This means a service was previously seen, and the proxy should already be running. if targetService.Spec.ClusterIP == service.Spec.ClusterIP && targetService.Spec.Ports[0].Port == service.Spec.Ports[0].Port { logger.Debugw("Service stayed the same, nothing to do.") return nil } - // We need to kill the old forwarder - killForwarder() if clusterProxy != nil { // This means there should be a running proxy, we need to stop it too. clusterProxy.DestroyProxy() clusterProxy = nil } } - // Check whether the certificates have already been written! 
- if certSecret == nil { - logger.Debugw("No certificates in place, waiting.") - return nil - } - logger.Infow("Target identified", "IP", serviceIP, "Port", servicePort) - fluentTargetIP := serviceIP - if opts.KonnectivityUDSSocket != "" { // This means we need to start a konnectivity UDS proxy - if opts.ProxyHost != "" { - logger.Errorw("konnectivityproxy configuration error, both UDS and proxy host defined. This code should never be reached.", "konnectivity-uds-socket", opts.KonnectivityUDSSocket, "proxy-host", opts.ProxyHost) - return errors.New("proxy config error") - } - logger.Infow("Starting proxy", "uds", opts.KonnectivityUDSSocket) - clusterProxy, err = proxy.NewProxyUDS(logger, opts.KonnectivityUDSSocket, serviceIP, servicePort, "127.0.0.1", servicePort) - if err != nil { - logger.Errorw("Could not start UDS proxy", "error", err) - return err - } - fluentTargetIP = "127.0.0.1" - } else if opts.ProxyHost != "" { // This means we need to start a mTLS proxy + if opts.ProxyHost != "" { // This means we need to start a mTLS proxy logger.Infow("Starting proxy", "host", opts.ProxyHost, "port", opts.ProxyPort) - clusterProxy, err = proxy.NewProxyMTLS(logger, opts.ProxyHost, opts.ProxyPort, opts.ProxyClientCrtFile, opts.ProxyClientKeyFile, opts.ProxyCaFile, serviceIP, servicePort, "127.0.0.1", servicePort) + clusterProxy, err = proxy.NewProxyMTLS(logger, opts.ProxyHost, opts.ProxyPort, opts.ProxyClientCrtFile, opts.ProxyClientKeyFile, opts.ProxyCaFile, serviceIP, servicePort, proxyListenAddress, proxyListenPort) if err != nil { logger.Errorw("Could not start mTLS proxy", "error", err) return err } - fluentTargetIP = "127.0.0.1" } - go runForwarder(fluentTargetIP, servicePort, opts) targetService = service return nil } - -func runForwarder(fluentTargetIP, servicePort string, opts *Opts) { - for { - logger.Info("Starting forwarder") - - var fluentLogLevel zapcore.Level = zap.InfoLevel - err := fluentLogLevel.UnmarshalText([]byte(opts.FluentLogLevel)) - if err != nil 
{ - logger.Errorw("Can't set fluent-bit log level", "opts.FluentLogLevel:", opts.FluentLogLevel) - } - - cmd := exec.Command(commandName, commandArgs) - cmd.Stdout = os.Stdout // Lets us see stdout and stderr of cmd - cmd.Stderr = os.Stderr - - cmd.Env = append(os.Environ(), - "AUDIT_TAILER_HOST="+fluentTargetIP, - "AUDIT_TAILER_PORT="+servicePort, - "AUDIT_LOG_PATH="+opts.AuditLogPath, - "MEM_BUF_LIMIT="+opts.MemBufLimit, - "TLS_CA_FILE="+path.Join(opts.TLSBaseDir, opts.TLSCaFile), - "TLS_CRT_FILE="+path.Join(opts.TLSBaseDir, opts.TLSCrtFile), - "TLS_KEY_FILE="+path.Join(opts.TLSBaseDir, opts.TLSKeyFile), - "TLS_VHOST="+opts.TLSVhost, - "LOG_LEVEL="+fluentLogLevel.String(), - ) - logger.Debugw("Executing:", "Command", strings.Join(cmd.Args, " "), ", Environment:", strings.Join(cmd.Env, ", ")) - - err = cmd.Start() - if err != nil { - logger.Errorw("Could not start forwarder", "Error", err) - } - logger.Infow("Forwarder process", "PID", cmd.Process) - forwarderProcess = cmd.Process - err = cmd.Wait() - - if err != nil { - logger.Infow("Forwarder exited", "Error", err) - } - // command is finished, now we check if it died or if we killed it intentionally. 
- select { - case <-killForwarderChan: - logger.Infow("Forwarder was killed on purpose") - forwarderKilledChan <- struct{}{} - logger.Debugw("Written to confirmation channel, returning") - return - default: - logger.Infow("Forwarder was not killed by this controller, or was killed so it can re-read its certificates; restarting", "Backoff time", opts.BackoffTimer) - time.Sleep(opts.BackoffTimer) - } - } -} - -func killForwarder() { - logger.Infow("Killing process", "PID", forwarderProcess) - logger.Debugw("Writing to kill channel") - killForwarderChan <- struct{}{} - err := forwarderProcess.Kill() - if err != nil { - logger.Errorw("Could not kill process", "Error", err) - } - // Wait for the old forwarder to exit - <-forwarderKilledChan - logger.Infow("Forwarder successfully killed") -} - -func checkSecret(opts *Opts, client *k8s.Clientset) error { - logger.Debugw("Checking secret") - keys := []string{opts.TLSCaFile, opts.TLSCrtFile, opts.TLSKeyFile} - - kubectx, kubecancel := context.WithTimeout(context.Background(), time.Duration(10*time.Second)) - defer kubecancel() - - secret, err := client.CoreV1().Secrets(opts.NameSpace).Get(kubectx, opts.SecretName, metav1.GetOptions{}) - if err != nil { - return fmt.Errorf("did not find client secret %q in namespace %s: %w", opts.SecretName, opts.NameSpace, err) - } - logger.Debugw("Got secret", opts.SecretName, secret.Name) - - if certSecret != nil { // A secret has already been seen - if secret.ResourceVersion == certSecret.ResourceVersion { // Secret stayed the same, nothing to do - logger.Debugw("Secret stayed the same", "ResourceVersion", secret.ResourceVersion) - return nil - } - } - - // Now we attempt to write the certificates to file - for _, k := range keys { - v, ok := secret.Data[k] - if !ok { - return fmt.Errorf("could not find key in secret key:%s", k) - } - f := path.Join(opts.TLSBaseDir, k) - logger.Debugw("Writing certificate to file", k, f) - err := os.WriteFile(f, v, 0600) - if err != nil { - return 
fmt.Errorf("could not write secret to certificate base folder:%w", err) - } - } - - // Certificates successfully written; if there is a forwarder already running we must restart it now. - if forwarderProcess != nil { - logger.Debugw("Forwarder running, killing it so it can restart.", "Process:", forwarderProcess) - err := forwarderProcess.Kill() - if err != nil { - logger.Errorw("Could not kill process", "Error", err) - } - } - - certSecret = secret - - return nil -} diff --git a/null.conf b/null.conf deleted file mode 100644 index 5271636..0000000 --- a/null.conf +++ /dev/null @@ -1 +0,0 @@ -# An empty fluent-bit config file so that the @INCLUDE directive in fluent-bit.conf does not fail. \ No newline at end of file diff --git a/parsers.conf b/parsers.conf deleted file mode 100644 index bc55f99..0000000 --- a/parsers.conf +++ /dev/null @@ -1,6 +0,0 @@ -[PARSER] - Name audit - Format json - Time_Key stageTimestamp - Time_Format %Y-%m-%dT%H:%M:%S.%LZ - Time_Keep On diff --git a/pkg/proxy/proxy.go b/pkg/proxy/proxy.go index 7bb04e1..aaa352b 100644 --- a/pkg/proxy/proxy.go +++ b/pkg/proxy/proxy.go @@ -25,7 +25,6 @@ import ( type Proxy struct { logger *zap.SugaredLogger - uds string proxyHost string proxyPort string clientCert tls.Certificate @@ -38,26 +37,6 @@ type Proxy struct { } // Creates a new proxy instance and opens a TCP listener for accepting connections. 
-func NewProxyUDS(logger *zap.SugaredLogger, uds, destinationIP, destinationPort, listenerIP, listenerPort string) (*Proxy, error) { - proxy := &Proxy{ - logger: logger, - uds: uds, - destinationIP: destinationIP, - destinationPort: destinationPort, - listenerIP: listenerIP, - listenerPort: listenerPort, - } - logger.Infow("NewProxyUDS called", "unix domain socket", uds, "listener IP", listenerIP, "listener port", listenerPort) - - err := proxy.listen() - if err != nil { - logger.Errorw("Error opening listener", "proxy", proxy) - return nil, err - } - go proxy.forward() - return proxy, nil -} - func NewProxyMTLS(logger *zap.SugaredLogger, proxyHost, proxyPort, clientCertFile, clientKeyFile, proxyCAFile, destinationIP, destinationPort, listenerIP, listenerPort string) (*Proxy, error) { logger.Infow("NewProxyMTLS called", "proxy host", proxyHost, "proxy port", proxyPort, "listener IP", listenerIP, "listener port", listenerPort) clientCert, err := tls.LoadX509KeyPair(clientCertFile, clientKeyFile) @@ -119,46 +98,33 @@ func (p *Proxy) forward() { // Closes the listener. func (p *Proxy) DestroyProxy() { - p.logger.Infow("Closing forwarder", "uds", p.uds, "destination ip", p.destinationIP) + p.logger.Infow("Closing forwarder", "destination ip", p.destinationIP) p.listener.Close() } func (p *Proxy) handleConnection(srvConn *net.TCPConn) { - p.logger.Infow("handleConnection called", "local address", srvConn.LocalAddr(), "remote address", srvConn.RemoteAddr(), "unix domain socket", p.uds, "proxy host", p.proxyHost, "proxy port", p.proxyPort, "target address", p.destinationIP) + p.logger.Infow("handleConnection called", "local address", srvConn.LocalAddr(), "remote address", srvConn.RemoteAddr(), "proxy host", p.proxyHost, "proxy port", p.proxyPort, "target address", p.destinationIP) var proxyConn net.Conn - if p.uds != "" { - if p.proxyHost != "" { - p.logger.Errorw("proxy configuration error, both UDS and proxy host defined. 
This code should never be reached.", "UDS", p.uds, "proxy host", p.proxyHost) - return - } - var err error - proxyConn, err = net.Dial("unix", p.uds) - if err != nil { - p.logger.Errorw("dialing uds proxy failed", "unix domain socket", p.uds, "error", err) - return - } - } else { - var err error - proxyConn, err = tls.Dial("tcp", p.proxyHost+":"+p.proxyPort, &tls.Config{ - Certificates: []tls.Certificate{p.clientCert}, - RootCAs: p.proxyCAPool, - MinVersion: tls.VersionTLS12, - }) - if err != nil { - p.logger.Errorw("dialing mTLS proxy failed", "proxy address", p.proxyHost+":"+p.proxyPort, "error", err) - } + var err error + proxyConn, err = tls.Dial("tcp", p.proxyHost+":"+p.proxyPort, &tls.Config{ + Certificates: []tls.Certificate{p.clientCert}, + RootCAs: p.proxyCAPool, + MinVersion: tls.VersionTLS12, + }) + if err != nil { + p.logger.Errorw("dialing mTLS proxy failed", "proxy address", p.proxyHost+":"+p.proxyPort, "error", err) } fmt.Fprintf(proxyConn, "CONNECT %s HTTP/1.1\r\nHost: %s\r\nUser-Agent: %s\r\n\r\n", net.JoinHostPort(p.destinationIP, p.destinationPort), p.listenerIP, "auditforwarder") br := bufio.NewReader(proxyConn) res, err := http.ReadResponse(br, nil) if err != nil { - p.logger.Errorf("reading HTTP response from CONNECT to %s via uds proxy %s failed: %v", p.destinationIP, p.uds, err) + p.logger.Errorf("reading HTTP response from CONNECT to %s failed: %v", p.destinationIP, err) return } defer res.Body.Close() if res.StatusCode != 200 { - p.logger.Errorf("proxy error from %s while dialing %s: %v", p.uds, p.destinationIP, res.Status) + p.logger.Errorf("proxy error while dialing %s: %v", p.destinationIP, res.Status) return } // It's safe to discard the bufio.Reader here and return the @@ -166,7 +132,7 @@ func (p *Proxy) handleConnection(srvConn *net.TCPConn) { // TLS, and in TLS the client speaks first, so we know there's // no unbuffered data. But we can double-check. 
if br.Buffered() > 0 { - p.logger.Errorf("unexpected %d bytes of buffered data from CONNECT uds proxy %q", br.Buffered(), p.uds) + p.logger.Errorf("unexpected %d bytes of buffered data from CONNECT", br.Buffered()) return } // Now we're supposed to have both connections open.