Skip to content

Commit

Permalink
Port squarebot configurations
Browse files Browse the repository at this point in the history
These configurations for topics and Kafka are ported from Squarebot.
  • Loading branch information
jonathansick committed Feb 15, 2024
1 parent 2d91321 commit 5c87555
Show file tree
Hide file tree
Showing 2 changed files with 253 additions and 1 deletion.
247 changes: 246 additions & 1 deletion src/unfurlbot/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,195 @@

from __future__ import annotations

from pydantic import Field
import ssl
from enum import Enum
from pathlib import Path

from pydantic import DirectoryPath, Field, FilePath, SecretStr
from pydantic_settings import BaseSettings, SettingsConfigDict
from safir.logging import LogLevel, Profile

__all__ = ["Config", "config"]


class KafkaSecurityProtocol(str, Enum):
"""Kafka security protocols understood by aiokafka."""

PLAINTEXT = "PLAINTEXT"
"""Plain-text connection."""

SSL = "SSL"
"""TLS-encrypted connection."""


class KafkaSaslMechanism(str, Enum):
"""Kafka SASL mechanisms understood by aiokafka."""

PLAIN = "PLAIN"
"""Plain-text SASL mechanism."""

SCRAM_SHA_256 = "SCRAM-SHA-256"
"""SCRAM-SHA-256 SASL mechanism."""

SCRAM_SHA_512 = "SCRAM-SHA-512"
"""SCRAM-SHA-512 SASL mechanism."""


class KafkaConnectionSettings(BaseSettings):
"""Settings for connecting to Kafka."""

bootstrap_servers: str = Field(
...,
title="Kafka bootstrap servers",
description=(
"A comma-separated list of Kafka brokers to connect to. "
"This should be a list of hostnames or IP addresses, "
"each optionally followed by a port number, separated by "
"commas. "
"For example: `kafka-1:9092,kafka-2:9092,kafka-3:9092`."
),
)

security_protocol: KafkaSecurityProtocol = Field(
KafkaSecurityProtocol.PLAINTEXT,
description="The security protocol to use when connecting to Kafka.",
)

cert_temp_dir: DirectoryPath | None = Field(
None,
description=(
"Temporary writable directory for concatenating certificates."
),
)

cluster_ca_path: FilePath | None = Field(
None,
title="Path to CA certificate file",
description=(
"The path to the CA certificate file to use for verifying the "
"broker's certificate. "
"This is only needed if the broker's certificate is not signed "
"by a CA trusted by the operating system."
),
)

client_ca_path: FilePath | None = Field(
None,
title="Path to client CA certificate file",
description=(
"The path to the client CA certificate file to use for "
"authentication. "
"This is only needed when the client certificate needs to be"
"concatenated with the client CA certificate, which is common"
"for Strimzi installations."
),
)

client_cert_path: FilePath | None = Field(
None,
title="Path to client certificate file",
description=(
"The path to the client certificate file to use for "
"authentication. "
"This is only needed if the broker is configured to require "
"SSL client authentication."
),
)

client_key_path: FilePath | None = Field(
None,
title="Path to client key file",
description=(
"The path to the client key file to use for authentication. "
"This is only needed if the broker is configured to require "
"SSL client authentication."
),
)

client_key_password: SecretStr | None = Field(
None,
title="Password for client key file",
description=(
"The password to use for decrypting the client key file. "
"This is only needed if the client key file is encrypted."
),
)

sasl_mechanism: KafkaSaslMechanism | None = Field(
KafkaSaslMechanism.PLAIN,
title="SASL mechanism",
description=(
"The SASL mechanism to use for authentication. "
"This is only needed if SASL authentication is enabled."
),
)

sasl_username: str | None = Field(
None,
title="SASL username",
description=(
"The username to use for SASL authentication. "
"This is only needed if SASL authentication is enabled."
),
)

sasl_password: SecretStr | None = Field(
None,
title="SASL password",
description=(
"The password to use for SASL authentication. "
"This is only needed if SASL authentication is enabled."
),
)

model_config = SettingsConfigDict(
env_prefix="KAFKA_", case_sensitive=False
)

@property
def ssl_context(self) -> ssl.SSLContext | None:
"""An SSL context for connecting to Kafka with aiokafka, if the
Kafka connection is configured to use SSL.
"""
if (
self.security_protocol != KafkaSecurityProtocol.SSL
or self.cluster_ca_path is None
or self.client_cert_path is None
or self.client_key_path is None
):
return None

client_cert_path = Path(self.client_cert_path)

if self.client_ca_path is not None:
# Need to contatenate the client cert and CA certificates. This is
# typical for Strimzi-based Kafka clusters.
if self.cert_temp_dir is None:
raise RuntimeError(
"KAFKIT_KAFKA_CERT_TEMP_DIR must be set when "
"a client CA certificate is provided."
)
client_ca = Path(self.client_ca_path).read_text()
client_cert = Path(self.client_cert_path).read_text()
sep = "" if client_ca.endswith("\n") else "\n"
new_client_cert = sep.join([client_cert, client_ca])
new_client_cert_path = Path(self.cert_temp_dir) / "client.crt"
new_client_cert_path.write_text(new_client_cert)
client_cert_path = Path(new_client_cert_path)

# Create an SSL context on the basis that we're the client
# authenticating the server (the Kafka broker).
ssl_context = ssl.create_default_context(
purpose=ssl.Purpose.SERVER_AUTH, cafile=str(self.cluster_ca_path)
)
# Add the certificates that the Kafka broker uses to authenticate us.
ssl_context.load_cert_chain(
certfile=str(client_cert_path), keyfile=str(self.client_key_path)
)

return ssl_context


class Config(BaseSettings):
"""Configuration for unfurlbot."""

Expand All @@ -24,6 +206,69 @@ class Config(BaseSettings):
LogLevel.INFO, title="Log level of the application's logger"
)

kafka: KafkaConnectionSettings = Field(
default_factory=KafkaConnectionSettings,
title="Kafka connection configuration.",
)

slack_token: SecretStr = Field(title="Slack bot token")

slack_app_id: str = Field(title="Slack app ID")

app_mention_topic: str = Field(
"squarebot.app_mention",
title="app_mention Kafka topic",
alias="UNFURLBOT_TOPIC_APP_MENTION",
description="Kafka topic name for `app_mention` Slack events.",
)

message_channels_topic: str = Field(
"squarebot.message.channels",
title="message.channels Kafka topic",
alias="UNFURLBOT_TOPIC_MESSAGE_CHANNELS",
description=(
"Kafka topic name for `message.channels` Slack events (messages "
"in public channels)."
),
)

message_im_topic: str = Field(
"squarebot.message.im",
title="message.im Kafka topic",
alias="UNFURLBOT_TOPIC_MESSAGE_IM",
description=(
"Kafka topic name for `message.im` Slack events (direct message "
" channels)."
),
)

message_groups_topic: str = Field(
"squarebot.message.groups",
title="message.groups Kafka topic",
alias="UNFURLBOT_TOPIC_MESSAGE_GROUPS",
description=(
"Kafka topic name for `message.groups` Slack events (messages in "
"private channels)."
),
)

message_mpim_topic: str = Field(
"squarebot.message.mpim",
title="message.mpim Kafka topic",
alias="UNFURLBOT_TOPIC_MESSAGE_MPIM",
description=(
"Kafka topic name for `message.mpim` Slack events (messages in "
"multi-person direct messages)."
),
)

interaction_topic: str = Field(
"squarebot.interaction",
title="interaction Kafka topic",
alias="UNFURLBOT_TOPIC_INTERACTION",
description=("Kafka topic name for `interaction` Slack events"),
)

model_config = SettingsConfigDict(
env_prefix="UNFURLBOT_", case_sensitive=False
)
Expand Down
7 changes: 7 additions & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ description = Run pytest against {envname}.
deps =
-r{toxinidir}/requirements/main.txt
-r{toxinidir}/requirements/dev.txt
setenv =
UNFURLBOT_LOG_LEVEL = DEBUG
KAFKA_BOOTSTRAP_SERVERS = localhost:9092
KAFKA_SECURITY_PROTOCOL = PLAINTEXT
UNFURLBOT_SLACK_SIGNING = 1234
UNFURLBOT_SLACK_TOKEN = 1234
UNFURLBOT_SLACK_APP_ID = 1234l
commands =
pytest --cov=unfurlbot --cov-branch --cov-report= {posargs}

Expand Down

0 comments on commit 5c87555

Please sign in to comment.