Skip to content

Commit

Permalink
Merge pull request #16 from temporalio/abhinav/moveToUsingGoSDK
Browse files Browse the repository at this point in the history
Move to using Go SDK.
  • Loading branch information
anekkanti authored Nov 5, 2024
2 parents 5ed2604 + f8aabbe commit 065b08c
Show file tree
Hide file tree
Showing 33 changed files with 348 additions and 5,880 deletions.
45 changes: 2 additions & 43 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
(VERBOSE).SILENT:
############################# Main targets #############################
# Install dependencies.
install: buf-install grpc-install

# Run all linters and compile proto files.
proto: copy-api-cloud-api grpc

# Build the worker.
bins: worker
# Build the binaries.
bins: clean worker exporttool

########################################################################

Expand All @@ -21,41 +15,6 @@ SHELL := PATH=$(GOBIN):$(PATH) /bin/sh

COLOR := "\e[1;36m%s\e[0m\n"

PROTO_ROOT := proto
PROTO_OUT := protogen
$(PROTO_OUT):
mkdir $(PROTO_OUT)

##### Copy the proto files from the api-cloud repo #####
copy-api-cloud-api:
@printf $(COLOR) "Copy api-cloud..."
rm -rf $(PROTO_ROOT)/temporal/api
mkdir -p $(PROTO_ROOT)/temporal/api
git clone [email protected]:temporalio/api-cloud.git --depth=1 --branch main --single-branch $(PROTO_ROOT)/api-cloud-tmp
mv -f $(PROTO_ROOT)/api-cloud-tmp/temporal/api/cloud $(PROTO_ROOT)/temporal/api
mv $(PROTO_ROOT)/api-cloud-tmp/VERSION client/api/version
rm -rf $(PROTO_ROOT)/api-cloud-tmp

##### Compile proto files for go #####
grpc: go-grpc fix-proto-generated-go-path

go-grpc: clean $(PROTO_OUT)
printf $(COLOR) "Compile for go-gRPC..."
cd proto && buf generate --output ../

fix-proto-generated-go-path:
@if [ "$$(uname -s)" = "Darwin" ]; then find $(PROTO_OUT) -name '*.go' -exec sed -i '' "s/go\.temporal\.io/github\.com\/temporalio\/cloud-samples-go\/protogen\/temporal/g" {} \;; else find $(PROTO_OUT) -name '*.go' -exec sed -i 's/go\.temporal\.io/github\.com\/temporalio\/cloud-samples-go\/protogen\/temporal/g' {} \;; fi

##### Plugins & tools #####
buf-install:
printf $(COLOR) "Install/update buf..."
go install github.com/bufbuild/buf/cmd/[email protected]

grpc-install:
printf $(COLOR) "Install/update gRPC plugins..."
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

bins: clean worker exporttool

##### Build #####
Expand Down
47 changes: 0 additions & 47 deletions client/api/apikey.go

This file was deleted.

87 changes: 34 additions & 53 deletions client/api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,69 +2,50 @@ package api

import (
"context"
"crypto/tls"
"fmt"
"net/url"
"strings"
"time"

grpcretry "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
"go.temporal.io/sdk/client"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
)

func NewConnectionWithAPIKey(addrStr string, allowInsecure bool, apiKey string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
return NewConnection(
addrStr,
allowInsecure,
append(opts, grpc.WithPerRPCCredentials(NewAPIKeyRPCCredential(apiKey, allowInsecure)))...,
)
type Client struct {
client.CloudOperationsClient
}

func NewConnection(addrStr string, allowInsecure bool, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
addr, err := url.Parse(addrStr)
if err != nil {
return nil, fmt.Errorf("unable to parse server address: %s", err)
}
defaultOpts, err := defaultDialOptions(addr, allowInsecure)
if err != nil {
return nil, fmt.Errorf("failed to generate default dial options: %s", err)
}

conn, err := grpc.Dial(
addr.String(),
append(defaultOpts, opts...)...,
)
if err != nil {
return nil, fmt.Errorf("failed to dial `%s`: %v", addr.String(), err)
}
return conn, nil
}
var (
_ client.CloudOperationsClient = &Client{}

func defaultDialOptions(addr *url.URL, allowInsecure bool) ([]grpc.DialOption, error) {
var opts []grpc.DialOption
TemporalCloudAPIAddress = "saas-api.tmprl.cloud:443"
TemporalCloudAPIVersion = "2024-10-01-00"
)

transport := credentials.NewTLS(&tls.Config{
MinVersion: tls.VersionTLS12,
ServerName: addr.Hostname(),
func NewConnectionWithAPIKey(addrStr string, allowInsecure bool, apiKey string) (*Client, error) {

var cClient client.CloudOperationsClient
var err error
cClient, err = client.DialCloudOperationsClient(context.Background(), client.CloudOperationsClientOptions{
Version: TemporalCloudAPIVersion,
Credentials: client.NewAPIKeyStaticCredentials(apiKey),
DisableTLS: allowInsecure,
HostPort: addrStr,
ConnectionOptions: client.ConnectionOptions{
DialOptions: []grpc.DialOption{
grpc.WithChainUnaryInterceptor(
grpcretry.UnaryClientInterceptor(
grpcretry.WithBackoff(
grpcretry.BackoffExponentialWithJitter(250*time.Millisecond, 0.1),
),
grpcretry.WithMax(5),
),
),
},
},
})
if allowInsecure {
transport = insecure.NewCredentials()
if err != nil {
return nil, fmt.Errorf("failed to connect `%s`: %v", client.DefaultHostPort, err)
}

opts = append(opts, grpc.WithTransportCredentials(transport))
opts = append(opts, grpc.WithUnaryInterceptor(setAPIVersionInterceptor))
return opts, nil
}

func setAPIVersionInterceptor(
ctx context.Context,
method string,
req, reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
ctx = metadata.AppendToOutgoingContext(ctx, TemporalCloudAPIVersionHeader, strings.TrimSpace(TemporalCloudAPIVersion))
return invoker(ctx, method, req, reply, cc, opts...)
return &Client{cClient}, nil
}
1 change: 0 additions & 1 deletion client/api/version

This file was deleted.

12 changes: 0 additions & 12 deletions client/api/version.go

This file was deleted.

127 changes: 92 additions & 35 deletions client/temporal/client.go
Original file line number Diff line number Diff line change
@@ -1,77 +1,134 @@
package temporal

import (
"context"
"crypto/tls"
"fmt"
"net"

"github.com/temporalio/cloud-samples-go/client/api"
"github.com/temporalio/cloud-samples-go/internal/validator"
cloudservicev1 "go.temporal.io/api/cloud/cloudservice/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/log"
)

const (
localTemporalHostPort = "localhost:7233"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

type (
ApiKeyAuth struct {
// The api key to use for the client
APIKey string
}

MtlsAuth struct {
// The temporal cloud namespace's grpc endpoint address
// defaults to '<namespace>.tmprl.cloud:7233'
GRPCEndpoint string
// The TLS cert and key file paths
// Read more about TLS in Temporal here: https://docs.temporal.io/cloud/Certificates
TLSCertFilePath string
TLSKeyFilePath string
}

GetTemporalCloudNamespaceClientInput struct {
// The temporal cloud namespace to connect to (required) for e.g. "prod.a2dd6"
Namespace string `required:"true"`
// The temporal cloud namespace's grpc endpoint address, defaults to '<namespace>.tmprl.cloud:7233'
GRPCEndpoint string

// The TLS cert and key file paths
// Read more about TLS in Temporal here: https://docs.temporal.io/cloud/certificates
TLSCertFilePath string `required:"true"`
TLSKeyFilePath string `required:"true"`
// The auth to use for the client, defaults to local
Auth AuthType

// The API key to use for the client, defaults to no API key.
APIKey string

// The logger to use for the client, defaults to no logging
Logger log.Logger
}

AuthType interface {
apply(options *client.Options) error
}
)

func GetTemporalCloudNamespaceClient(input *GetTemporalCloudNamespaceClientInput) (client.Client, error) {
func (a *ApiKeyAuth) apply(options *client.Options) error {

err := validator.ValidateStruct(input)
c, err := api.NewConnectionWithAPIKey(api.TemporalCloudAPIAddress, false, a.APIKey)
if err != nil {
return nil, err
return fmt.Errorf("failed to create cloud api connection: %w", err)
}
if input.GRPCEndpoint == "" {
input.GRPCEndpoint = fmt.Sprintf("%s.tmprl.cloud:7233", input.Namespace)
}
tlsConfig, err := getTLSConfig(input)
resp, err := c.CloudService().GetNamespace(context.Background(), &cloudservicev1.GetNamespaceRequest{
Namespace: options.Namespace,
})
if err != nil {
return nil, fmt.Errorf("failed to get TLS config: %w", err)
return fmt.Errorf("failed to get namespace %s: %w", options.Namespace, err)
}
opts := client.Options{
HostPort: input.GRPCEndpoint,
Namespace: input.Namespace,
ConnectionOptions: client.ConnectionOptions{TLS: tlsConfig},
Logger: input.Logger,
if resp.GetNamespace().GetEndpoints().GetGrpcAddress() == "" {
return fmt.Errorf("namespace %q has no grpc address", options.Namespace)
}
if input.Logger != nil {
opts.Logger = input.Logger
options.HostPort = resp.GetNamespace().GetEndpoints().GetGrpcAddress()
options.Credentials = client.NewAPIKeyStaticCredentials(a.APIKey)
options.ConnectionOptions = client.ConnectionOptions{
TLS: &tls.Config{},
DialOptions: []grpc.DialOption{
grpc.WithUnaryInterceptor(
func(ctx context.Context, method string, req any, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
return invoker(
metadata.AppendToOutgoingContext(ctx, "temporal-namespace", options.Namespace),
method,
req,
reply,
cc,
opts...,
)
},
),
},
}
return client.Dial(opts)
return nil
}

func getTLSConfig(input *GetTemporalCloudNamespaceClientInput) (*tls.Config, error) {
if input.TLSCertFilePath == "" || input.TLSKeyFilePath == "" {
return nil, nil
func (a *MtlsAuth) apply(options *client.Options) error {
endpoint := a.GRPCEndpoint
if endpoint == "" {
endpoint = fmt.Sprintf("%s.tmprl.cloud:7233", options.Namespace)
}
if a.TLSCertFilePath == "" || a.TLSKeyFilePath == "" {
return fmt.Errorf("both tls cert and key file paths are required")
}
serverName, _, parseErr := net.SplitHostPort(input.GRPCEndpoint)
serverName, _, parseErr := net.SplitHostPort(endpoint)
if parseErr != nil {
return nil, fmt.Errorf("failed to split hostport %s: %w", input.GRPCEndpoint, parseErr)
return fmt.Errorf("failed to split hostport %s: %w", endpoint, parseErr)
}
var cert tls.Certificate
var err error
cert, err = tls.LoadX509KeyPair(input.TLSCertFilePath, input.TLSKeyFilePath)
cert, err = tls.LoadX509KeyPair(a.TLSCertFilePath, a.TLSKeyFilePath)
if err != nil {
return nil, fmt.Errorf("failed to load TLS from files: %w", err)
return fmt.Errorf("failed to load TLS from files: %w", err)
}
return &tls.Config{
options.HostPort = endpoint
options.ConnectionOptions = client.ConnectionOptions{TLS: &tls.Config{
Certificates: []tls.Certificate{cert},
ServerName: serverName,
}, nil
}}
return nil
}

func GetTemporalCloudNamespaceClient(ctx context.Context, input *GetTemporalCloudNamespaceClientInput) (client.Client, error) {
err := validator.ValidateStruct(input)
if err != nil {
return nil, err
}

opts := client.Options{
Namespace: input.Namespace,
Logger: input.Logger,
}
err = input.Auth.apply(&opts)
if err != nil {
return nil, err
}
if input.Logger != nil {
opts.Logger = input.Logger
}
return client.Dial(opts)
}
Loading

0 comments on commit 065b08c

Please sign in to comment.