diff --git a/caraml-store-sdk/go/serving/auth.go b/caraml-store-sdk/go/auth/auth.go similarity index 99% rename from caraml-store-sdk/go/serving/auth.go rename to caraml-store-sdk/go/auth/auth.go index de2f2bd..4e0646a 100644 --- a/caraml-store-sdk/go/serving/auth.go +++ b/caraml-store-sdk/go/auth/auth.go @@ -1,4 +1,4 @@ -package serving +package auth import ( "bytes" diff --git a/caraml-store-sdk/go/serving/auth_test.go b/caraml-store-sdk/go/auth/auth_test.go similarity index 99% rename from caraml-store-sdk/go/serving/auth_test.go rename to caraml-store-sdk/go/auth/auth_test.go index 0fec20f..e0bf919 100644 --- a/caraml-store-sdk/go/serving/auth_test.go +++ b/caraml-store-sdk/go/auth/auth_test.go @@ -1,4 +1,4 @@ -package serving +package auth import ( "context" diff --git a/caraml-store-sdk/go/protos/feast/core/LegacyJobService.pb.go b/caraml-store-sdk/go/protos/feast/core/LegacyJobService.pb.go deleted file mode 100644 index 1e4c601..0000000 --- a/caraml-store-sdk/go/protos/feast/core/LegacyJobService.pb.go +++ /dev/null @@ -1,112 +0,0 @@ -// Legacy compatibility endpoint for older version of feast client. -// The new endpoint is defined in feast_spark/api/JobService.proto - -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.30.0 -// protoc (unknown) -// source: feast/core/LegacyJobService.proto - -package core - -import ( - api "github.com/caraml-dev/caraml-store/caraml-store-sdk/go/protos/feast_spark/api" - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - reflect "reflect" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -var File_feast_core_LegacyJobService_proto protoreflect.FileDescriptor - -var file_feast_core_LegacyJobService_proto_rawDesc = []byte{ - 0x0a, 0x21, 0x66, 0x65, 0x61, 0x73, 0x74, 0x2f, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x4c, 0x65, 0x67, - 0x61, 0x63, 0x79, 0x4a, 0x6f, 0x62, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x12, 0x0a, 0x66, 0x65, 0x61, 0x73, 0x74, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x1a, - 0x20, 0x66, 0x65, 0x61, 0x73, 0x74, 0x5f, 0x73, 0x70, 0x61, 0x72, 0x6b, 0x2f, 0x61, 0x70, 0x69, - 0x2f, 0x4a, 0x6f, 0x62, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x32, 0xe9, 0x02, 0x0a, 0x0a, 0x4a, 0x6f, 0x62, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, - 0x12, 0x97, 0x01, 0x0a, 0x20, 0x53, 0x74, 0x61, 0x72, 0x74, 0x4f, 0x66, 0x66, 0x6c, 0x69, 0x6e, - 0x65, 0x54, 0x6f, 0x4f, 0x6e, 0x6c, 0x69, 0x6e, 0x65, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, - 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x12, 0x38, 0x2e, 0x66, 0x65, 0x61, 0x73, 0x74, 0x5f, 0x73, 0x70, - 0x61, 0x72, 0x6b, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x53, 0x74, 0x61, 0x72, 0x74, 0x4f, 0x66, 0x66, - 0x6c, 0x69, 0x6e, 0x65, 0x54, 0x6f, 0x4f, 0x6e, 0x6c, 0x69, 0x6e, 0x65, 0x49, 0x6e, 0x67, 0x65, - 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x39, 0x2e, 0x66, 0x65, 0x61, 0x73, 0x74, 0x5f, 0x73, 0x70, 0x61, 0x72, 0x6b, 0x2e, 0x61, 0x70, - 0x69, 0x2e, 0x53, 0x74, 0x61, 0x72, 0x74, 0x4f, 0x66, 0x66, 0x6c, 0x69, 0x6e, 0x65, 0x54, 0x6f, - 0x4f, 0x6e, 0x6c, 0x69, 0x6e, 0x65, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4a, - 0x6f, 0x62, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x76, 0x0a, 0x15, 0x47, 0x65, - 0x74, 0x48, 0x69, 0x73, 0x74, 0x6f, 0x72, 0x69, 0x63, 0x61, 0x6c, 0x46, 0x65, 0x61, 0x74, 0x75, - 0x72, 0x65, 0x73, 0x12, 0x2d, 0x2e, 0x66, 0x65, 0x61, 0x73, 0x74, 0x5f, 0x73, 0x70, 0x61, 0x72, - 0x6b, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x47, 0x65, 0x74, 0x48, 0x69, 0x73, 0x74, 0x6f, 0x72, 0x69, - 0x63, 0x61, 0x6c, 0x46, 0x65, 0x61, 0x74, 0x75, 0x72, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x2e, 0x2e, 0x66, 0x65, 0x61, 0x73, 0x74, 0x5f, 0x73, 0x70, 0x61, 0x72, 0x6b, - 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x47, 0x65, 0x74, 0x48, 0x69, 0x73, 0x74, 0x6f, 0x72, 0x69, 0x63, - 0x61, 0x6c, 0x46, 0x65, 0x61, 0x74, 0x75, 0x72, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x12, 0x49, 0x0a, 0x06, 0x47, 0x65, 0x74, 0x4a, 0x6f, 0x62, 0x12, 0x1e, 0x2e, 0x66, - 0x65, 0x61, 0x73, 0x74, 0x5f, 0x73, 0x70, 0x61, 0x72, 0x6b, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x47, - 0x65, 0x74, 0x4a, 0x6f, 0x62, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x66, - 0x65, 0x61, 0x73, 0x74, 0x5f, 0x73, 0x70, 0x61, 0x72, 0x6b, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x47, - 0x65, 0x74, 0x4a, 0x6f, 0x62, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x7d, 0x0a, - 0x20, 0x64, 0x65, 0x76, 0x2e, 0x63, 0x61, 0x72, 0x61, 0x6d, 0x6c, 0x2e, 0x73, 0x74, 0x6f, 0x72, - 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x63, 0x6f, 0x6d, 0x70, 0x61, - 0x74, 0x42, 0x0f, 0x4a, 0x6f, 0x62, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x50, 0x72, 0x6f, - 0x74, 0x6f, 0x5a, 0x48, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, - 0x61, 0x72, 0x61, 0x6d, 0x6c, 0x2d, 0x64, 0x65, 0x76, 0x2f, 0x63, 0x61, 0x72, 0x61, 0x6d, 0x6c, - 0x2d, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2f, 0x63, 0x61, 0x72, 0x61, 0x6d, 0x6c, 0x2d, 0x73, 0x74, - 0x6f, 0x72, 0x65, 0x2d, 0x73, 0x64, 0x6b, 0x2f, 0x67, 0x6f, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x73, 0x2f, 0x66, 0x65, 0x61, 0x73, 0x74, 0x2f, 0x63, 0x6f, 0x72, 0x65, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, -} - -var file_feast_core_LegacyJobService_proto_goTypes = []interface{}{ - (*api.StartOfflineToOnlineIngestionJobRequest)(nil), // 0: feast_spark.api.StartOfflineToOnlineIngestionJobRequest - (*api.GetHistoricalFeaturesRequest)(nil), // 1: feast_spark.api.GetHistoricalFeaturesRequest - (*api.GetJobRequest)(nil), // 2: feast_spark.api.GetJobRequest - (*api.StartOfflineToOnlineIngestionJobResponse)(nil), // 3: feast_spark.api.StartOfflineToOnlineIngestionJobResponse - (*api.GetHistoricalFeaturesResponse)(nil), // 4: feast_spark.api.GetHistoricalFeaturesResponse - (*api.GetJobResponse)(nil), // 5: feast_spark.api.GetJobResponse -} -var file_feast_core_LegacyJobService_proto_depIdxs = []int32{ - 0, // 0: feast.core.JobService.StartOfflineToOnlineIngestionJob:input_type -> feast_spark.api.StartOfflineToOnlineIngestionJobRequest - 1, // 1: feast.core.JobService.GetHistoricalFeatures:input_type -> feast_spark.api.GetHistoricalFeaturesRequest - 2, // 2: feast.core.JobService.GetJob:input_type -> feast_spark.api.GetJobRequest - 3, // 3: feast.core.JobService.StartOfflineToOnlineIngestionJob:output_type -> feast_spark.api.StartOfflineToOnlineIngestionJobResponse - 4, // 4: feast.core.JobService.GetHistoricalFeatures:output_type -> feast_spark.api.GetHistoricalFeaturesResponse - 5, // 5: feast.core.JobService.GetJob:output_type -> feast_spark.api.GetJobResponse - 3, // [3:6] is the sub-list for method output_type - 0, // [0:3] is the sub-list for method input_type - 0, // [0:0] is the sub-list for extension type_name - 0, // [0:0] is the sub-list for extension extendee - 0, // [0:0] is the sub-list for field type_name -} - -func init() { file_feast_core_LegacyJobService_proto_init() } -func file_feast_core_LegacyJobService_proto_init() { - if File_feast_core_LegacyJobService_proto != nil { - return - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_feast_core_LegacyJobService_proto_rawDesc, - NumEnums: 0, - NumMessages: 0, - NumExtensions: 0, - NumServices: 1, - }, - GoTypes: file_feast_core_LegacyJobService_proto_goTypes, - DependencyIndexes: file_feast_core_LegacyJobService_proto_depIdxs, - }.Build() - File_feast_core_LegacyJobService_proto = out.File - file_feast_core_LegacyJobService_proto_rawDesc = nil - file_feast_core_LegacyJobService_proto_goTypes = nil - file_feast_core_LegacyJobService_proto_depIdxs = nil -} diff --git a/caraml-store-sdk/go/registry/client.go b/caraml-store-sdk/go/registry/client.go new file mode 100644 index 0000000..bc1fc40 --- /dev/null +++ b/caraml-store-sdk/go/registry/client.go @@ -0,0 +1,127 @@ +package serving + +import ( + "context" + "crypto/x509" + "fmt" + "github.com/caraml-dev/caraml-store/caraml-store-sdk/go/auth" + registryproto "github.com/caraml-dev/caraml-store/caraml-store-sdk/go/protos/feast_spark/api" + "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" + + "github.com/opentracing/opentracing-go" + "go.opencensus.io/plugin/ocgrpc" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" +) + +// GrpcClient is a grpc client for feast serving. +type GrpcClient struct { + cli registryproto.JobServiceClient + conn *grpc.ClientConn +} + +// SecurityConfig wraps security config for GrpcClient +type SecurityConfig struct { + // Whether to enable TLS SSL trasnport security if true. + EnableTLS bool + // Optional: Provides path to TLS certificate used the verify Service identity. + TLSCertPath string + // Optional: Credential used for authentication. + // Disables authentication if unspecified. + Credential *auth.Credential +} + +// NewGrpcClient constructs a client that can interact via grpc with the feast serving instance at the given host:port. +func NewGrpcClient(host string, port int) (*GrpcClient, error) { + return NewSecureGrpcClient(host, port, SecurityConfig{ + EnableTLS: false, + Credential: nil, + }) +} + +// NewSecureGrpcClient constructs a secure client that uses security features (ie authentication). +// host - hostname of the serving host/instance to connect to. +// port - post of the host to service host/instancf to connect to. +// securityConfig - security config configures client security. +func NewSecureGrpcClient(host string, port int, security SecurityConfig) (*GrpcClient, error) { + return NewSecureGrpcClientWithDialOptions(host, port, security) +} + +// NewSecureGrpcClientWithDialOptions constructs a secure client that uses security features (ie authentication) along with custom grpc dial options. +// host - hostname of the serving host/instance to connect to. +// port - post of the host to service host/instancf to connect to. +// securityConfig - security config configures client security. +// opts - grpc.DialOptions which should be used with this connection +func NewSecureGrpcClientWithDialOptions(host string, port int, security SecurityConfig, opts ...grpc.DialOption) (*GrpcClient, error) { + feastCli := &GrpcClient{} + adr := fmt.Sprintf("%s:%d", host, port) + + // Compile grpc dial options from security config. + options := append(opts, grpc.WithStatsHandler(&ocgrpc.ClientHandler{})) + // Configure client TLS. + if !security.EnableTLS { + options = append(options, grpc.WithInsecure()) + } else if security.EnableTLS && security.TLSCertPath != "" { + // Read TLS certificate from given path. + tlsCreds, err := credentials.NewClientTLSFromFile(security.TLSCertPath, "") + if err != nil { + return nil, err + } + options = append(options, grpc.WithTransportCredentials(tlsCreds)) + } else { + // Use system TLS certificate pool. + certPool, err := x509.SystemCertPool() + if err != nil { + return nil, err + } + tlsCreds := credentials.NewClientTLSFromCert(certPool, "") + options = append(options, grpc.WithTransportCredentials(tlsCreds)) + } + + // Enable authentication by attaching credentials if given + if security.Credential != nil { + options = append(options, grpc.WithPerRPCCredentials(security.Credential)) + } + + // Enable tracing if a global tracer is registered + tracingInterceptor := grpc.WithUnaryInterceptor( + otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer())) + options = append(options, tracingInterceptor) + + conn, err := grpc.Dial(adr, options...) + if err != nil { + return nil, err + } + feastCli.cli = registryproto.NewJobServiceClient(conn) + feastCli.conn = conn + return feastCli, nil +} + +// ListJobs lists the spark jobs created by the registry service. +func (fc *GrpcClient) ListJobs(ctx context.Context, req *registryproto.ListJobsRequest) ([]*registryproto.Job, error) { + resp, err := fc.cli.ListJobs(ctx, req) + return resp.Jobs, err +} + +// ListScheduledJobs lists the scheduled spark jobs created by the registry service. +func (fc *GrpcClient) ListScheduledJobs(ctx context.Context, req *registryproto.ListScheduledJobsRequest) ([]*registryproto.ScheduledJob, error) { + resp, err := fc.cli.ListScheduledJobs(ctx, req) + return resp.Jobs, err +} + +// StartStreamingJob start or update a streaming ingestion job. +func (fc *GrpcClient) StartStreamingJob(ctx context.Context, req *registryproto.StartStreamIngestionJobRequest) error { + _, err := fc.cli.StartStreamIngestionJob(ctx, req) + return err +} + +// ScheduleOfflineToOnlineIngestionJob schedule or update a batch ingestion job. +func (fc *GrpcClient) ScheduleOfflineToOnlineIngestionJob(ctx context.Context, req *registryproto.ScheduleOfflineToOnlineIngestionJobRequest) error { + _, err := fc.cli.ScheduleOfflineToOnlineIngestionJob(ctx, req) + return err +} + +// Close the grpc connection. +func (fc *GrpcClient) Close() error { + return fc.conn.Close() +} diff --git a/caraml-store-sdk/go/serving/client.go b/caraml-store-sdk/go/serving/client.go index f877f72..847038d 100644 --- a/caraml-store-sdk/go/serving/client.go +++ b/caraml-store-sdk/go/serving/client.go @@ -4,6 +4,7 @@ import ( "context" "crypto/x509" "fmt" + "github.com/caraml-dev/caraml-store/caraml-store-sdk/go/auth" servingproto "github.com/caraml-dev/caraml-store/caraml-store-sdk/go/protos/feast/serving" "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" @@ -34,7 +35,7 @@ type SecurityConfig struct { TLSCertPath string // Optional: Credential used for authentication. // Disables authentication if unspecified. - Credential *Credential + Credential *auth.Credential } // NewGrpcClient constructs a client that can interact via grpc with the feast serving instance at the given host:port.