Skip to content

Commit

Permalink
PWX-35559: add correlation tracing to nsm filter Stats (#2402)
Browse files Browse the repository at this point in the history
  • Loading branch information
alicelyy authored Jan 22, 2024
1 parent d3226fa commit 0428d70
Show file tree
Hide file tree
Showing 31 changed files with 579 additions and 553 deletions.
15 changes: 5 additions & 10 deletions api/client/volume/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,13 +169,13 @@ func (v *volumeClient) Status() [][2]string {

// Inspect specified volumes.
// Errors ErrEnoEnt may be returned.
func (v *volumeClient) Inspect(ids []string) ([]*api.Volume, error) {
if len(ids) == 0 {
func (v *volumeClient) Inspect(ctx context.Context, volumeIDs []string) ([]*api.Volume, error) {
if len(volumeIDs) == 0 {
return nil, nil
}
var volumes []*api.Volume
request := v.c.Get().Resource(volumePath)
for _, id := range ids {
for _, id := range volumeIDs {
request.QueryOption(api.OptVolumeID, id)
}
if err := request.Do().Unmarshal(&volumes); err != nil {
Expand Down Expand Up @@ -246,10 +246,7 @@ func (v *volumeClient) Restore(volumeID string, snapID string) error {

// Stats for specified volume.
// Errors ErrEnoEnt may be returned
func (v *volumeClient) Stats(
volumeID string,
cumulative bool,
) (*api.Stats, error) {
func (v *volumeClient) Stats(ctx context.Context, volumeID string, cumulative bool) (*api.Stats, error) {
stats := &api.Stats{}
req := v.c.Get().Resource(volumePath + "/stats").Instance(volumeID)
req.QueryOption(api.OptCumulative, strconv.FormatBool(cumulative))
Expand Down Expand Up @@ -306,9 +303,7 @@ func (v *volumeClient) CapacityUsage(
return requests, nil
}

func (v *volumeClient) VolumeUsageByNode(
nodeID string,
) (*api.VolumeUsageByNode, error) {
func (v *volumeClient) VolumeUsageByNode(ctx context.Context, nodeID string) (*api.VolumeUsageByNode, error) {

return nil, volume.ErrNotSupported

Expand Down
3 changes: 2 additions & 1 deletion api/client/volume/client_volume_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package volume

import (
"context"
"crypto/tls"
"encoding/json"
"net/http"
Expand All @@ -25,7 +26,7 @@ func TestClientTLS(t *testing.T) {

clnt.SetTLS(&tls.Config{InsecureSkipVerify: true})

_, err = VolumeDriver(clnt).Inspect([]string{"12345"})
_, err = VolumeDriver(clnt).Inspect(context.TODO(), []string{"12345"})

require.NoError(t, err)
}
24 changes: 12 additions & 12 deletions api/server/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,12 @@ func (d *driver) errorResponse(method string, w http.ResponseWriter, err error)
}
}

func (d *driver) volFromName(name string) (*api.Volume, error) {
func (d *driver) volFromName(ctx context.Context, name string) (*api.Volume, error) {
v, err := volumedrivers.Get(d.name)
if err != nil {
return nil, fmt.Errorf("Cannot locate volume driver for %s: %s", d.name, err.Error())
return nil, fmt.Errorf("cannot locate volume driver for %s: %s", d.name, err.Error())
}
return util.VolumeFromName(v, name)
return util.VolumeFromName(ctx, v, name)
}

func (d *driver) volFromNameOrIDSdk(ctx context.Context, volumes api.OpenStorageVolumeClient, name string) (*api.Volume, error) {
Expand Down Expand Up @@ -254,10 +254,10 @@ func (d *driver) attachTokenMount(ctx context.Context, request *mountRequest) (c

// parseTokenInput reads token input from the given name and opts.
// The following is the order of precedence for token in types:
// 1. token=<token> in name
// 2. token in opts
// 3. token_secret=<secret> in name
// 4. token_secret in opts
// 1. token=<token> in name
// 2. token in opts
// 3. token_secret=<secret> in name
// 4. token_secret in opts
func (d *driver) parseTokenInput(name string, opts map[string]string) (string, error) {
// get token from name
tokenFromName, tokenInName := d.GetTokenFromString(name)
Expand Down Expand Up @@ -484,7 +484,7 @@ func (d *driver) scaleUp(
return nil, err
}
id := resp.GetVolumeId()
if outVol, err = d.volFromName(id); err != nil {
if outVol, err = d.volFromName(ctx, id); err != nil {
return nil, err
}
_, err = mountClient.Attach(ctx, &api.SdkVolumeAttachRequest{
Expand Down Expand Up @@ -571,7 +571,7 @@ func (d *driver) attachScale(
return d.scaleUp(ctx, conn, method, vd, inSpec, inVol, allVols, attachOptions)
}
id := resp.GetVolumeId()
outVol, err := d.volFromName(id)
outVol, err := d.volFromName(ctx, id)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -791,7 +791,7 @@ func (d *driver) path(w http.ResponseWriter, r *http.Request) {
}

_, _, _, _, name := d.SpecFromString(request.Name)
vol, err := d.volFromName(name)
vol, err := d.volFromName(r.Context(), name)
if err != nil {
e := d.volNotFound(method, request.Name, err, w)
d.errorResponse(method, w, e)
Expand Down Expand Up @@ -824,7 +824,7 @@ func (d *driver) get(w http.ResponseWriter, r *http.Request) {
} else {
returnName = name
}
vol, err := d.volFromName(name)
vol, err := d.volFromName(correlation.TODO(), name)
if err != nil {
e := d.volNotFound(method, request.Name, err, w)
d.errorResponse(method, w, e)
Expand Down Expand Up @@ -859,7 +859,7 @@ func (d *driver) unmount(w http.ResponseWriter, r *http.Request) {

_, _, _, _, name := d.SpecFromString(request.Name)
nameWithID := name + request.ID
vol, err := d.volFromName(name)
vol, err := d.volFromName(ctx, name)
if err != nil {
e := d.volNotFound(method, name, err, w)
d.errorResponse(method, w, e)
Expand Down
8 changes: 4 additions & 4 deletions api/server/middleware_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (a *authMiddleware) setWithAuth(w http.ResponseWriter, r *http.Request, nex
if err != nil {
processErrorForVolSetResponse(req.Action, err, &resp)
} else {
v, err := d.Inspect([]string{volumeID})
v, err := d.Inspect(correlation.TODO(), []string{volumeID})
if err != nil {
processErrorForVolSetResponse(req.Action, err, &resp)
} else if v == nil || len(v) != 1 {
Expand Down Expand Up @@ -279,7 +279,7 @@ func (a *authMiddleware) deleteWithAuth(w http.ResponseWriter, r *http.Request,
return
}

vols, err := d.Inspect([]string{volumeID})
vols, err := d.Inspect(correlation.TODO(), []string{volumeID})
if err != nil || len(vols) == 0 || vols[0] == nil {
json.NewEncoder(w).Encode(volumeResponse)
return
Expand Down Expand Up @@ -338,7 +338,7 @@ func (a *authMiddleware) inspectWithAuth(w http.ResponseWriter, r *http.Request,
return
}

dk, err := d.Inspect([]string{volumeID})
dk, err := d.Inspect(correlation.TODO(), []string{volumeID})
if err != nil {
a.log(volumeID, fn).WithError(err).Error("Failed to inspect volume")
http.Error(w, err.Error(), http.StatusNotFound)
Expand Down Expand Up @@ -368,7 +368,7 @@ func (a *authMiddleware) enumerateWithAuth(w http.ResponseWriter, r *http.Reques
}
volumeID := volIDs[0]

vols, err := d.Inspect([]string{volumeID})
vols, err := d.Inspect(correlation.TODO(), []string{volumeID})
if err != nil || len(vols) == 0 || vols[0] == nil {
a.log(volumeID, fn).WithError(err).Error("Failed to get volume object")
json.NewEncoder(w).Encode(emptyVols)
Expand Down
2 changes: 1 addition & 1 deletion api/server/sdk/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (s *NodeServer) VolumeUsageByNode(
if s.server.driver(ctx) == nil {
return nil, status.Error(codes.Unavailable, "Resource has not been initialized")
}
resp, err := s.server.driver(ctx).VolumeUsageByNode(req.GetNodeId())
resp, err := s.server.driver(ctx).VolumeUsageByNode(ctx, req.GetNodeId())
if err != nil {
return nil, status.Errorf(codes.Internal, " Failed to get VolumeUsageByNode :%v", err.Error())
}
Expand Down
3 changes: 2 additions & 1 deletion api/server/sdk/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/golang/mock/gomock"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

Expand Down Expand Up @@ -460,7 +461,7 @@ func TestSdkVolumeUsageByNode(t *testing.T) {

s.MockCluster().EXPECT().Enumerate().Return(cluster, nil).Times(1)
s.MockCluster().EXPECT().Inspect(nodeid).Return(node, nil).Times(2)
s.MockDriver().EXPECT().VolumeUsageByNode(nodeid).Return(&volumeUsageInfo, nil).Times(1)
s.MockDriver().EXPECT().VolumeUsageByNode(gomock.Any(), nodeid).Return(&volumeUsageInfo, nil).Times(1)

// Setup client
c := api.NewOpenStorageNodeClient(s.Conn())
Expand Down
2 changes: 1 addition & 1 deletion api/server/sdk/server_interceptors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func TestAuthorizationServerInterceptorCreateVolume(t *testing.T) {
gomock.InOrder(
s.MockDriver().
EXPECT().
Inspect([]string{name}).
Inspect(gomock.Any(), []string{name}).
Return(nil, fmt.Errorf("not found")).
AnyTimes(),
s.MockDriver().
Expand Down
13 changes: 7 additions & 6 deletions api/server/sdk/volume_ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/libopenstorage/openstorage/api"
"github.com/libopenstorage/openstorage/pkg/auth"
"github.com/libopenstorage/openstorage/pkg/correlation"
policy "github.com/libopenstorage/openstorage/pkg/storagepolicy"
"github.com/libopenstorage/openstorage/pkg/util"
"github.com/libopenstorage/openstorage/volume"
Expand All @@ -53,7 +54,7 @@ func (s *VolumeServer) waitForVolumeReady(ctx context.Context, id string) (*api.
func() (bool, error) {
var err error
// Get the latest status from the volume
v, err = util.VolumeFromName(s.driver(ctx), id)
v, err = util.VolumeFromName(correlation.TODO(), s.driver(ctx), id)
if err != nil {
return false, status.Errorf(codes.Internal, err.Error())
}
Expand Down Expand Up @@ -88,7 +89,7 @@ func (s *VolumeServer) waitForVolumeRemoved(ctx context.Context, id string) erro
250*time.Millisecond, // period
func() (bool, error) {
// Get the latest status from the volume
if _, err := util.VolumeFromName(s.driver(ctx), id); err != nil {
if _, err := util.VolumeFromName(correlation.TODO(), s.driver(ctx), id); err != nil {
// Removed
return false, nil
}
Expand All @@ -108,7 +109,7 @@ func (s *VolumeServer) create(

// Check if the volume has already been created or is in process of creation
volName := locator.GetName()
v, err := util.VolumeFromName(s.driver(ctx), volName)
v, err := util.VolumeFromName(ctx, s.driver(ctx), volName)
// If the volume is still there but it is being delete, then wait until it is removed
if err == nil && v.GetState() == api.VolumeState_VOLUME_STATE_DELETED {
if err = s.waitForVolumeRemoved(ctx, volName); err != nil {
Expand Down Expand Up @@ -155,7 +156,7 @@ func (s *VolumeServer) create(
var id string
if len(source.GetParent()) != 0 {
// Get parent volume information
parent, err := util.VolumeFromName(s.driver(ctx), source.Parent)
parent, err := util.VolumeFromName(correlation.TODO(), s.driver(ctx), source.Parent)
if err != nil {
return "", status.Errorf(
codes.NotFound,
Expand Down Expand Up @@ -501,7 +502,7 @@ func (s *VolumeServer) Inspect(
}
v = vols[0]
} else {
vols, err := s.driver(ctx).Inspect([]string{req.GetVolumeId()})
vols, err := s.driver(ctx).Inspect(correlation.TODO(), []string{req.GetVolumeId()})
if err == kvdb.ErrNotFound || (err == nil && len(vols) == 0) {
return nil, status.Errorf(
codes.NotFound,
Expand Down Expand Up @@ -754,7 +755,7 @@ func (s *VolumeServer) Stats(
return nil, err
}

stats, err := s.driver(ctx).Stats(req.GetVolumeId(), !req.GetNotCumulative())
stats, err := s.driver(ctx).Stats(ctx, req.GetVolumeId(), !req.GetNotCumulative())
if err != nil {
return nil, status.Errorf(
codes.Internal,
Expand Down
Loading

0 comments on commit 0428d70

Please sign in to comment.