Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move to using Go SDK. #16

Merged
merged 12 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 2 additions & 43 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
(VERBOSE).SILENT:
############################# Main targets #############################
# Install dependencies.
install: buf-install grpc-install

# Run all linters and compile proto files.
proto: copy-api-cloud-api grpc

# Build the worker.
bins: worker
# Build the binaries.
bins: clean worker exporttool

########################################################################

Expand All @@ -21,41 +15,6 @@ SHELL := PATH=$(GOBIN):$(PATH) /bin/sh

COLOR := "\e[1;36m%s\e[0m\n"

PROTO_ROOT := proto
PROTO_OUT := protogen
$(PROTO_OUT):
mkdir $(PROTO_OUT)

##### Copy the proto files from the api-cloud repo #####
copy-api-cloud-api:
@printf $(COLOR) "Copy api-cloud..."
rm -rf $(PROTO_ROOT)/temporal/api
mkdir -p $(PROTO_ROOT)/temporal/api
git clone [email protected]:temporalio/api-cloud.git --depth=1 --branch main --single-branch $(PROTO_ROOT)/api-cloud-tmp
mv -f $(PROTO_ROOT)/api-cloud-tmp/temporal/api/cloud $(PROTO_ROOT)/temporal/api
mv $(PROTO_ROOT)/api-cloud-tmp/VERSION client/api/version
rm -rf $(PROTO_ROOT)/api-cloud-tmp

##### Compile proto files for go #####
grpc: go-grpc fix-proto-generated-go-path

go-grpc: clean $(PROTO_OUT)
printf $(COLOR) "Compile for go-gRPC..."
cd proto && buf generate --output ../

fix-proto-generated-go-path:
@if [ "$$(uname -s)" = "Darwin" ]; then find $(PROTO_OUT) -name '*.go' -exec sed -i '' "s/go\.temporal\.io/github\.com\/temporalio\/cloud-samples-go\/protogen\/temporal/g" {} \;; else find $(PROTO_OUT) -name '*.go' -exec sed -i 's/go\.temporal\.io/github\.com\/temporalio\/cloud-samples-go\/protogen\/temporal/g' {} \;; fi

##### Plugins & tools #####
buf-install:
printf $(COLOR) "Install/update buf..."
go install github.com/bufbuild/buf/cmd/[email protected]

grpc-install:
printf $(COLOR) "Install/update gRPC plugins..."
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

bins: clean worker exporttool

##### Build #####
Expand Down
47 changes: 0 additions & 47 deletions client/api/apikey.go

This file was deleted.

87 changes: 34 additions & 53 deletions client/api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,69 +2,50 @@ package api

import (
"context"
"crypto/tls"
"fmt"
"net/url"
"strings"
"time"

grpcretry "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
"go.temporal.io/sdk/client"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
)

func NewConnectionWithAPIKey(addrStr string, allowInsecure bool, apiKey string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
return NewConnection(
addrStr,
allowInsecure,
append(opts, grpc.WithPerRPCCredentials(NewAPIKeyRPCCredential(apiKey, allowInsecure)))...,
)
type Client struct {
client.CloudOperationsClient
}

func NewConnection(addrStr string, allowInsecure bool, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
addr, err := url.Parse(addrStr)
if err != nil {
return nil, fmt.Errorf("unable to parse server address: %s", err)
}
defaultOpts, err := defaultDialOptions(addr, allowInsecure)
if err != nil {
return nil, fmt.Errorf("failed to generate default dial options: %s", err)
}

conn, err := grpc.Dial(
addr.String(),
append(defaultOpts, opts...)...,
)
if err != nil {
return nil, fmt.Errorf("failed to dial `%s`: %v", addr.String(), err)
}
return conn, nil
}
var (
_ client.CloudOperationsClient = &Client{}

func defaultDialOptions(addr *url.URL, allowInsecure bool) ([]grpc.DialOption, error) {
var opts []grpc.DialOption
TemporalCloudAPIAddress = "saas-api.tmprl.cloud:443"
TemporalCloudAPIVersion = "2024-10-01-00"
)

transport := credentials.NewTLS(&tls.Config{
MinVersion: tls.VersionTLS12,
ServerName: addr.Hostname(),
func NewConnectionWithAPIKey(addrStr string, allowInsecure bool, apiKey string) (*Client, error) {

var cClient client.CloudOperationsClient
var err error
cClient, err = client.DialCloudOperationsClient(context.Background(), client.CloudOperationsClientOptions{
Version: TemporalCloudAPIVersion,
Credentials: client.NewAPIKeyStaticCredentials(apiKey),
DisableTLS: allowInsecure,
HostPort: addrStr,
ConnectionOptions: client.ConnectionOptions{
DialOptions: []grpc.DialOption{
grpc.WithChainUnaryInterceptor(
grpcretry.UnaryClientInterceptor(
grpcretry.WithBackoff(
grpcretry.BackoffExponentialWithJitter(250*time.Millisecond, 0.1),
),
grpcretry.WithMax(5),
),
),
},
},
})
if allowInsecure {
transport = insecure.NewCredentials()
if err != nil {
return nil, fmt.Errorf("failed to connect `%s`: %v", client.DefaultHostPort, err)
}

opts = append(opts, grpc.WithTransportCredentials(transport))
opts = append(opts, grpc.WithUnaryInterceptor(setAPIVersionInterceptor))
return opts, nil
}

func setAPIVersionInterceptor(
ctx context.Context,
method string,
req, reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
ctx = metadata.AppendToOutgoingContext(ctx, TemporalCloudAPIVersionHeader, strings.TrimSpace(TemporalCloudAPIVersion))
return invoker(ctx, method, req, reply, cc, opts...)
return &Client{cClient}, nil
}
1 change: 0 additions & 1 deletion client/api/version

This file was deleted.

12 changes: 0 additions & 12 deletions client/api/version.go

This file was deleted.

127 changes: 92 additions & 35 deletions client/temporal/client.go
Original file line number Diff line number Diff line change
@@ -1,77 +1,134 @@
package temporal

import (
"context"
"crypto/tls"
"fmt"
"net"

"github.com/temporalio/cloud-samples-go/client/api"
"github.com/temporalio/cloud-samples-go/internal/validator"
cloudservicev1 "go.temporal.io/api/cloud/cloudservice/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/log"
)

const (
localTemporalHostPort = "localhost:7233"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

type (
ApiKeyAuth struct {
// The api key to use for the client
APIKey string
}

MtlsAuth struct {
// The temporal cloud namespace's grpc endpoint address
// defaults to '<namespace>.tmprl.cloud:7233'
GRPCEndpoint string
// The TLS cert and key file paths
// Read more about TLS in Temporal here: https://docs.temporal.io/cloud/Certificates
TLSCertFilePath string
TLSKeyFilePath string
}

GetTemporalCloudNamespaceClientInput struct {
// The temporal cloud namespace to connect to (required) for e.g. "prod.a2dd6"
Namespace string `required:"true"`
// The temporal cloud namespace's grpc endpoint address, defaults to '<namespace>.tmprl.cloud:7233'
GRPCEndpoint string

// The TLS cert and key file paths
// Read more about TLS in Temporal here: https://docs.temporal.io/cloud/certificates
TLSCertFilePath string `required:"true"`
TLSKeyFilePath string `required:"true"`
// The auth to use for the client, defaults to local
Auth AuthType

// The API key to use for the client, defaults to no API key.
APIKey string

// The logger to use for the client, defaults to no logging
Logger log.Logger
}

AuthType interface {
apply(options *client.Options) error
}
)

func GetTemporalCloudNamespaceClient(input *GetTemporalCloudNamespaceClientInput) (client.Client, error) {
func (a *ApiKeyAuth) apply(options *client.Options) error {

err := validator.ValidateStruct(input)
c, err := api.NewConnectionWithAPIKey(api.TemporalCloudAPIAddress, false, a.APIKey)
if err != nil {
return nil, err
return fmt.Errorf("failed to create cloud api connection: %w", err)
}
if input.GRPCEndpoint == "" {
input.GRPCEndpoint = fmt.Sprintf("%s.tmprl.cloud:7233", input.Namespace)
}
tlsConfig, err := getTLSConfig(input)
resp, err := c.CloudService().GetNamespace(context.Background(), &cloudservicev1.GetNamespaceRequest{
Namespace: options.Namespace,
})
if err != nil {
return nil, fmt.Errorf("failed to get TLS config: %w", err)
return fmt.Errorf("failed to get namespace %s: %w", options.Namespace, err)
}
opts := client.Options{
HostPort: input.GRPCEndpoint,
Namespace: input.Namespace,
ConnectionOptions: client.ConnectionOptions{TLS: tlsConfig},
Logger: input.Logger,
if resp.GetNamespace().GetEndpoints().GetGrpcAddress() == "" {
return fmt.Errorf("namespace %q has no grpc address", options.Namespace)
}
if input.Logger != nil {
opts.Logger = input.Logger
options.HostPort = resp.GetNamespace().GetEndpoints().GetGrpcAddress()
options.Credentials = client.NewAPIKeyStaticCredentials(a.APIKey)
options.ConnectionOptions = client.ConnectionOptions{
TLS: &tls.Config{},
DialOptions: []grpc.DialOption{
grpc.WithUnaryInterceptor(
func(ctx context.Context, method string, req any, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
return invoker(
metadata.AppendToOutgoingContext(ctx, "temporal-namespace", options.Namespace),
method,
req,
reply,
cc,
opts...,
)
},
),
},
}
return client.Dial(opts)
return nil
}

func getTLSConfig(input *GetTemporalCloudNamespaceClientInput) (*tls.Config, error) {
if input.TLSCertFilePath == "" || input.TLSKeyFilePath == "" {
return nil, nil
func (a *MtlsAuth) apply(options *client.Options) error {
endpoint := a.GRPCEndpoint
if endpoint == "" {
endpoint = fmt.Sprintf("%s.tmprl.cloud:7233", options.Namespace)
}
if a.TLSCertFilePath == "" || a.TLSKeyFilePath == "" {
return fmt.Errorf("both tls cert and key file paths are required")
}
serverName, _, parseErr := net.SplitHostPort(input.GRPCEndpoint)
serverName, _, parseErr := net.SplitHostPort(endpoint)
if parseErr != nil {
return nil, fmt.Errorf("failed to split hostport %s: %w", input.GRPCEndpoint, parseErr)
return fmt.Errorf("failed to split hostport %s: %w", endpoint, parseErr)
}
var cert tls.Certificate
var err error
cert, err = tls.LoadX509KeyPair(input.TLSCertFilePath, input.TLSKeyFilePath)
cert, err = tls.LoadX509KeyPair(a.TLSCertFilePath, a.TLSKeyFilePath)
if err != nil {
return nil, fmt.Errorf("failed to load TLS from files: %w", err)
return fmt.Errorf("failed to load TLS from files: %w", err)
}
return &tls.Config{
options.HostPort = endpoint
options.ConnectionOptions = client.ConnectionOptions{TLS: &tls.Config{
Certificates: []tls.Certificate{cert},
ServerName: serverName,
}, nil
}}
return nil
}

func GetTemporalCloudNamespaceClient(ctx context.Context, input *GetTemporalCloudNamespaceClientInput) (client.Client, error) {
err := validator.ValidateStruct(input)
if err != nil {
return nil, err
}

opts := client.Options{
Namespace: input.Namespace,
Logger: input.Logger,
}
err = input.Auth.apply(&opts)
if err != nil {
return nil, err
}
if input.Logger != nil {
opts.Logger = input.Logger
}
return client.Dial(opts)
}
Loading