Skip to content

Commit

Permalink
Merge pull request #150 from MUzairS15/MUzairS15/model/registration
Browse files Browse the repository at this point in the history
[models] Add adapter model registration funcs.
  • Loading branch information
Mohd Uzair authored Jan 29, 2024
2 parents 86da140 + f5f885a commit a26e6e2
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 235 deletions.
4 changes: 4 additions & 0 deletions adapter/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,7 @@ func ErrGenerateComponents(err error) error {
func ErrCreatingComponents(err error) error {
return errors.New(ErrCreatingComponentsCode, errors.Alert, []string{"error creating components"}, []string{err.Error()}, []string{"Invalid Path or version passed in static configuration", "URL passed maybe incorrect", "Version passed maybe incorrect"}, []string{"Make sure to pass correct configuration", "Make sure the URL passed in the configuration is correct", "Make sure a valid version is passed in configuration"})
}

func ErrRegisterComponents(err error) error {
return errors.New(ErrCreatingComponentsCode, errors.Alert, []string{"error registering components"}, []string{err.Error()}, []string{"Invalid Path or version passed in configuration", "Server URL passed maybe incorrect", "Server is not reachable"}, []string{"Make sure to pass correct configuration", "Make sure the URL passed in the configuration is correct", "Make sure adapter is reachable to the server"})
}
2 changes: 1 addition & 1 deletion adapter/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (s *adapterLogger) ApplyOperation(ctx context.Context, op OperationRequest)

// ProcessOAM wraps the Handler's ProcessOAM method along with relevant logging
func (s *adapterLogger) ProcessOAM(ctx context.Context, oamRequest OAMRequest) (string, error) {
s.log.Info("Process OAM components")
s.log.Info("Process model components")
msg, err := s.next.ProcessOAM(ctx, oamRequest)
if err != nil {
s.log.Error(err)
Expand Down
2 changes: 1 addition & 1 deletion adapter/meshmodel.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type MeshModelRegistrant struct {
HTTPRegistry string
}

// NewOAMRegistrant returns an instance of OAMRegistrant
// NewMeshModelRegistrant returns an instance of NewMeshModelRegistrant
func NewMeshModelRegistrant(paths []MeshModelRegistrantDefinitionPath, HTTPRegistry string) *MeshModelRegistrant {
return &MeshModelRegistrant{
Paths: paths,
Expand Down
237 changes: 4 additions & 233 deletions adapter/oam.go
Original file line number Diff line number Diff line change
@@ -1,172 +1,27 @@
package adapter

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strings"
"time"

backoff "github.com/cenkalti/backoff/v4"
meshmodel "github.com/layer5io/meshkit/models/meshmodel/core/v1alpha1"
"github.com/layer5io/meshkit/models/oam/core/v1alpha1"
"github.com/layer5io/meshkit/utils"
"github.com/layer5io/meshkit/utils/manifests"
)

const (
// OAM Metadata constants
OAMAdapterNameMetadataKey = "adapter.meshery.io/name"
OAMComponentCategoryMetadataKey = "ui.meshery.io/category"

//Runtime generation methods
var (
Manifests = "MANIFESTS"
HelmCHARTS = "HELM_CHARTS"
HelmCharts = "HELM_CHARTS"
)

// ProcessOAM processes OAM components. This is adapter specific and needs to be implemented by each adapter.
func (h *Adapter) ProcessOAM(context.Context, OAMRequest) (string, error) {
return "", nil
}

// OAMRegistrant provides utility functions for registering
// OAM components to a registry in a reliable way
type OAMRegistrant struct {
// Paths is a slice for holding the paths of OAMDefitions,
// OAMRefSchema and Host on the filesystem
//
// OAMRegistrant will read the definitions from these
// paths and will register them to the OAM registry
Paths []OAMRegistrantDefinitionPath

// OAMHTTPRegistry is the address of an OAM registry
OAMHTTPRegistry string
}

// OAMRegistrantDefinitionPath - Structure for configuring registrant paths
type OAMRegistrantDefinitionPath struct {
// OAMDefinitionPath holds the path for OAM Definition file
OAMDefintionPath string
// OAMRefSchemaPath holds the path for the OAM Ref Schema file
OAMRefSchemaPath string
// Host is the address of the gRPC host capable of processing the request
Host string
// Restricted should be set to true if this capability should be restricted
// only to the server and shouldn't be exposed to the user for direct usage
Restricted bool
// Metadata is the other data which can be attached to the post request body
//
// Metadata like name of the component, etc.
Metadata map[string]string
}

// OAMRegistrantData struct defines the body of the POST request that is sent to the OAM
// registry (Meshery)
//
// The body contains the
// 1. OAM definition, which is in accordance with the OAM spec
// 2. OAMRefSchema, which is json schema draft-4, draft-7 or draft-8 for the corresponding OAM object
// 3. Host is this service's grpc address in the form of `hostname:port`
// 4. Restricted should be set to true if the given capability is meant to be used internally
// 5. Metadata can be a map of key value pairs
type OAMRegistrantData struct {
OAMDefinition interface{} `json:"oam_definition,omitempty"`
OAMRefSchema string `json:"oam_ref_schema,omitempty"`
Host string `json:"host,omitempty"`
Restricted bool `json:"restricted,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"`
}

// NewOAMRegistrant returns an instance of OAMRegistrant
func NewOAMRegistrant(paths []OAMRegistrantDefinitionPath, oamHTTPRegistry string) *OAMRegistrant {
return &OAMRegistrant{
Paths: paths,
OAMHTTPRegistry: oamHTTPRegistry,
}
}

// Register will register each capability individually to the OAM Capability registry
//
// It sends a POST request to the endpoint in the "OAMHTTPRegistry", if the request
// fails then the request is retried. It uses exponential backoff algorithm to determine
// the interval between in the retries. It will retry only for 10 mins and will stop retrying
// after that.
//
// Register function is a blocking function
func (or *OAMRegistrant) Register() error {
for _, dpath := range or.Paths {
var ord OAMRegistrantData

definition, err := os.Open(dpath.OAMDefintionPath)
if err != nil {
return ErrOpenOAMDefintionFile(err)
}

definitionMap := map[string]interface{}{}
if err = json.NewDecoder(definition).Decode(&definitionMap); err != nil {
_ = definition.Close()
return ErrJSONMarshal(err)
}
_ = definition.Close()
ord.OAMDefinition = definitionMap

schema, err := os.ReadFile(dpath.OAMRefSchemaPath)
if err != nil {
return ErrOpenOAMRefFile(err)
}
if string(schema) == "" { //since this component is unusable if it doesn't have oam_ref_schema
continue
}
formatTitleInOAMRefSchema(&schema)

ord.OAMRefSchema = string(schema)

ord.Host = dpath.Host
ord.Metadata = dpath.Metadata
ord.Restricted = dpath.Restricted

// send request to the register
backoffOpt := backoff.NewExponentialBackOff()
backoffOpt.MaxElapsedTime = 10 * time.Minute
if err := backoff.Retry(func() error {
contentByt, err := json.Marshal(ord)
if err != nil {
return backoff.Permanent(err)
}
content := bytes.NewReader(contentByt)

// host here is given by the application itself and is trustworthy hence,
// #nosec
resp, err := http.Post(or.OAMHTTPRegistry, "application/json", content)
if err != nil {
return err
}

if resp.StatusCode != http.StatusCreated &&
resp.StatusCode != http.StatusOK &&
resp.StatusCode != http.StatusAccepted {
return fmt.Errorf(
"register process failed, host returned status: %s with status code %d",
resp.Status,
resp.StatusCode,
)
}

return nil
}, backoffOpt); err != nil {
return ErrOAMRetry(err)
}
}

return nil
}

type MeshModelConfig struct {
Category string
CategoryMetadata map[string]interface{}
Expand Down Expand Up @@ -200,7 +55,7 @@ func CreateComponents(scfg StaticCompConfig) error {
switch scfg.Method {
case Manifests:
comp, err = manifests.GetFromManifest(context.Background(), scfg.URL, manifests.SERVICE_MESH, scfg.Config)
case HelmCHARTS:
case HelmCharts:
comp, err = manifests.GetFromHelm(context.Background(), scfg.URL, manifests.SERVICE_MESH, scfg.Config)
default:
return ErrCreatingComponents(errors.New("invalid generation method. Must be either Manifests or HelmCharts"))
Expand Down Expand Up @@ -401,88 +256,4 @@ type DynamicComponentsConfig struct {
GenerationMethod string
Config manifests.Config
Operation string
}

func RegisterWorkLoadsDynamically(runtime, host string, dc *DynamicComponentsConfig) error {
var comp *manifests.Component
var err error
switch dc.GenerationMethod {
case Manifests:
comp, err = manifests.GetFromManifest(context.Background(), dc.URL, manifests.SERVICE_MESH, dc.Config)
case HelmCHARTS:
comp, err = manifests.GetFromHelm(context.Background(), dc.URL, manifests.SERVICE_MESH, dc.Config)
default:
return ErrGenerateComponents(errors.New("failed to generate components"))
}
if err != nil {
return ErrGenerateComponents(err)
}
if comp == nil {
return ErrGenerateComponents(errors.New("failed to generate components"))
}
for i, def := range comp.Definitions {
var ord OAMRegistrantData
ord.OAMRefSchema = comp.Schemas[i]

//Marshalling the stringified json
ord.Host = host
definitionMap := map[string]interface{}{}
if err := json.Unmarshal([]byte(def), &definitionMap); err != nil {
return err
}
definitionMap["apiVersion"] = "core.oam.dev/v1alpha1"
definitionMap["kind"] = "WorkloadDefinition"
ord.OAMDefinition = definitionMap
ord.Metadata = map[string]string{
OAMAdapterNameMetadataKey: dc.Operation,
}
// send request to the register
backoffOpt := backoff.NewExponentialBackOff()
backoffOpt.MaxElapsedTime = time.Minute * dc.TimeoutInMinutes
if err := backoff.Retry(func() error {
contentByt, err := json.Marshal(ord)
if err != nil {
return backoff.Permanent(err)
}
content := bytes.NewReader(contentByt)
// host here is given by the application itself and is trustworthy hence,
// #nosec
resp, err := http.Post(fmt.Sprintf("%s/api/oam/workload", runtime), "application/json", content)
if err != nil {
return err
}
if resp.StatusCode != http.StatusCreated &&
resp.StatusCode != http.StatusOK &&
resp.StatusCode != http.StatusAccepted {
return fmt.Errorf(
"register process failed, host returned status: %s with status code %d",
resp.Status,
resp.StatusCode,
)
}

return nil
}, backoffOpt); err != nil {
return err
}
}
return nil
}

func formatTitleInOAMRefSchema(schema *[]byte) {
var schemamap map[string]interface{}
err := json.Unmarshal(*schema, &schemamap)
if err != nil {
return
}
title, ok := schemamap["title"].(string)
if !ok {
return
}

schemamap["title"] = manifests.FormatToReadableString(title)
(*schema), err = json.Marshal(schemamap)
if err != nil {
return
}
}
}
70 changes: 70 additions & 0 deletions adapter/register.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package adapter

import (
"fmt"
"os"
"path/filepath"
"strconv"
"sync"

"github.com/layer5io/meshkit/models/meshmodel/core/types"
)

var (
basePath, _ = os.Getwd()
MeshmodelComponents = filepath.Join(basePath, "templates", "meshmodel", "components")
)

// AvailableVersions denote the component versions available statically
var AvailableVersions = map[string]bool{}

type meshmodelDefinitionPathSet struct {
meshmodelDefinitionPath string
}

func RegisterMeshModelComponents(uuid, runtime, host, port string) error {
meshmodelRDP := []MeshModelRegistrantDefinitionPath{}
pathSets, err := loadMeshmodelComponents(MeshmodelComponents)
if err != nil {
return ErrRegisterComponents(err)
}
portint, _ := strconv.Atoi(port)
for _, pathSet := range pathSets {
meshmodelRDP = append(meshmodelRDP, MeshModelRegistrantDefinitionPath{
EntityDefintionPath: pathSet.meshmodelDefinitionPath,
Host: host,
Port: portint,
Type: types.ComponentDefinition,
})
}

return NewMeshModelRegistrant(meshmodelRDP, fmt.Sprintf("%s/api/meshmodel/components/register", runtime)).
Register(uuid)
}

var versionLock sync.Mutex

func loadMeshmodelComponents(basepath string) ([]meshmodelDefinitionPathSet, error) {
res := []meshmodelDefinitionPathSet{}
if err := filepath.Walk(basepath, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}

if info.IsDir() {
return nil
}

res = append(res, meshmodelDefinitionPathSet{
meshmodelDefinitionPath: path,
})
versionLock.Lock()
AvailableVersions[filepath.Base(filepath.Dir(path))] = true // Getting available versions already existing on file system
versionLock.Unlock()
return nil
}); err != nil {
return nil, err
}

return res, nil
}

0 comments on commit a26e6e2

Please sign in to comment.