Skip to content

Commit

Permalink
Merge branch 'main' into readme_fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
tsg committed Aug 8, 2024
2 parents 4dfc091 + b56dba5 commit 0227467
Show file tree
Hide file tree
Showing 45 changed files with 573 additions and 203 deletions.
12 changes: 6 additions & 6 deletions cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import (
"fmt"

"github.com/spf13/viper"
"github.com/xataio/pgstream/internal/backoff"
"github.com/xataio/pgstream/internal/kafka"
"github.com/xataio/pgstream/internal/tls"
"github.com/xataio/pgstream/pkg/backoff"
"github.com/xataio/pgstream/pkg/kafka"
pgschemalog "github.com/xataio/pgstream/pkg/schemalog/postgres"
"github.com/xataio/pgstream/pkg/stream"
"github.com/xataio/pgstream/pkg/tls"
kafkacheckpoint "github.com/xataio/pgstream/pkg/wal/checkpointer/kafka"
kafkalistener "github.com/xataio/pgstream/pkg/wal/listener/kafka"
kafkaprocessor "github.com/xataio/pgstream/pkg/wal/processor/kafka"
Expand Down Expand Up @@ -169,7 +169,7 @@ func parseSearchProcessorConfig() *stream.SearchProcessorConfig {
Store: opensearch.Config{
URL: searchStore,
},
Retrier: &search.StoreRetryConfig{
Retrier: search.StoreRetryConfig{
Backoff: parseBackoffConfig("PGSTREAM_SEARCH_STORE"),
},
}
Expand Down Expand Up @@ -245,8 +245,8 @@ func parseTranslatorConfig() *translator.Config {
}
}

func parseTLSConfig(prefix string) *tls.Config {
return &tls.Config{
func parseTLSConfig(prefix string) tls.Config {
return tls.Config{
Enabled: viper.GetBool(fmt.Sprintf("%s_TLS_ENABLED", prefix)),
CaCertFile: viper.GetString(fmt.Sprintf("%s_TLS_CA_CERT_FILE", prefix)),
ClientCertFile: viper.GetString(fmt.Sprintf("%s_TLS_CLIENT_CERT_FILE", prefix)),
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/google/go-cmp v0.6.0
github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa
github.com/jackc/pglogrepl v0.0.0-20240307033717-828fbfe908e9
github.com/jackc/pgx/v5 v5.5.5
github.com/jackc/pgx/v5 v5.6.0
github.com/labstack/echo/v4 v4.12.0
github.com/mitchellh/mapstructure v1.5.0
github.com/pterm/pterm v0.12.79
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw=
github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A=
github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY=
github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8=
Expand Down
83 changes: 0 additions & 83 deletions internal/tls/tls.go

This file was deleted.

File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

package mocks

import "github.com/xataio/pgstream/internal/backoff"
import "github.com/xataio/pgstream/pkg/backoff"

type Backoff struct {
RetryNotifyFn func(backoff.Operation, backoff.Notify) error
Expand Down
4 changes: 2 additions & 2 deletions internal/kafka/config.go → pkg/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

package kafka

import tlslib "github.com/xataio/pgstream/internal/tls"
import tlslib "github.com/xataio/pgstream/pkg/tls"

type ConnConfig struct {
Servers []string
Topic TopicConfig
TLS *tlslib.Config
TLS tlslib.Config
}

type TopicConfig struct {
Expand Down
4 changes: 2 additions & 2 deletions internal/kafka/conn.go → pkg/kafka/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ import (
"strconv"
"time"

tlslib "github.com/xataio/pgstream/internal/tls"
tlslib "github.com/xataio/pgstream/pkg/tls"

"github.com/segmentio/kafka-go"
)

// withConnection creates a connection that can be used by the kafka operation
// passed in the parameters. This ensures the cleanup of all connection resources.
func withConnection(config *ConnConfig, kafkaOperation func(conn *kafka.Conn) error) error {
dialer, err := buildDialer(config.TLS)
dialer, err := buildDialer(&config.TLS)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"fmt"
"time"

"github.com/xataio/pgstream/internal/kafka"
"github.com/xataio/pgstream/pkg/kafka"

"go.opentelemetry.io/otel/metric"
)
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func NewReader(config ReaderConfig, logger loglib.Logger) (*Reader, error) {
return nil, fmt.Errorf("unsupported start offset [%s], must be one of [%s, %s]", config.ConsumerGroupStartOffset, earliestOffset, latestOffset)
}

dialer, err := buildDialer(config.Conn.TLS)
dialer, err := buildDialer(&config.Conn.TLS)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions internal/kafka/kafka_writer.go → pkg/kafka/kafka_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"time"

"github.com/segmentio/kafka-go"
tlslib "github.com/xataio/pgstream/internal/tls"
loglib "github.com/xataio/pgstream/pkg/log"
tlslib "github.com/xataio/pgstream/pkg/tls"
)

type MessageWriter interface {
Expand Down Expand Up @@ -56,7 +56,7 @@ func NewWriter(config WriterConfig, logger loglib.Logger) (*Writer, error) {
}
}

transport, err := buildTransport(config.Conn.TLS)
transport, err := buildTransport(&config.Conn.TLS)
if err != nil {
return nil, err
}
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

package mocks

import "github.com/xataio/pgstream/internal/kafka"
import "github.com/xataio/pgstream/pkg/kafka"

type OffsetParser struct {
ToStringFn func(o *kafka.Offset) string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package mocks
import (
"context"

"github.com/xataio/pgstream/internal/kafka"
"github.com/xataio/pgstream/pkg/kafka"
)

type Reader struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"context"
"sync/atomic"

"github.com/xataio/pgstream/internal/kafka"
"github.com/xataio/pgstream/pkg/kafka"
)

type Writer struct {
Expand Down
2 changes: 0 additions & 2 deletions pkg/schemalog/log_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ func (m *LogEntry) UnmarshalJSON(b []byte) error {
if err := json.Unmarshal([]byte(schemaStr), &m.Schema); err != nil {
return err
}
default:
panic(fmt.Sprintf("unmarshal LogEntry, got unexpected key when unmarshalling: %s", k))
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/stream/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type KafkaProcessorConfig struct {
type SearchProcessorConfig struct {
Indexer search.IndexerConfig
Store opensearch.Config
Retrier *search.StoreRetryConfig
Retrier search.StoreRetryConfig
}

type WebhookProcessorConfig struct {
Expand Down
6 changes: 3 additions & 3 deletions pkg/stream/integration/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ import (
"testing"

"github.com/stretchr/testify/require"
kafkalib "github.com/xataio/pgstream/internal/kafka"
"github.com/xataio/pgstream/internal/log/zerolog"
pglib "github.com/xataio/pgstream/internal/postgres"
"github.com/xataio/pgstream/internal/tls"
kafkalib "github.com/xataio/pgstream/pkg/kafka"
loglib "github.com/xataio/pgstream/pkg/log"
schemalogpg "github.com/xataio/pgstream/pkg/schemalog/postgres"
"github.com/xataio/pgstream/pkg/stream"
"github.com/xataio/pgstream/pkg/tls"
"github.com/xataio/pgstream/pkg/wal"
kafkacheckpoint "github.com/xataio/pgstream/pkg/wal/checkpointer/kafka"
kafkalistener "github.com/xataio/pgstream/pkg/wal/listener/kafka"
Expand Down Expand Up @@ -174,7 +174,7 @@ func testKafkaCfg() kafkalib.ConnConfig {
Name: "integration-tests",
AutoCreate: true,
},
TLS: &tls.Config{
TLS: tls.Config{
Enabled: false,
},
}
Expand Down
5 changes: 1 addition & 4 deletions pkg/stream/stream_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,7 @@ func Run(ctx context.Context, logger loglib.Logger, config *Config, meter metric
if err != nil {
return err
}
if config.Processor.Search.Retrier != nil {
logger.Debug("using retry logic with search store...")
searchStore = search.NewStoreRetrier(searchStore, config.Processor.Search.Retrier, search.WithStoreLogger(logger))
}
searchStore = search.NewStoreRetrier(searchStore, config.Processor.Search.Retrier, search.WithStoreLogger(logger))

searchIndexer := search.NewBatchIndexer(ctx,
config.Processor.Search.Indexer,
Expand Down
27 changes: 27 additions & 0 deletions pkg/tls/test/test.csr
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-----BEGIN CERTIFICATE REQUEST-----
MIIEjTCCAnUCAQAwSDELMAkGA1UEBhMCU1AxEzARBgNVBAgMClNvbWUtU3RhdGUx
FTATBgNVBAoMDFBnc3RyZWFtIEx0ZDENMAsGA1UECwwEWGF0YTCCAiIwDQYJKoZI
hvcNAQEBBQADggIPADCCAgoCggIBAMZG8/obpyvJ+WkGkdO/hbExSN1nWR206/Dh
pSYzcZyI1Jj0R4Af0gD/EFVM+4KTDr20nOofmfBWOYHV+KwiKtWQQ+oT0+xVcTT6
IC5I5K9+AXERuTu/NbnjkxuC/1u7K511RrUK0lxUra2/B8mGTc9nu2g415GVk2hU
rJjWEX09hVH7xBSmnzYN+IfepsftxgnR5m2YzqOSnsphBBfyyOsL+3Jo5Uv4yY22
bnJCDxx/TPG37EcGMb4Q/aCWk5mXm4Io5mBfcl7SNxy867JpBO2CCP6fFaWRUGmF
/O+YBD5z0cSb1wZrMBRgezggpa2gacYVtsWrQYzAxMtCf3MwM89z8i+W9ME3cMhg
T7b6T0XCQ5gkqmLxDCsg9ocNV0W0wb5EEPlt4/TeqMmLbpLkIa6rnR2C6gg+wFSQ
Vk8c/aBtm9BjKFLWWUwDUPPzp1RdIVAfuobDK37dVqr+1m6oTDl+BoDuM5PsgTS4
bFW2ZJVZB+d4IFmGhqTtKKmYa7QbwhzF8i5ShV+419KOt6hRkJ/jREdea7ZS/uyK
wBAvPHZVkUk8tfTpcuKTFVKsgXXV8uwwnI2W6safYbTXaN+7gfA88wntaigjeij/
LrO+itAiv9GTGkpMlXyuAX/d5+4j0EHcV64NYL61GvMcbJ5G0SeQrXuBlgMTYaNr
O7miEujnAgMBAAGgADANBgkqhkiG9w0BAQsFAAOCAgEAGYFGSgMARWuH5VXK0/Fl
nNg+/rzdff1NYkY9QrvFxCQeVJ9rD1ml7VLZXDtXhMNEGJyYbkouc1Ehx0BsihT6
YztcnQ6TzWAzvr3Ns9X3riADzXxdDHV5xs+8VPV8RvT3XNcrlw2NQmzJ4Juc8PkT
4ZfguZBywAmFTw1oX8JqlQSp5pYtP7popsvGPS6ieUm0Kmv8kK3sRDs+JSc7iXtB
/HymqeSylFNHgFZsdbYmu32v2qbcqimAitB/v5tGNhuiMXx6vEeQnB69V+AV70Rl
9dnvAo7ihTRMzecUVsoDFtc8OWSPdTm6t8vDI2JqmeDN25Xhyf89cKwoY97NhA99
ds5WHs6TzsHohPZAsaxtZnjwxMEne7Y4FLFmTHVk5o0POTZcOC/sMB1iBDsd/YJe
AYsoiqLYtu6x6Avfe6LXWYWYa/R4/UXh8H6WiChsFXzOIilp3apjaeHM3z7iKx2S
VtGyVTrrcbzRiF0ShKVnbDXnvcoNZxPiXfh6Zz4SkBbV01T3hluBwzp4mjcWpiv3
AOAWChMnbmkg/T+OME6e1JVHDR5tAC/7vF2QkZYpiH2RVnZmCTDWBcRGpMkhkRgF
eycowzKBkgIOcJ99p0sGEqQ3W0J1M4bzuumncLID08EG/dEp1eIdunahcHHyhnnv
BcGFr2/OxuaVmxcy5/QQjAg=
-----END CERTIFICATE REQUEST-----
52 changes: 52 additions & 0 deletions pkg/tls/test/test.key
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
-----BEGIN PRIVATE KEY-----
MIIJQQIBADANBgkqhkiG9w0BAQEFAASCCSswggknAgEAAoICAQDGRvP6G6cryflp
BpHTv4WxMUjdZ1kdtOvw4aUmM3GciNSY9EeAH9IA/xBVTPuCkw69tJzqH5nwVjmB
1fisIirVkEPqE9PsVXE0+iAuSOSvfgFxEbk7vzW545Mbgv9buyuddUa1CtJcVK2t
vwfJhk3PZ7toONeRlZNoVKyY1hF9PYVR+8QUpp82DfiH3qbH7cYJ0eZtmM6jkp7K
YQQX8sjrC/tyaOVL+MmNtm5yQg8cf0zxt+xHBjG+EP2glpOZl5uCKOZgX3Je0jcc
vOuyaQTtggj+nxWlkVBphfzvmAQ+c9HEm9cGazAUYHs4IKWtoGnGFbbFq0GMwMTL
Qn9zMDPPc/IvlvTBN3DIYE+2+k9FwkOYJKpi8QwrIPaHDVdFtMG+RBD5beP03qjJ
i26S5CGuq50dguoIPsBUkFZPHP2gbZvQYyhS1llMA1Dz86dUXSFQH7qGwyt+3Vaq
/tZuqEw5fgaA7jOT7IE0uGxVtmSVWQfneCBZhoak7SipmGu0G8IcxfIuUoVfuNfS
jreoUZCf40RHXmu2Uv7sisAQLzx2VZFJPLX06XLikxVSrIF11fLsMJyNlurGn2G0
12jfu4HwPPMJ7WooI3oo/y6zvorQIr/RkxpKTJV8rgF/3efuI9BB3FeuDWC+tRrz
HGyeRtEnkK17gZYDE2Gjazu5ohLo5wIDAQABAoICAFEXJaMNejIzeViVwkA6nP/Z
6zX5lX3Lx48NidB0y6s8Xs5rYW6qFOYpatGoGVjOsgGuA1rRL9EWQpCyJPCpTKFp
Tg1GrK6ERzdmcJDdaQHI4+gNWpdv3RY4V6qxyaQHiY/tLczPLzdpvlpHvXSTA/Gm
OAQo8yjsZowNzUT4j9CLv6HG+OuFNaoSzqkqy0ULHqpXeQkrrJ9DUMPuJ5FvzvIq
RV0GP3jxt+TITqVWFP4PpjVZhj2J8AAOzNvHmXgAhC4YchfKEWlsSfPr4+1kfApy
2yDfiSfcpWlyzf5jSqEMFyd0oN1UKya6Ssqqt3eqGnhT2xs+riFVmWaTvLIsbZNb
ML2OT7s0nqSkhkomVcQxgh+rypgljPB2JhHNSScKF3BYlIBFX9GarZ32vcLdBb/Z
u1GOiu6Fd5uBVfS0PTwX/8/NwCOmofUTxUa3J8oLcxBZRe8y0sxnfdsbKbXLviym
okOUyk6h4KaTXB2+99Q7TSHGGXissCeTCU8tIMEKFHlN3PQeSphEcHvhIBKTPdK3
vCbzfpoMWzyTBvSdffQRewSwij3fImPrWsE9s15jjDW3AwLjuNtPCYg3Amu0yK0k
ktAnJgruoWhnrxJae9iXPUuZghPvweDNnhtQb5Uf3jeYKwL2fLE3eaVtjqiWha5B
ziDK7ZelBH85IEP6COWxAoIBAQDpHIZ/aFl49AV0BhCDv2+28OlR1s6p4QZmX5mb
pQfON868Ya7iR+tPjzN71IJfga02QiaaXOUeh0FxCJMqdypSLW6XOIlppHLO8jmV
6RztF/iA1h1+fo3DMuuL2Nw3ivo4NhBvSoyfAfSI+Nn5eJl2Zqhason5aWSZsxrN
9A4Kcig8RS3aE3zQKCsT/ZBeM7AT+qqtGNceRh9pLgYkSnaugDX4JwD+f5VhUQOd
ivB9E4xHZJJN61iefok1SkMekQ+GF8WQRCr8dKbQAgxh5Eu84J7iax4EvFsoHuIs
9Wez0Yiy14ArW2+tkA+2GE0PBRvOPfjtVZ3n3lNOCjbLozWfAoIBAQDZvtXst+CS
lQJjkDenb51ujAum9JaxbKoU2kOEEEeN8AYgvmIqpSsgaMmzYNtW1yVT+O2z0Ix7
I6lG2cNz5XuWWJkBroAwPeHBZjOwyR9AdglYenIui6dftTdMDVnYbt/Fu905yh3t
fGtT8IRGond3c+WZZCgbr6sO+nq4PpG+i+nuyqOo0JJCLtgWyyK9jrfEhk2fNZyK
B7uaKwOhMHetkfnp5jyszGYsU2YU++NGL0vcacXUbZyYV4BT7+s3m+H0G8wf4qSP
FAkX+sloEl5HRWDB4SP0H9erKg8zoKDI7DVtv2DJLnLMB88XjOz3l24/HOVzyI+N
X3SfSQP55De5AoIBACpwGgA55AgEDLYZoIoLoO/iHefbPlZo8/xRLSrLuYcOW+Gp
uufRBgK+5DWH85AlkH4PPu3dOYz8PKqyT/BsL1U0liyLi2CjIo+QQ3GKNczoD0KN
OGNd8Lr3mzAjc7vc3j67gPRx0vXjqjwBadVj4jRO7hlM5Zd1W24r0BZsdt3p+G84
fOd1osRWe7kw8UZlDIommUnX+tm1FGTWjyGuOLr99lVN7H1ohq5nzEuzDqMGmwQo
SAZNcR2xlZMRCPUYnYXg8AOalWTOa8v0g4KSyEMDdYlszNM54zKDpNNgfdebrtI4
L0o1ZDhpwKJ6/BRe7rf2SkoSyyN6MxpC+8TI2qsCggEAbN/X1VoHpyNso13cBhNw
E3Ng7CUGKEbeMDkGY0VEkfr/BWZMbWhSzQy4NcHrSlufJYKlUDCp3XRyUqPV7+BB
0GYSc13OaNC4TdyNYgreXnvmpl/rMczQbrGMqbFPSEIAD72kmx2toy5/9+OeMDdS
Jt9DYVRMHbPTg1TJAdD/TNhmquiVtnY7e24yzArcHw36YwCIVWAYGohNTIPPd8xl
Ottvq31cv0YgnG9C7qEX/eLuOpKEwXfhQecWmmGvKgn+i/FOOm83uvbYqS3TgP8W
NurAu5CYSpuVWddY7IaXfn9lI6/6c/2Olugcq3jij9Ye4N3Q+PjClnyxMmfu3gc3
uQKCAQAmpjnO3OukXxgEtDusQKh/IxsvKQMfSQNh68cQ6iroGDAguYBusSTYU1gA
IKvR6Uqqv52yPe6u5pYC0fA8L+2S3nuFA4f4a1JHdpf5X5HwBtTERG4Qxh7p1gss
3AZpLfYa442d8UuCCYoimBXqXXF0TLsfoRjcgrKd9yNO6Pa79jzUR+ixQmkyE7XA
XGHx7Qsl6E0E1DgK4MHPOALg/tJiLNJQgIKDtiBn4GTR8tSHRDY64rEuOClq14y1
cpXNj7lcV1xz1vLNRcksS0/QA7Hzol+Dt8xVOBK1smhCREMetUMXHwBYLnNemxNX
/WF38bnN3/Dej6Ns6FRE8gNSRmeT
-----END PRIVATE KEY-----
Loading

0 comments on commit 0227467

Please sign in to comment.