Skip to content

Commit

Permalink
add support for gssapi to kafka scaler
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Novichenok <[email protected]>
  • Loading branch information
novicher committed Aug 26, 2023
1 parent e694286 commit 12476ee
Show file tree
Hide file tree
Showing 4 changed files with 246 additions and 40 deletions.
9 changes: 7 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,17 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio

### New

- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
- **AWS SQS Scaler**: Support for scaling to include delayed messages. ([#4377](https://github.com/kedacore/keda/issues/4377))
- **Governance**: KEDA transitioned to CNCF Graduated project ([#63](https://github.com/kedacore/governance/issues/63))

### Improvements

- **General**: Add more events for user checking ([#796](https://github.com/kedacore/keda/issues/3764))
- **General**: Add ScaledObject/ScaledJob names to output of `kubectl get triggerauthentication/clustertriggerauthentication` ([#796](https://github.com/kedacore/keda/issues/796))
- **General**: Add standalone CRD generation to release workflow ([#2726](https://github.com/kedacore/keda/issues/2726))
- **General**: Adding a changelog validating script to check for formatting and order ([#3190](https://github.com/kedacore/keda/issues/3190))
- **General**: Update golangci-lint version documented in CONTRIBUTING.md since old version doesn't support go 1.20 (N/A)
- **Kafka**: Add support for Kerberos authentication (SASL / GSSAPI) ([#4836](https://github.com/kedacore/keda/issues/4836))

### Fixes

Expand All @@ -67,14 +71,15 @@ You can find all deprecations in [this overview](https://github.com/kedacore/ked

New deprecation(s):

- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
- **General**: Clean up previously deprecated code for 2.12 release ([#4899](https://github.com/kedacore/keda/issues/4899))

### Breaking Changes

- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))

### Other

- **General**: Fixed a typo in the StatefulSet scaling resolver ([#4902](https://github.com/kedacore/keda/pull/4902))
- **General**: Refactor ScaledJob related methods to be located at scale_handler ([#4781](https://github.com/kedacore/keda/issues/4781))

## v2.11.2
Expand Down
6 changes: 6 additions & 0 deletions config/manager/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,19 @@ spec:
seccompProfile:
type: RuntimeDefault
volumeMounts:
- mountPath: /tmp/kerberos
name: temp-kerberos-vol
readOnly: false
- mountPath: /certs
name: certificates
readOnly: true
terminationGracePeriodSeconds: 10
nodeSelector:
kubernetes.io/os: linux
volumes:
- name: temp-kerberos-vol
emptyDir:
medium: Memory
- name: certificates
secret:
defaultMode: 420
Expand Down
187 changes: 154 additions & 33 deletions pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"os"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -51,6 +52,11 @@ type kafkaMetadata struct {
username string
password string

// GSSAPI
keytabPath string
realm string
kerberosConfigPath string

// OAUTHBEARER
scopes []string
oauthTokenEndpointURI string
Expand Down Expand Up @@ -82,6 +88,7 @@ const (
KafkaSASLTypeSCRAMSHA256 kafkaSaslType = "scram_sha256"
KafkaSASLTypeSCRAMSHA512 kafkaSaslType = "scram_sha512"
KafkaSASLTypeOAuthbearer kafkaSaslType = "oauthbearer"
KafkaSASLTypeGSSAPI kafkaSaslType = "gssapi"
)

const (
Expand Down Expand Up @@ -145,39 +152,18 @@ func parseKafkaAuthParams(config *ScalerConfig, meta *kafkaMetadata) error {
saslAuthType = strings.TrimSpace(saslAuthType)
mode := kafkaSaslType(saslAuthType)

if mode == KafkaSASLTypePlaintext || mode == KafkaSASLTypeSCRAMSHA256 || mode == KafkaSASLTypeSCRAMSHA512 || mode == KafkaSASLTypeOAuthbearer {
if config.AuthParams["username"] == "" {
return errors.New("no username given")
}
meta.username = strings.TrimSpace(config.AuthParams["username"])

if config.AuthParams["password"] == "" {
return errors.New("no password given")
switch {
case mode == KafkaSASLTypePlaintext || mode == KafkaSASLTypeSCRAMSHA256 || mode == KafkaSASLTypeSCRAMSHA512 || mode == KafkaSASLTypeOAuthbearer:
err := parseSaslParams(config, meta, mode)
if err != nil {
return err
}
meta.password = strings.TrimSpace(config.AuthParams["password"])
meta.saslType = mode

if mode == KafkaSASLTypeOAuthbearer {
meta.scopes = strings.Split(config.AuthParams["scopes"], ",")

if config.AuthParams["oauthTokenEndpointUri"] == "" {
return errors.New("no oauth token endpoint uri given")
}
meta.oauthTokenEndpointURI = strings.TrimSpace(config.AuthParams["oauthTokenEndpointUri"])

meta.oauthExtensions = make(map[string]string)
oauthExtensionsRaw := config.AuthParams["oauthExtensions"]
if oauthExtensionsRaw != "" {
for _, extension := range strings.Split(oauthExtensionsRaw, ",") {
splittedExtension := strings.Split(extension, "=")
if len(splittedExtension) != 2 {
return errors.New("invalid OAuthBearer extension, must be of format key=value")
}
meta.oauthExtensions[splittedExtension[0]] = splittedExtension[1]
}
}
case mode == KafkaSASLTypeGSSAPI:
err := parseKerberosParams(config, meta, mode)
if err != nil {
return err
}
} else {
default:
return fmt.Errorf("err SASL mode %s given", mode)
}
}
Expand Down Expand Up @@ -229,10 +215,112 @@ func parseKafkaAuthParams(config *ScalerConfig, meta *kafkaMetadata) error {
}
meta.enableTLS = true
}
return nil
}

func parseKerberosParams(config *ScalerConfig, meta *kafkaMetadata, mode kafkaSaslType) error {
if config.AuthParams["username"] == "" {
return errors.New("no username given")
}
meta.username = strings.TrimSpace(config.AuthParams["username"])

if (config.AuthParams["password"] == "" && config.AuthParams["keytab"] == "") ||
(config.AuthParams["password"] != "" && config.AuthParams["keytab"] != "") {
return errors.New("exactly one of 'password' or 'keytab' must be provided for GSSAPI authentication")
}
if config.AuthParams["password"] != "" {
meta.password = strings.TrimSpace(config.AuthParams["password"])
} else {
path, err := saveToFile(config.AuthParams["keytab"])
if err != nil {
return fmt.Errorf("error saving keytab to file: %w", err)
}
meta.keytabPath = path
}

if config.AuthParams["realm"] == "" {
return errors.New("no realm given")
}
meta.realm = strings.TrimSpace(config.AuthParams["realm"])

if config.AuthParams["kerberosConfig"] == "" {
return errors.New("no Kerberos configuration file (kerberosConfig) given")
}
path, err := saveToFile(config.AuthParams["kerberosConfig"])
if err != nil {
return fmt.Errorf("error saving kerberosConfig to file: %w", err)
}
meta.kerberosConfigPath = path

meta.saslType = mode
return nil
}

func parseSaslParams(config *ScalerConfig, meta *kafkaMetadata, mode kafkaSaslType) error {
if config.AuthParams["username"] == "" {
return errors.New("no username given")
}
meta.username = strings.TrimSpace(config.AuthParams["username"])

if config.AuthParams["password"] == "" {
return errors.New("no password given")
}
meta.password = strings.TrimSpace(config.AuthParams["password"])
meta.saslType = mode

if mode == KafkaSASLTypeOAuthbearer {
meta.scopes = strings.Split(config.AuthParams["scopes"], ",")

if config.AuthParams["oauthTokenEndpointUri"] == "" {
return errors.New("no oauth token endpoint uri given")
}
meta.oauthTokenEndpointURI = strings.TrimSpace(config.AuthParams["oauthTokenEndpointUri"])

meta.oauthExtensions = make(map[string]string)
oauthExtensionsRaw := config.AuthParams["oauthExtensions"]
if oauthExtensionsRaw != "" {
for _, extension := range strings.Split(oauthExtensionsRaw, ",") {
splittedExtension := strings.Split(extension, "=")
if len(splittedExtension) != 2 {
return errors.New("invalid OAuthBearer extension, must be of format key=value")
}
meta.oauthExtensions[splittedExtension[0]] = splittedExtension[1]
}
}
}
return nil
}

func saveToFile(content string) (string, error) {
data := []byte(content)

tempKrbDir := fmt.Sprintf("%s%c%s", os.TempDir(), os.PathSeparator, "kerberos")
err := os.MkdirAll(tempKrbDir, 0700)
if err != nil {
fmt.Printf("Error creating temporary directory: %s. Error: %s\n", tempKrbDir, err)
return "", err
}

tempFile, err := os.CreateTemp(tempKrbDir, "krb_*")
if err != nil {
fmt.Println("Error creating temporary file:", err)
return "", err
}
defer tempFile.Close()

_, err = tempFile.Write(data)
if err != nil {
fmt.Println("Error writing to temporary file:", err)
return "", err
}

// Get the temporary file's name
tempFilename := tempFile.Name()
fmt.Println("Data has been successfully saved to temporary file:", tempFilename)

return tempFilename, nil
}

func parseKafkaMetadata(config *ScalerConfig, logger logr.Logger) (kafkaMetadata, error) {
meta := kafkaMetadata{}
switch {
Expand Down Expand Up @@ -364,7 +452,7 @@ func getKafkaClients(metadata kafkaMetadata) (sarama.Client, sarama.ClusterAdmin
config := sarama.NewConfig()
config.Version = metadata.version

if metadata.saslType != KafkaSASLTypeNone {
if metadata.saslType != KafkaSASLTypeNone && metadata.saslType != KafkaSASLTypeGSSAPI {
config.Net.SASL.Enable = true
config.Net.SASL.User = metadata.username
config.Net.SASL.Password = metadata.password
Expand Down Expand Up @@ -398,6 +486,22 @@ func getKafkaClients(metadata kafkaMetadata) (sarama.Client, sarama.ClusterAdmin
config.Net.SASL.TokenProvider = OAuthBearerTokenProvider(metadata.username, metadata.password, metadata.oauthTokenEndpointURI, metadata.scopes, metadata.oauthExtensions)
}

if metadata.saslType == KafkaSASLTypeGSSAPI {
config.Net.SASL.Enable = true
config.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI
config.Net.SASL.GSSAPI.ServiceName = "kafka"
config.Net.SASL.GSSAPI.Username = metadata.username
config.Net.SASL.GSSAPI.Realm = metadata.realm
config.Net.SASL.GSSAPI.KerberosConfigPath = metadata.kerberosConfigPath
if metadata.keytabPath != "" {
config.Net.SASL.GSSAPI.AuthType = sarama.KRB5_KEYTAB_AUTH
config.Net.SASL.GSSAPI.KeyTabPath = metadata.keytabPath
} else {
config.Net.SASL.GSSAPI.AuthType = sarama.KRB5_USER_AUTH
config.Net.SASL.GSSAPI.Password = metadata.password
}
}

client, err := sarama.NewClient(metadata.bootstrapServers, config)
if err != nil {
return nil, nil, fmt.Errorf("error creating kafka client: %w", err)
Expand Down Expand Up @@ -558,7 +662,24 @@ func (s *kafkaScaler) Close(context.Context) error {
if s.admin == nil {
return nil
}
return s.admin.Close()

err := s.admin.Close()
if err != nil {
return err
}

// clean up any temporary files
if strings.TrimSpace(s.metadata.kerberosConfigPath) != "" {
if err := os.Remove(s.metadata.kerberosConfigPath); err != nil {
return err
}
}
if strings.TrimSpace(s.metadata.keytabPath) != "" {
if err := os.Remove(s.metadata.keytabPath); err != nil {
return err
}
}
return nil
}

func (s *kafkaScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
Expand Down
Loading

0 comments on commit 12476ee

Please sign in to comment.