Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for GSSAPI in Kafka scaler #4851

Merged
merged 9 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,17 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio

### New

- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
- **AWS SQS Scaler**: Support for scaling to include delayed messages. ([#4377](https://github.com/kedacore/keda/issues/4377))
zroubalik marked this conversation as resolved.
Show resolved Hide resolved
- **Governance**: KEDA transitioned to CNCF Graduated project ([#63](https://github.com/kedacore/governance/issues/63))

### Improvements

- **General**: Add more events for user checking ([#796](https://github.com/kedacore/keda/issues/3764))
- **General**: Add ScaledObject/ScaledJob names to output of `kubectl get triggerauthentication/clustertriggerauthentication` ([#796](https://github.com/kedacore/keda/issues/796))
- **General**: Add standalone CRD generation to release workflow ([#2726](https://github.com/kedacore/keda/issues/2726))
- **General**: Adding a changelog validating script to check for formatting and order ([#3190](https://github.com/kedacore/keda/issues/3190))
- **General**: Update golangci-lint version documented in CONTRIBUTING.md since old version doesn't support go 1.20 (N/A)
- **Kafka**: Add support for Kerberos authentication (SASL / GSSAPI) ([#4836](https://github.com/kedacore/keda/issues/4836))

### Fixes

Expand All @@ -67,14 +71,15 @@ You can find all deprecations in [this overview](https://github.com/kedacore/ked

New deprecation(s):

- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
- **General**: Clean up previously deprecated code for 2.12 release ([#4899](https://github.com/kedacore/keda/issues/4899))

### Breaking Changes

- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))

### Other

- **General**: Fixed a typo in the StatefulSet scaling resolver ([#4902](https://github.com/kedacore/keda/pull/4902))
- **General**: Refactor ScaledJob related methods to be located at scale_handler ([#4781](https://github.com/kedacore/keda/issues/4781))

## v2.11.2
Expand Down
6 changes: 6 additions & 0 deletions config/manager/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,19 @@ spec:
seccompProfile:
type: RuntimeDefault
volumeMounts:
- mountPath: /tmp/kerberos
zroubalik marked this conversation as resolved.
Show resolved Hide resolved
name: temp-kerberos-vol
readOnly: false
- mountPath: /certs
name: certificates
readOnly: true
terminationGracePeriodSeconds: 10
nodeSelector:
kubernetes.io/os: linux
volumes:
- name: temp-kerberos-vol
emptyDir:
medium: Memory
- name: certificates
secret:
defaultMode: 420
Expand Down
187 changes: 154 additions & 33 deletions pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"os"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -51,6 +52,11 @@ type kafkaMetadata struct {
username string
password string

// GSSAPI
keytabPath string
realm string
kerberosConfigPath string

// OAUTHBEARER
scopes []string
oauthTokenEndpointURI string
Expand Down Expand Up @@ -82,6 +88,7 @@ const (
KafkaSASLTypeSCRAMSHA256 kafkaSaslType = "scram_sha256"
KafkaSASLTypeSCRAMSHA512 kafkaSaslType = "scram_sha512"
KafkaSASLTypeOAuthbearer kafkaSaslType = "oauthbearer"
KafkaSASLTypeGSSAPI kafkaSaslType = "gssapi"
)

const (
Expand Down Expand Up @@ -145,39 +152,18 @@ func parseKafkaAuthParams(config *ScalerConfig, meta *kafkaMetadata) error {
saslAuthType = strings.TrimSpace(saslAuthType)
mode := kafkaSaslType(saslAuthType)

if mode == KafkaSASLTypePlaintext || mode == KafkaSASLTypeSCRAMSHA256 || mode == KafkaSASLTypeSCRAMSHA512 || mode == KafkaSASLTypeOAuthbearer {
if config.AuthParams["username"] == "" {
return errors.New("no username given")
}
meta.username = strings.TrimSpace(config.AuthParams["username"])

if config.AuthParams["password"] == "" {
return errors.New("no password given")
switch {
case mode == KafkaSASLTypePlaintext || mode == KafkaSASLTypeSCRAMSHA256 || mode == KafkaSASLTypeSCRAMSHA512 || mode == KafkaSASLTypeOAuthbearer:
err := parseSaslParams(config, meta, mode)
if err != nil {
return err
}
meta.password = strings.TrimSpace(config.AuthParams["password"])
meta.saslType = mode

if mode == KafkaSASLTypeOAuthbearer {
meta.scopes = strings.Split(config.AuthParams["scopes"], ",")

if config.AuthParams["oauthTokenEndpointUri"] == "" {
return errors.New("no oauth token endpoint uri given")
}
meta.oauthTokenEndpointURI = strings.TrimSpace(config.AuthParams["oauthTokenEndpointUri"])

meta.oauthExtensions = make(map[string]string)
oauthExtensionsRaw := config.AuthParams["oauthExtensions"]
if oauthExtensionsRaw != "" {
for _, extension := range strings.Split(oauthExtensionsRaw, ",") {
splittedExtension := strings.Split(extension, "=")
if len(splittedExtension) != 2 {
return errors.New("invalid OAuthBearer extension, must be of format key=value")
}
meta.oauthExtensions[splittedExtension[0]] = splittedExtension[1]
}
}
case mode == KafkaSASLTypeGSSAPI:
err := parseKerberosParams(config, meta, mode)
if err != nil {
return err
}
} else {
default:
return fmt.Errorf("err SASL mode %s given", mode)
}
}
Expand Down Expand Up @@ -229,10 +215,112 @@ func parseKafkaAuthParams(config *ScalerConfig, meta *kafkaMetadata) error {
}
meta.enableTLS = true
}
return nil
}

func parseKerberosParams(config *ScalerConfig, meta *kafkaMetadata, mode kafkaSaslType) error {
if config.AuthParams["username"] == "" {
return errors.New("no username given")
}
meta.username = strings.TrimSpace(config.AuthParams["username"])

if (config.AuthParams["password"] == "" && config.AuthParams["keytab"] == "") ||
(config.AuthParams["password"] != "" && config.AuthParams["keytab"] != "") {
return errors.New("exactly one of 'password' or 'keytab' must be provided for GSSAPI authentication")
}
if config.AuthParams["password"] != "" {
meta.password = strings.TrimSpace(config.AuthParams["password"])
} else {
path, err := saveToFile(config.AuthParams["keytab"])
if err != nil {
return fmt.Errorf("error saving keytab to file: %w", err)
}
meta.keytabPath = path
}

if config.AuthParams["realm"] == "" {
return errors.New("no realm given")
}
meta.realm = strings.TrimSpace(config.AuthParams["realm"])

if config.AuthParams["kerberosConfig"] == "" {
return errors.New("no Kerberos configuration file (kerberosConfig) given")
}
path, err := saveToFile(config.AuthParams["kerberosConfig"])
JorTurFer marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("error saving kerberosConfig to file: %w", err)
}
meta.kerberosConfigPath = path

meta.saslType = mode
return nil
}

func parseSaslParams(config *ScalerConfig, meta *kafkaMetadata, mode kafkaSaslType) error {
if config.AuthParams["username"] == "" {
return errors.New("no username given")
}
meta.username = strings.TrimSpace(config.AuthParams["username"])

if config.AuthParams["password"] == "" {
return errors.New("no password given")
}
meta.password = strings.TrimSpace(config.AuthParams["password"])
meta.saslType = mode

if mode == KafkaSASLTypeOAuthbearer {
meta.scopes = strings.Split(config.AuthParams["scopes"], ",")

if config.AuthParams["oauthTokenEndpointUri"] == "" {
return errors.New("no oauth token endpoint uri given")
}
meta.oauthTokenEndpointURI = strings.TrimSpace(config.AuthParams["oauthTokenEndpointUri"])

meta.oauthExtensions = make(map[string]string)
oauthExtensionsRaw := config.AuthParams["oauthExtensions"]
if oauthExtensionsRaw != "" {
for _, extension := range strings.Split(oauthExtensionsRaw, ",") {
splittedExtension := strings.Split(extension, "=")
if len(splittedExtension) != 2 {
return errors.New("invalid OAuthBearer extension, must be of format key=value")
}
meta.oauthExtensions[splittedExtension[0]] = splittedExtension[1]
}
}
}
return nil
}

func saveToFile(content string) (string, error) {
data := []byte(content)

tempKrbDir := fmt.Sprintf("%s%c%s", os.TempDir(), os.PathSeparator, "kerberos")
err := os.MkdirAll(tempKrbDir, 0700)
if err != nil {
fmt.Printf("Error creating temporary directory: %s. Error: %s\n", tempKrbDir, err)
return "", err
}

tempFile, err := os.CreateTemp(tempKrbDir, "krb_*")
if err != nil {
fmt.Println("Error creating temporary file:", err)
return "", err
}
defer tempFile.Close()

_, err = tempFile.Write(data)
if err != nil {
fmt.Println("Error writing to temporary file:", err)
return "", err
}

// Get the temporary file's name
tempFilename := tempFile.Name()
fmt.Println("Data has been successfully saved to temporary file:", tempFilename)

return tempFilename, nil
}

func parseKafkaMetadata(config *ScalerConfig, logger logr.Logger) (kafkaMetadata, error) {
meta := kafkaMetadata{}
switch {
Expand Down Expand Up @@ -364,7 +452,7 @@ func getKafkaClients(metadata kafkaMetadata) (sarama.Client, sarama.ClusterAdmin
config := sarama.NewConfig()
config.Version = metadata.version

if metadata.saslType != KafkaSASLTypeNone {
if metadata.saslType != KafkaSASLTypeNone && metadata.saslType != KafkaSASLTypeGSSAPI {
config.Net.SASL.Enable = true
config.Net.SASL.User = metadata.username
config.Net.SASL.Password = metadata.password
Expand Down Expand Up @@ -398,6 +486,22 @@ func getKafkaClients(metadata kafkaMetadata) (sarama.Client, sarama.ClusterAdmin
config.Net.SASL.TokenProvider = OAuthBearerTokenProvider(metadata.username, metadata.password, metadata.oauthTokenEndpointURI, metadata.scopes, metadata.oauthExtensions)
}

if metadata.saslType == KafkaSASLTypeGSSAPI {
config.Net.SASL.Enable = true
config.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI
config.Net.SASL.GSSAPI.ServiceName = "kafka"
zroubalik marked this conversation as resolved.
Show resolved Hide resolved
config.Net.SASL.GSSAPI.Username = metadata.username
config.Net.SASL.GSSAPI.Realm = metadata.realm
config.Net.SASL.GSSAPI.KerberosConfigPath = metadata.kerberosConfigPath
if metadata.keytabPath != "" {
config.Net.SASL.GSSAPI.AuthType = sarama.KRB5_KEYTAB_AUTH
config.Net.SASL.GSSAPI.KeyTabPath = metadata.keytabPath
} else {
config.Net.SASL.GSSAPI.AuthType = sarama.KRB5_USER_AUTH
config.Net.SASL.GSSAPI.Password = metadata.password
}
}

client, err := sarama.NewClient(metadata.bootstrapServers, config)
if err != nil {
return nil, nil, fmt.Errorf("error creating kafka client: %w", err)
Expand Down Expand Up @@ -558,7 +662,24 @@ func (s *kafkaScaler) Close(context.Context) error {
if s.admin == nil {
return nil
}
return s.admin.Close()

err := s.admin.Close()
if err != nil {
return err
}

// clean up any temporary files
if strings.TrimSpace(s.metadata.kerberosConfigPath) != "" {
if err := os.Remove(s.metadata.kerberosConfigPath); err != nil {
return err
}
}
if strings.TrimSpace(s.metadata.keytabPath) != "" {
if err := os.Remove(s.metadata.keytabPath); err != nil {
return err
}
}
return nil
}

func (s *kafkaScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
Expand Down
Loading