Skip to content

Commit

Permalink
code cleanup and validations
Browse files Browse the repository at this point in the history
  • Loading branch information
sushmithavangala committed Feb 3, 2022
1 parent 3996574 commit b6bc3d8
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 34 deletions.
61 changes: 38 additions & 23 deletions Kudu.Core/Functions/KafkaTriggerKedaAuthProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,22 @@ namespace Kudu.Core.Functions
{
public class KafkaTriggerKedaAuthProvider : IKedaAuthRefProvider
{
// private readonly IKubernetes _kubernetesClient;

// public KafkaTriggerKedaAuthProvider(IKubernetes kubernetesClient)
// {
// _kubernetesClient = kubernetesClient;
// }
public IDictionary<string, string> PopulateAuthenticationRef(JToken bindings, string functionName)
{
IDictionary<string, string> functionData = bindings.ToObject<Dictionary<string, JToken>>()
.Where(i => i.Value.Type == JTokenType.String)
.ToDictionary(k => k.Key, v => v.Value.ToString());
.ToDictionary(k => k.Key, v => v.Value.ToString(), StringComparer.OrdinalIgnoreCase);

if (!IsTriggerAuthRequired(functionData, functionName)) {
return null;
}

//map of secret keys to keda params required for trigger auth
IDictionary<string, string> secretKeyToKedaParam = new Dictionary<string, string>();
IDictionary<string, string> secretsForAppSettings = new Dictionary<string, string>();

//creates the map of secret keys to keda params required for trigger auth
if (functionData.ContainsKey(TriggerAuthConstants.KAFKA_TRIGGER_PROTOCOL)
&& (functionData[TriggerAuthConstants.KAFKA_TRIGGER_PROTOCOL].Equals("SaslSsl", StringComparison.OrdinalIgnoreCase)
|| functionData[TriggerAuthConstants.KAFKA_TRIGGER_PROTOCOL].Equals("SaslPlaintext", StringComparison.OrdinalIgnoreCase)
&& functionData.ContainsKey(TriggerAuthConstants.KAFKA_TRIGGER_AUTH_MODE)
&& !functionData[TriggerAuthConstants.KAFKA_TRIGGER_AUTH_MODE].Equals(TriggerAuthConstants.KAFKA_TRIGGER_AUTH_MODE_NOT_SET, StringComparison.OrdinalIgnoreCase)))
if ((functionData[TriggerAuthConstants.KAFKA_TRIGGER_PROTOCOL].Equals(TriggerAuthConstants.KAFKA_TRIGGER_SASL_SSL_PROTOCOL, StringComparison.OrdinalIgnoreCase)
|| functionData[TriggerAuthConstants.KAFKA_TRIGGER_PROTOCOL].Equals(TriggerAuthConstants.KAFKA_TRIGGER_SASL_PLAINTEXT_PROTOCOL, StringComparison.OrdinalIgnoreCase)))
{
secretKeyToKedaParam.Add(TriggerAuthConstants.KAFKA_TRIGGER_AUTH_MODE, getKedaProperty(TriggerAuthConstants.KAFKA_TRIGGER_AUTH_MODE));
secretKeyToKedaParam.Add(TriggerAuthConstants.KAFKA_TRIGGER_USERNAME, getKedaProperty(TriggerAuthConstants.KAFKA_TRIGGER_USERNAME));
Expand All @@ -44,8 +39,8 @@ public IDictionary<string, string> PopulateAuthenticationRef(JToken bindings, st
}

if (functionData.ContainsKey(TriggerAuthConstants.KAFKA_TRIGGER_PROTOCOL)
&& (functionData[TriggerAuthConstants.KAFKA_TRIGGER_PROTOCOL].Equals("SaslSsl", StringComparison.OrdinalIgnoreCase)
|| functionData[TriggerAuthConstants.KAFKA_TRIGGER_PROTOCOL].Equals("Ssl", StringComparison.OrdinalIgnoreCase))) {
&& (functionData[TriggerAuthConstants.KAFKA_TRIGGER_PROTOCOL].Equals(TriggerAuthConstants.KAFKA_TRIGGER_SASL_SSL_PROTOCOL, StringComparison.OrdinalIgnoreCase)
|| functionData[TriggerAuthConstants.KAFKA_TRIGGER_PROTOCOL].Equals(TriggerAuthConstants.KAFKA_TRIGGER_SSL_PROTOCOL, StringComparison.OrdinalIgnoreCase))) {
secretKeyToKedaParam.Add(TriggerAuthConstants.KAFKA_TRIGGER_TLS, getKedaProperty(TriggerAuthConstants.KAFKA_TRIGGER_TLS));
secretsForAppSettings.Add(TriggerAuthConstants.KAFKA_TRIGGER_TLS, "enable");
}
Expand All @@ -69,13 +64,11 @@ public IDictionary<string, string> PopulateAuthenticationRef(JToken bindings, st
}

//step 1: add the required trigger auth data as secrets in appsetting secrets file
string appNamespace = System.Environment.GetEnvironmentVariable("K8SE_APPS_NAMESPACE");
try
try
{
//add data as appsettings
K8SEDeploymentHelper.UpdateKubernetesSecrets(secretsForAppSettings, functionName + "-secrets", appNamespace);
}
catch (Exception ex)
AddTriggerAuthAppSettingsSecrets(secretsForAppSettings, functionName);

} catch (Exception ex)
{
//logging and continuing as keda handles if secret expected is not found
Console.WriteLine("Error while adding secrets required for trigger auth ", ex.ToString());
Expand All @@ -91,20 +84,25 @@ public IDictionary<string, string> PopulateAuthenticationRef(JToken bindings, st
catch (Exception ex)
{
Console.WriteLine("Error while creating Trigger Authentication Ref, function name : {0} ", functionName, ex.ToString());
Console.WriteLine("KEDA might not to able to scale the app {0} due to missing Trigger Authentication details", functionName, ex.ToString());
return null;
}

return authRef;
}

internal virtual void CreateTriggerAuthenticationRef(IDictionary<string, string> secretKeyToKedaParam, string functionName)
{
string secretKeyToKedaParamMap = System.Convert.ToBase64String(ASCIIEncoding.ASCII.GetBytes(JsonConvert.SerializeObject(secretKeyToKedaParam)));

// functionName + "-secrets" is the filename for appsettings secrets
K8SEDeploymentHelper.CreateTriggerAuthenticationRef(functionName + "-secrets", secretKeyToKedaParamMap, functionName);
}

internal virtual void AddTriggerAuthAppSettingsSecrets(IDictionary<string, string> secretsForAppSettings, string functionName)
{
string appNamespace = System.Environment.GetEnvironmentVariable("K8SE_APPS_NAMESPACE");
K8SEDeploymentHelper.UpdateKubernetesSecrets(secretsForAppSettings, functionName + "-secrets", appNamespace);
}

internal string getKedaProperty(string triggerBinding)
{
if (triggerBinding == null)
Expand All @@ -113,5 +111,22 @@ internal string getKedaProperty(string triggerBinding)
}
return TriggerAuthConstants.KafkaTriggerBindingToKedaProperty.GetValueOrDefault(triggerBinding);
}

internal Boolean IsTriggerAuthRequired(IDictionary<string, string> functionBindings, string functionName) {

if (!functionBindings.ContainsKey(TriggerAuthConstants.KAFKA_TRIGGER_PROTOCOL) || !functionBindings.ContainsKey(TriggerAuthConstants.KAFKA_TRIGGER_AUTH_MODE)) {
return false;
}

if (functionBindings[TriggerAuthConstants.KAFKA_TRIGGER_AUTH_MODE].Equals(TriggerAuthConstants.KAFKA_TRIGGER_AUTH_MODE_NOT_SET, StringComparison.OrdinalIgnoreCase)
|| functionBindings[TriggerAuthConstants.KAFKA_TRIGGER_AUTH_MODE].Equals(TriggerAuthConstants.KAFKA_TRIGGER_PROTOCOL_NOT_SET, StringComparison.OrdinalIgnoreCase)) {
return false;
}

if (functionBindings[TriggerAuthConstants.KAFKA_TRIGGER_AUTH_MODE].Equals("Gssapi", StringComparison.OrdinalIgnoreCase)) {
Console.WriteLine("Gssapi as Authentication Mode is not supported in Keda, function app {0} might not be able to scale", functionName);
}
return true;
}
}
}
12 changes: 9 additions & 3 deletions Kudu.Core/Functions/TriggerAuthConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ public static class TriggerAuthConstants
public const string KAFKA_TRIGGER_AUTH_MODE = "authenticationMode";
public const string KAFKA_TRIGGER_USERNAME = "username";
public const string KAFKA_TRIGGER_PASSWORD = "password";
public const string KAFKA_TRIGGER_SSL_CA_LOCATION = "SslCaLocation";
public const string KAFKA_TRIGGER_SSL_CERT_LOCATION = "SslCertificateLocation";
public const string KAFKA_TRIGGER_SSL_KEY_LOCATION = "SslKeyLocation";
public const string KAFKA_TRIGGER_SSL_CA_LOCATION = "sslCaLocation";
public const string KAFKA_TRIGGER_SSL_CERT_LOCATION = "sslCertificateLocation";
public const string KAFKA_TRIGGER_SSL_KEY_LOCATION = "sslKeyLocation";
public const string KAFKA_TRIGGER_TLS = "tls";

public const string KAFKA_TRIGGER_AUTH_MODE_NOT_SET = "NotSet";
Expand All @@ -28,6 +28,12 @@ public static class TriggerAuthConstants
public const string KAFKA_KEDA_PARAM_KEY_LOCATION = "key";
public const string KAFKA_KEDA_PARAM_TLS = "tls";

//Below protocol values must be same as BrokerProtocol values from kafka extension
public const string KAFKA_TRIGGER_SASL_SSL_PROTOCOL = "SaslSsl";
public const string KAFKA_TRIGGER_SSL_PROTOCOL = "Ssl";
public const string KAFKA_TRIGGER_SASL_PLAINTEXT_PROTOCOL = "SaslPlaintext";
public const string KAFKA_TRIGGER_PLAINTEXT_PROTOCOL = "Plaintext";

public static readonly Dictionary<string, string> KafkaTriggerBindingToKedaProperty = new Dictionary<string, string>()
{
{ KAFKA_TRIGGER_AUTH_MODE, KAFKA_KEDA_PARAM_AUTH_MODE },
Expand Down
88 changes: 80 additions & 8 deletions Kudu.Tests/Core/Function/KafkaTriggerKedaAuthProviderTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,33 @@ namespace Kudu.Tests.Core.Function
{
public class KafkaTriggerKedaAuthProviderTest
{
[Fact]
public void TestPopulateAuthenticationRef_Is_BindingCaseInsensitive()
{
KafkaTriggerKedaAuthProviderOverload kafkaTriggerKedaAuthProvider = new KafkaTriggerKedaAuthProviderOverload();
string jsonText = @"
{
""Protocol"": ""SaslSsl"",
""authenticationMode"": ""Plain"",
""username"": ""test"",
""password"": ""test""
}";

JToken jsonObj = JToken.Parse(jsonText);
IDictionary<string, string> authRef = kafkaTriggerKedaAuthProvider.PopulateAuthenticationRef(jsonObj, "testFunctionName");
Assert.Equal(1, authRef.Count);
}

[Fact]
public void PopulateAuthenticationRef_With_ProtocolData()
{
KafkaTriggerKedaAuthProviderOverload kafkaTriggerKedaAuthProvider = new KafkaTriggerKedaAuthProviderOverload();
string jsonText = @"
{
""Protocol"": ""SASL_SSL"",
""AuthenticationMode"": ""PLAINTEXT"",
""Username"": ""test"",
""Password"": ""test""
""protocol"": ""SaslSsl"",
""authenticationMode"": ""Plain"",
""username"": ""test"",
""password"": ""test""
}";

JToken jsonObj = JToken.Parse(jsonText);
Expand All @@ -32,17 +49,67 @@ public void PopulateAuthenticationRef_Fails_When_TriggerAuthCreationFails()
KafkaTriggerKedaAuthProviderErrorMock kafkaTriggerKedaAuthProvider = new KafkaTriggerKedaAuthProviderErrorMock();
string jsonText = @"
{
""Protocol"": ""SASL_SSL"",
""AuthenticationMode"": ""PLAINTEXT"",
""Username"": ""test"",
""Password"": ""test""
""protocol"": ""SaslSsl"",
""authenticationMode"": ""Plain"",
""username"": ""test"",
""password"": ""test""
}";

JToken jsonObj = JToken.Parse(jsonText);
IDictionary<string, string> authRef = kafkaTriggerKedaAuthProvider.PopulateAuthenticationRef(jsonObj, "testFunctionName");
Assert.Null(authRef);
}

[Fact]
public void PopulateAuthenticationRef_Continues_When_AddSecretsFails()
{
KafkaTriggerKedaAuthProviderErrorMock kafkaTriggerKedaAuthProvider = new KafkaTriggerKedaAuthProviderErrorMock();
string jsonText = @"
{
""protocol"": ""SaslSsl"",
""authenticationMode"": ""Plain"",
""username"": ""test"",
""password"": ""test""
}";

JToken jsonObj = JToken.Parse(jsonText);
IDictionary<string, string> authRef = kafkaTriggerKedaAuthProvider.PopulateAuthenticationRef(jsonObj, "testFunctionName");
Assert.Equal(1, authRef.Count);
}

[Fact]
public void TestIFTriggerAuthIsNull_With_NoAuthenticationMode()
{
KafkaTriggerKedaAuthProviderOverload kafkaTriggerKedaAuthProvider = new KafkaTriggerKedaAuthProviderOverload();
string jsonText = @"
{
""protocol"": ""SaslSsl"",
""username"": ""test"",
""password"": ""test""
}";

JToken jsonObj = JToken.Parse(jsonText);
IDictionary<string, string> authRef = kafkaTriggerKedaAuthProvider.PopulateAuthenticationRef(jsonObj, "testFunctionName");
Assert.Null(authRef);
}

[Fact]
public void TestIFTriggerAuthIsNull_With_NoProtocol()
{
KafkaTriggerKedaAuthProviderOverload kafkaTriggerKedaAuthProvider = new KafkaTriggerKedaAuthProviderOverload();
string jsonText = @"
{
""authenticationMode"": ""Plain"",
""username"": ""test"",
""password"": ""test""
}";

JToken jsonObj = JToken.Parse(jsonText);
IDictionary<string, string> authRef = kafkaTriggerKedaAuthProvider.PopulateAuthenticationRef(jsonObj, "testFunctionName");
Assert.Null(authRef);
}


private class KafkaTriggerKedaAuthProviderOverload : KafkaTriggerKedaAuthProvider
{
internal override void CreateTriggerAuthenticationRef(IDictionary<string, string> secretKeyToKedaParam, string functionName)
Expand All @@ -58,6 +125,11 @@ internal override void CreateTriggerAuthenticationRef(IDictionary<string, string
{
throw new Exception("exception for unit test");
}

internal override void AddTriggerAuthAppSettingsSecrets(IDictionary<string, string> secretsForAppSettings, string functionName)
{
throw new Exception("exception for unit test");
}
}
}
}

0 comments on commit b6bc3d8

Please sign in to comment.