Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for HTTP/HTTPS protocol for ClickHouse connection #938

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 55 additions & 20 deletions quesma/clickhouse/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (
)

func initDBConnection(c *config.QuesmaConfiguration, tlsConfig *tls.Config) *sql.DB {

options := clickhouse.Options{Addr: []string{c.ClickHouse.Url.Host}}

if c.ClickHouse.User != "" || c.ClickHouse.Password != "" || c.ClickHouse.Database != "" {

options.Auth = clickhouse.Auth{
Expand All @@ -29,6 +29,18 @@ func initDBConnection(c *config.QuesmaConfiguration, tlsConfig *tls.Config) *sql
if !c.ClickHouse.DisableTLS {
options.TLS = tlsConfig
}
if c.ClickHouse.Url.Scheme == "clickhouse" {
options.Protocol = clickhouse.Native
} else if c.ClickHouse.Url.Scheme == "http" {
logger.Warn().Msgf("Using HTTP protocol to connect to ClickHouse. We recommend using ClickHouse Native protocol instead (default port 9000).")
options.Protocol = clickhouse.HTTP
options.TLS = nil
} else if c.ClickHouse.Url.Scheme == "https" {
logger.Warn().Msgf("Using HTTPS protocol to connect to ClickHouse. We recommend using ClickHouse Native protocol instead (default port 9000).")
options.Protocol = clickhouse.HTTP
} else {
logger.Error().Msgf("Unknown ClickHouse endpoint protocol '%s' in the provided URL %s. Defaulting to ClickHouse Native protocol. Please provide the ClickHouse URL in the following format: clickhouse://host:port", c.ClickHouse.Url.Scheme, c.ClickHouse.Url.String())
}

info := struct {
Name string
Expand Down Expand Up @@ -101,9 +113,7 @@ func InitDBConnectionPool(c *config.QuesmaConfiguration) *sql.DB {
// in case of misconfigured ClickHouse connection. In the future, we might rethink how do we manage this and perhaps
// move some parts to InitDBConnectionPool, but for now this should already provide some useful feedback.
func RunClickHouseConnectionDoctor(c *config.QuesmaConfiguration) {
timeout := 1 * time.Second
defaultNativeProtocolPort := "9000"
defaultNativeProtocolPortEncrypted := "9440"
timeout := 5 * time.Second

logger.Info().Msgf("[connection-doctor] Starting ClickHouse connection doctor")
hostName, port := c.ClickHouse.Url.Hostname(), c.ClickHouse.Url.Port()
Expand All @@ -112,24 +122,49 @@ func RunClickHouseConnectionDoctor(c *config.QuesmaConfiguration) {
connTcp, errTcp := net.DialTimeout("tcp", address, timeout)
if errTcp != nil {
logger.Info().Msgf("[connection-doctor] Failed dialing with %s, err=[%v], no service listening at configured host/port, make sure to specify reachable ClickHouse address", address, errTcp)
logger.Info().Msgf("[connection-doctor] Trying default ClickHouse native ports...")
if conn, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%s", hostName, defaultNativeProtocolPort), timeout); err == nil {
logger.Info().Msgf("[connection-doctor] Default ClickHouse plaintext port is reachable, consider changing ClickHouse port to %s in Quesma configuration", defaultNativeProtocolPort)
conn.Close()
}
if conn, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%s", hostName, defaultNativeProtocolPortEncrypted), timeout); err == nil {
logger.Info().Msgf("[connection-doctor] Default ClickHouse TLS port is reachable, consider changing ClickHouse port to %s in Quesma conbfiguration", defaultNativeProtocolPortEncrypted)
conn.Close()
}
tryDefaultPorts(hostName, timeout)
return
}
logger.Info().Msgf("[connection-doctor] Successfully dialed host/port (%s)...", address)
defer connTcp.Close()
logger.Info().Msgf("[connection-doctor] Trying to establish TLS connection with configured host/port (%s)", address)
connTls, errTls := tls.DialWithDialer(&net.Dialer{Timeout: timeout}, "tcp", address, &tls.Config{InsecureSkipVerify: true})
if errTls != nil {
logger.Info().Msgf("[connection-doctor] Failed establishing TLS connection with %s, err=[%v], please use `clickhouse.disableTLS: true` in Quesma configuration", address, errTls)
return

if !c.ClickHouse.DisableTLS {
logger.Info().Msgf("[connection-doctor] Trying to establish TLS connection with configured host/port (%s)", address)
connTls, errTls := tls.DialWithDialer(&net.Dialer{Timeout: timeout}, "tcp", address, &tls.Config{InsecureSkipVerify: true})
if errTls != nil {
logger.Info().Msgf("[connection-doctor] Failed establishing TLS connection with %s, err=[%v], please use `config.disableTLS: true` in Quesma configuration of ClickHouse backend connector", address, errTls)
return
}
defer connTls.Close()
logger.Info().Msgf("[connection-doctor] TLS connection (handshake) with %s established successfully", address)
} else {
logger.Info().Msgf("[connection-doctor] TLS connection is disabled in Quesma configuration (consider trying `config.disableTLS: false` in Quesma configuration of ClickHouse backend connector), skipping TLS connection tests.")
}
logger.Info().Msgf("[connection-doctor] Make sure you are using the correct protocol (currently: %s), correct username/password and correct database (currently: '%s')", c.ClickHouse.Url.Scheme, c.ClickHouse.Database)
tryDefaultPorts(hostName, timeout)
}

func tryDefaultPorts(hostName string, timeout time.Duration) {
defaultNativeProtocolPort := "9000"
defaultNativeProtocolPortEncrypted := "9440"
defaultHttpPort := "8123"
defaultHttpsPort := "8443"

logger.Info().Msgf("[connection-doctor] Trying default ClickHouse ports...")
if conn, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%s", hostName, defaultNativeProtocolPort), timeout); err == nil {
logger.Info().Msgf("[connection-doctor] Default ClickHouse plaintext port is reachable, consider changing the ClickHouse URL in Quesma configuration to clickhouse://%s:%s", hostName, defaultNativeProtocolPort)
conn.Close()
}
if conn, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%s", hostName, defaultNativeProtocolPortEncrypted), timeout); err == nil {
logger.Info().Msgf("[connection-doctor] Default ClickHouse TLS port is reachable, consider changing the ClickHouse URL in Quesma configuration to clickhouse://%s:%s", hostName, defaultNativeProtocolPortEncrypted)
conn.Close()
}
if conn, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%s", hostName, defaultHttpPort), timeout); err == nil {
logger.Info().Msgf("[connection-doctor] Default ClickHouse HTTP port is reachable, consider changing the ClickHouse URL in Quesma configuration to http://%s:%s", hostName, defaultHttpPort)
conn.Close()
}
if conn, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%s", hostName, defaultHttpsPort), timeout); err == nil {
logger.Info().Msgf("[connection-doctor] Default ClickHouse HTTPS port is reachable, consider changing the ClickHouse URL in Quesma configuration to https://%s:%s", hostName, defaultHttpsPort)
conn.Close()
}
defer connTls.Close()
logger.Info().Msgf("[connection-doctor] TLS connection (handshake) with %s established successfully", address)
}
24 changes: 10 additions & 14 deletions quesma/quesma/config/config_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,22 @@ const (
)

type QuesmaNewConfiguration struct {
BackendConnectors []BackendConnector `koanf:"backendConnectors"`
FrontendConnectors []FrontendConnector `koanf:"frontendConnectors"`
InstallationId string `koanf:"installationId"`
LicenseKey string `koanf:"licenseKey"`
Logging LoggingConfiguration `koanf:"logging"`
IngestStatistics bool `koanf:"ingestStatistics"`
QuesmaInternalTelemetryUrl *Url `koanf:"internalTelemetryUrl"`
Processors []Processor `koanf:"processors"`
Pipelines []Pipeline `koanf:"pipelines"`
DisableTelemetry bool `koanf:"disableTelemetry"`
BackendConnectors []BackendConnector `koanf:"backendConnectors"`
FrontendConnectors []FrontendConnector `koanf:"frontendConnectors"`
InstallationId string `koanf:"installationId"`
LicenseKey string `koanf:"licenseKey"`
Logging LoggingConfiguration `koanf:"logging"`
IngestStatistics bool `koanf:"ingestStatistics"`
Processors []Processor `koanf:"processors"`
Pipelines []Pipeline `koanf:"pipelines"`
DisableTelemetry bool `koanf:"disableTelemetry"`
}

type LoggingConfiguration struct {
Path string `koanf:"path"`
Level *zerolog.Level `koanf:"level"`
RemoteLogDrainUrl *Url `koanf:"remoteUrl"`
FileLogging bool `koanf:"fileLogging"`
RemoteLogDrainUrl *Url
}

type Pipeline struct {
Expand Down Expand Up @@ -118,9 +117,6 @@ type QuesmaProcessorConfig struct {

func LoadV2Config() QuesmaNewConfiguration {
var v2config QuesmaNewConfiguration
v2config.QuesmaInternalTelemetryUrl = telemetryUrl
v2config.Logging.RemoteLogDrainUrl = telemetryUrl

loadConfigFile()
// We have to use custom env provider to allow array overrides
if err := k.Load(Env2JsonProvider("QUESMA_", "_", nil), json.Parser(), koanf.WithMergeFunc(mergeDictFunc)); err != nil {
Expand Down
11 changes: 10 additions & 1 deletion quesma/quesma/config/url.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
// SPDX-License-Identifier: Elastic-2.0
package config

import "net/url"
import (
"fmt"
"net/url"
)

type Url url.URL

Expand All @@ -15,6 +18,12 @@ func (u *Url) UnmarshalText(text []byte) error {
if err != nil {
return err
}
if len(urlValue.Scheme) == 0 || len(urlValue.Host) == 0 {
return fmt.Errorf("invalid URL (missing protocol and/or hostname). Expected URL in a format 'protocol://hostname:port' (e.g. 'http://localhost:8123'), but instead got: %s", urlValue)
}
if len(urlValue.Port()) == 0 {
return fmt.Errorf("URL port (e.g. 8123 in 'http://localhost:8123') is missing from the provided URL: %s", urlValue)
mieciu marked this conversation as resolved.
Show resolved Hide resolved
}
*u = Url(*urlValue)
return nil
}
Expand Down
Loading