Skip to content

Commit

Permalink
Write events out to postgres
Browse files Browse the repository at this point in the history
  • Loading branch information
jveski committed Jan 2, 2024
1 parent 646e538 commit 8c4dc49
Show file tree
Hide file tree
Showing 259 changed files with 150,556 additions and 231 deletions.
8 changes: 8 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,25 @@ toolchain go1.21.1

require (
github.com/Nerzal/gocloak/v13 v13.8.0
github.com/jackc/pgx v3.6.2+incompatible
github.com/kelseyhightower/envconfig v1.4.0
github.com/stripe/stripe-go/v75 v75.11.0
golang.org/x/time v0.5.0
)

require (
github.com/cockroachdb/apd v1.1.0 // indirect
github.com/go-resty/resty/v2 v2.7.0 // indirect
github.com/gofrs/uuid v4.4.0+incompatible // indirect
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/segmentio/ksuid v1.0.4 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
github.com/stretchr/testify v1.8.4 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/net v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
)
16 changes: 14 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
github.com/Nerzal/gocloak/v13 v13.8.0 h1:7s9cK8X3vy8OIic+pG4POE9vGy02tSHkMhvWXv0P2m8=
github.com/Nerzal/gocloak/v13 v13.8.0/go.mod h1:rRBtEdh5N0+JlZZEsrfZcB2sRMZWbgSxI2EIv9jpJp4=
github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I=
github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-resty/resty/v2 v2.7.0 h1:me+K9p3uhSmXtrBZ4k9jcEAfJmuC8IivWHwaLZwPrFY=
github.com/go-resty/resty/v2 v2.7.0/go.mod h1:9PWDzw47qPphMRFfhsyk0NnSgvluHcljSMVIq3w7q0I=
github.com/gofrs/uuid v4.4.0+incompatible h1:3qXRTX8/NbyulANqlc0lchS1gqAVxRgsuW1YrTJupqA=
github.com/gofrs/uuid v4.4.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg=
github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 h1:vr3AYkKovP8uR8AvSGGUK1IDqRa5lAAvEkZG1LKaCRc=
github.com/jackc/fake v0.0.0-20150926172116-812a484cc733/go.mod h1:WrMFNQdiFJ80sQsxDoMokWK1W5TQtxBFNpzWTD84ibQ=
github.com/jackc/pgx v3.6.2+incompatible h1:2zP5OD7kiyR3xzRYMhOcXVvkDZsImVXfj+yIyTQf3/o=
github.com/jackc/pgx v3.6.2+incompatible/go.mod h1:0ZGrqGqkRlliWnWB4zKnWtjbSWbGkVEFm4TeybAXq+I=
github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8=
github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs=
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand All @@ -17,15 +27,17 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/segmentio/ksuid v1.0.4 h1:sBo2BdShXjmcugAMwjugoGUdUV0pcxY5mW4xKRn3v4c=
github.com/segmentio/ksuid v1.0.4/go.mod h1:/XUiZBD3kVx5SmUOl55voK5yeAbBNNIed+2O73XgrPE=
github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8=
github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stripe/stripe-go/v75 v75.11.0 h1:jLbHQGRrptDS815sMKFFbTqVtrh+ugzO39zRVaU1Xe8=
github.com/stripe/stripe-go/v75 v75.11.0/go.mod h1:wT44gah+eCY8Z0aSpY/vQlYYbicU9uUAbAqdaUxxDqE=
golang.org/x/crypto v0.15.0 h1:frVn1TEaCEaZcn3Tmd7Y2b5KKPaZ+I32Q2OA3kYp5TA=
golang.org/x/crypto v0.15.0/go.mod h1:4ChreQoLWfG3xLDer1WdlH5NdlQ3+mwnQq1YTKY+72g=
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/net v0.0.0-20210520170846-37e1c6afe023/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211029224645-99673261e6eb/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.18.0 h1:mIYleuAkSbHh0tCv7RvjL3F6ZVbLjq4+R7zbOn3Kokg=
Expand Down
5 changes: 5 additions & 0 deletions internal/conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,9 @@ type Env struct {

PaypalClientID string `split_words:"true"`
PaypalClientSecret string `split_words:"true"`

EventPsqlAddr string `split_words:"true"`
EventPsqlUsername string `split_words:"true"`
EventPsqlPassword string `split_words:"true"`
EventBufferLength int `split_words:"true" default:"50"`
}
86 changes: 86 additions & 0 deletions internal/eventing/sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package eventing

import (
"fmt"
"log"
"time"

"github.com/jackc/pgx"

"github.com/TheLab-ms/profile/internal/conf"
)

const migration = `
CREATE TABLE IF NOT EXISTS profile_events (
id integer primary key,
time timestamp not null,
email text not null,
reason text not null,
message text not null
);
CREATE INDEX IF NOT EXISTS idx_profile_events_time ON profile_events (time);
`

type Sink struct {
buffer chan *event
}

func NewSink(env *conf.Env) (*Sink, error) {
s := &Sink{}
if env.EventPsqlAddr == "" {
return s, nil
}

db, err := pgx.Connect(pgx.ConnConfig{
Host: env.EventPsqlAddr,
User: env.EventPsqlUsername,
Password: env.EventPsqlPassword,
})
if err != nil {
return nil, fmt.Errorf("constructing db client: %w", err)
}

_, err = db.Exec(migration)
if err != nil {
return nil, fmt.Errorf("db migration: %w", err)
}

// Flush messages out to postgres
s.buffer = make(chan *event, env.EventBufferLength)
go func() {
defer db.Close()

for event := range s.buffer {
_, err := db.Exec("INSERT INTO profile_events (time, email, reason, message) VALUES ($1, $2, $3, $4)", event.Timestamp, event.Email, event.Reason, event.Message)
if err != nil {
log.Printf("error while flushing event to postgres: %s", err) // it would be a good idea to retry here
}

// don't send messages too often
// batching would be nice, this is easier to implement
time.Sleep(time.Second)
}
}()

return s, nil
}

func (s *Sink) Publish(email, reason, templ string, args ...any) {
if s.buffer == nil {
return
}
s.buffer <- &event{
Timestamp: time.Now(),
Email: email,
Reason: reason,
Message: fmt.Sprintf(templ, args...),
}
}

type event struct {
Timestamp time.Time
Email string
Reason string
Message string
}
41 changes: 10 additions & 31 deletions internal/keycloak/keycloak.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,25 +157,20 @@ func (k *Keycloak) GetUserByEmail(ctx context.Context, email string) (*User, err
return newUser(kcusers[0])
}

func (k *Keycloak) UpdateUserFobID(ctx context.Context, userID string, fobID int) error {
func (k *Keycloak) UpdateUserFobID(ctx context.Context, user *User, fobID int) error {
token, err := k.GetToken(ctx)
if err != nil {
return fmt.Errorf("getting token: %w", err)
}

kcuser, err := k.Client.GetUserByID(ctx, token.AccessToken, k.env.KeycloakRealm, userID)
if err != nil {
return fmt.Errorf("getting current user: %w", err)
}

attr := safeGetAttrs(kcuser)
attr := safeGetAttrs(user.keycloakObject)
if fobID == 0 {
attr["keyfobID"] = []string{""}
} else {
attr["keyfobID"] = []string{strconv.Itoa(fobID)}
}

return k.Client.UpdateUser(ctx, token.AccessToken, k.env.KeycloakRealm, *kcuser)
return k.Client.UpdateUser(ctx, token.AccessToken, k.env.KeycloakRealm, *user.keycloakObject)
}

func (k *Keycloak) UpdateUserWaiverState(ctx context.Context, email string) error {
Expand All @@ -201,46 +196,30 @@ func (k *Keycloak) UpdateUserWaiverState(ctx context.Context, email string) erro
return k.Client.UpdateUser(ctx, token.AccessToken, k.env.KeycloakRealm, *kcuser)
}

func (k *Keycloak) UpdateUserName(ctx context.Context, userID, first, last string) error {
func (k *Keycloak) UpdateUserName(ctx context.Context, user *User, first, last string) error {
token, err := k.GetToken(ctx)
if err != nil {
return fmt.Errorf("getting token: %w", err)
}

kcuser, err := k.Client.GetUserByID(ctx, token.AccessToken, k.env.KeycloakRealm, userID)
if err != nil {
return fmt.Errorf("getting current user: %w", err)
}

kcuser.FirstName = &first
kcuser.LastName = &last

return k.Client.UpdateUser(ctx, token.AccessToken, k.env.KeycloakRealm, *kcuser)
user.keycloakObject.FirstName = &first
user.keycloakObject.LastName = &last
return k.Client.UpdateUser(ctx, token.AccessToken, k.env.KeycloakRealm, *user.keycloakObject)
}

func (k *Keycloak) UpdateUserStripeInfo(ctx context.Context, customer *stripe.Customer, sub *stripe.Subscription) error {
func (k *Keycloak) UpdateUserStripeInfo(ctx context.Context, user *User, customer *stripe.Customer, sub *stripe.Subscription) error {
token, err := k.GetToken(ctx)
if err != nil {
return fmt.Errorf("getting token: %w", err)
}

kcusers, err := k.Client.GetUsers(ctx, token.AccessToken, k.env.KeycloakRealm, gocloak.GetUsersParams{
Email: &customer.Email,
})
if err != nil {
return fmt.Errorf("getting current user: %w", err)
}
if len(kcusers) == 0 {
return errors.New("user not found")
}
kcuser := kcusers[0]

kcuser := user.keycloakObject
attr := safeGetAttrs(kcuser)
active := sub.Status == stripe.SubscriptionStatusActive

// Don't de-activate accounts when we receive cancelation webhooks for a subscription that is not currently in use.
// This shouldn't be possible for any accounts other than tests.
if !active && !strings.EqualFold(safeGetAttr(kcuser, "stripeSubscriptionID"), sub.ID) {
if !active && !strings.EqualFold(user.StripeSubscriptionID, sub.ID) {
log.Printf("dropping cancelation webhook for user %s because the subscription ID doesn't match the one in keycloak", *kcuser.Email)
return nil
}
Expand Down
3 changes: 3 additions & 0 deletions internal/keycloak/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ type User struct {
LastPaypalTransactionPrice float64
LastPaypalTransactionTime time.Time
PaypalSubscriptionID string

keycloakObject *gocloak.User
}

func newUser(kcuser *gocloak.User) (*User, error) {
Expand All @@ -37,6 +39,7 @@ func newUser(kcuser *gocloak.User) (*User, error) {
StripeSubscriptionID: safeGetAttr(kcuser, "stripeSubscriptionID"),
BuildingAccessApproved: safeGetAttr(kcuser, "buildingAccessApprover") != "",
StripeCustomerID: safeGetAttr(kcuser, "stripeID"),
keycloakObject: kcuser,
}
user.FobID, _ = strconv.Atoi(safeGetAttr(kcuser, "keyfobID"))
user.StripeCancelationTime, _ = strconv.ParseInt(safeGetAttr(kcuser, "stripeCancelationTime"), 10, 0)
Expand Down
Loading

0 comments on commit 8c4dc49

Please sign in to comment.