Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PWX-35559: add correlation tracing to nsm filter Stats #2402

Merged
merged 3 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 5 additions & 10 deletions api/client/volume/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,13 +169,13 @@ func (v *volumeClient) Status() [][2]string {

// Inspect specified volumes.
// Errors ErrEnoEnt may be returned.
func (v *volumeClient) Inspect(ids []string) ([]*api.Volume, error) {
if len(ids) == 0 {
func (v *volumeClient) Inspect(ctx context.Context, volumeIDs []string) ([]*api.Volume, error) {
if len(volumeIDs) == 0 {
return nil, nil
}
var volumes []*api.Volume
request := v.c.Get().Resource(volumePath)
for _, id := range ids {
for _, id := range volumeIDs {
request.QueryOption(api.OptVolumeID, id)
}
if err := request.Do().Unmarshal(&volumes); err != nil {
Expand Down Expand Up @@ -246,10 +246,7 @@ func (v *volumeClient) Restore(volumeID string, snapID string) error {

// Stats for specified volume.
// Errors ErrEnoEnt may be returned
func (v *volumeClient) Stats(
volumeID string,
cumulative bool,
) (*api.Stats, error) {
func (v *volumeClient) Stats(ctx context.Context, volumeID string, cumulative bool) (*api.Stats, error) {
stats := &api.Stats{}
req := v.c.Get().Resource(volumePath + "/stats").Instance(volumeID)
req.QueryOption(api.OptCumulative, strconv.FormatBool(cumulative))
Expand Down Expand Up @@ -306,9 +303,7 @@ func (v *volumeClient) CapacityUsage(
return requests, nil
}

func (v *volumeClient) VolumeUsageByNode(
nodeID string,
) (*api.VolumeUsageByNode, error) {
func (v *volumeClient) VolumeUsageByNode(ctx context.Context, nodeID string) (*api.VolumeUsageByNode, error) {

return nil, volume.ErrNotSupported

Expand Down
3 changes: 2 additions & 1 deletion api/client/volume/client_volume_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package volume

import (
"context"
"crypto/tls"
"encoding/json"
"net/http"
Expand All @@ -25,7 +26,7 @@ func TestClientTLS(t *testing.T) {

clnt.SetTLS(&tls.Config{InsecureSkipVerify: true})

_, err = VolumeDriver(clnt).Inspect([]string{"12345"})
_, err = VolumeDriver(clnt).Inspect(context.TODO(), []string{"12345"})

require.NoError(t, err)
}
24 changes: 12 additions & 12 deletions api/server/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,12 @@ func (d *driver) errorResponse(method string, w http.ResponseWriter, err error)
}
}

func (d *driver) volFromName(name string) (*api.Volume, error) {
func (d *driver) volFromName(ctx context.Context, name string) (*api.Volume, error) {
v, err := volumedrivers.Get(d.name)
if err != nil {
return nil, fmt.Errorf("Cannot locate volume driver for %s: %s", d.name, err.Error())
return nil, fmt.Errorf("cannot locate volume driver for %s: %s", d.name, err.Error())
}
return util.VolumeFromName(v, name)
return util.VolumeFromName(ctx, v, name)
}

func (d *driver) volFromNameOrIDSdk(ctx context.Context, volumes api.OpenStorageVolumeClient, name string) (*api.Volume, error) {
Expand Down Expand Up @@ -254,10 +254,10 @@ func (d *driver) attachTokenMount(ctx context.Context, request *mountRequest) (c

// parseTokenInput reads token input from the given name and opts.
// The following is the order of precedence for token in types:
// 1. token=<token> in name
// 2. token in opts
// 3. token_secret=<secret> in name
// 4. token_secret in opts
// 1. token=<token> in name
// 2. token in opts
// 3. token_secret=<secret> in name
// 4. token_secret in opts
func (d *driver) parseTokenInput(name string, opts map[string]string) (string, error) {
// get token from name
tokenFromName, tokenInName := d.GetTokenFromString(name)
Expand Down Expand Up @@ -484,7 +484,7 @@ func (d *driver) scaleUp(
return nil, err
}
id := resp.GetVolumeId()
if outVol, err = d.volFromName(id); err != nil {
if outVol, err = d.volFromName(ctx, id); err != nil {
return nil, err
}
_, err = mountClient.Attach(ctx, &api.SdkVolumeAttachRequest{
Expand Down Expand Up @@ -571,7 +571,7 @@ func (d *driver) attachScale(
return d.scaleUp(ctx, conn, method, vd, inSpec, inVol, allVols, attachOptions)
}
id := resp.GetVolumeId()
outVol, err := d.volFromName(id)
outVol, err := d.volFromName(ctx, id)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -791,7 +791,7 @@ func (d *driver) path(w http.ResponseWriter, r *http.Request) {
}

_, _, _, _, name := d.SpecFromString(request.Name)
vol, err := d.volFromName(name)
vol, err := d.volFromName(r.Context(), name)
if err != nil {
e := d.volNotFound(method, request.Name, err, w)
d.errorResponse(method, w, e)
Expand Down Expand Up @@ -824,7 +824,7 @@ func (d *driver) get(w http.ResponseWriter, r *http.Request) {
} else {
returnName = name
}
vol, err := d.volFromName(name)
vol, err := d.volFromName(correlation.TODO(), name)
if err != nil {
e := d.volNotFound(method, request.Name, err, w)
d.errorResponse(method, w, e)
Expand Down Expand Up @@ -859,7 +859,7 @@ func (d *driver) unmount(w http.ResponseWriter, r *http.Request) {

_, _, _, _, name := d.SpecFromString(request.Name)
nameWithID := name + request.ID
vol, err := d.volFromName(name)
vol, err := d.volFromName(ctx, name)
if err != nil {
e := d.volNotFound(method, name, err, w)
d.errorResponse(method, w, e)
Expand Down
8 changes: 4 additions & 4 deletions api/server/middleware_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (a *authMiddleware) setWithAuth(w http.ResponseWriter, r *http.Request, nex
if err != nil {
processErrorForVolSetResponse(req.Action, err, &resp)
} else {
v, err := d.Inspect([]string{volumeID})
v, err := d.Inspect(correlation.TODO(), []string{volumeID})
if err != nil {
processErrorForVolSetResponse(req.Action, err, &resp)
} else if v == nil || len(v) != 1 {
Expand Down Expand Up @@ -279,7 +279,7 @@ func (a *authMiddleware) deleteWithAuth(w http.ResponseWriter, r *http.Request,
return
}

vols, err := d.Inspect([]string{volumeID})
vols, err := d.Inspect(correlation.TODO(), []string{volumeID})
if err != nil || len(vols) == 0 || vols[0] == nil {
json.NewEncoder(w).Encode(volumeResponse)
return
Expand Down Expand Up @@ -338,7 +338,7 @@ func (a *authMiddleware) inspectWithAuth(w http.ResponseWriter, r *http.Request,
return
}

dk, err := d.Inspect([]string{volumeID})
dk, err := d.Inspect(correlation.TODO(), []string{volumeID})
if err != nil {
a.log(volumeID, fn).WithError(err).Error("Failed to inspect volume")
http.Error(w, err.Error(), http.StatusNotFound)
Expand Down Expand Up @@ -368,7 +368,7 @@ func (a *authMiddleware) enumerateWithAuth(w http.ResponseWriter, r *http.Reques
}
volumeID := volIDs[0]

vols, err := d.Inspect([]string{volumeID})
vols, err := d.Inspect(correlation.TODO(), []string{volumeID})
if err != nil || len(vols) == 0 || vols[0] == nil {
a.log(volumeID, fn).WithError(err).Error("Failed to get volume object")
json.NewEncoder(w).Encode(emptyVols)
Expand Down
2 changes: 1 addition & 1 deletion api/server/sdk/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (s *NodeServer) VolumeUsageByNode(
if s.server.driver(ctx) == nil {
return nil, status.Error(codes.Unavailable, "Resource has not been initialized")
}
resp, err := s.server.driver(ctx).VolumeUsageByNode(req.GetNodeId())
resp, err := s.server.driver(ctx).VolumeUsageByNode(ctx, req.GetNodeId())
if err != nil {
return nil, status.Errorf(codes.Internal, " Failed to get VolumeUsageByNode :%v", err.Error())
}
Expand Down
3 changes: 2 additions & 1 deletion api/server/sdk/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/golang/mock/gomock"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

Expand Down Expand Up @@ -460,7 +461,7 @@ func TestSdkVolumeUsageByNode(t *testing.T) {

s.MockCluster().EXPECT().Enumerate().Return(cluster, nil).Times(1)
s.MockCluster().EXPECT().Inspect(nodeid).Return(node, nil).Times(2)
s.MockDriver().EXPECT().VolumeUsageByNode(nodeid).Return(&volumeUsageInfo, nil).Times(1)
s.MockDriver().EXPECT().VolumeUsageByNode(gomock.Any(), nodeid).Return(&volumeUsageInfo, nil).Times(1)

// Setup client
c := api.NewOpenStorageNodeClient(s.Conn())
Expand Down
2 changes: 1 addition & 1 deletion api/server/sdk/server_interceptors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func TestAuthorizationServerInterceptorCreateVolume(t *testing.T) {
gomock.InOrder(
s.MockDriver().
EXPECT().
Inspect([]string{name}).
Inspect(gomock.Any(), []string{name}).
Return(nil, fmt.Errorf("not found")).
AnyTimes(),
s.MockDriver().
Expand Down
13 changes: 7 additions & 6 deletions api/server/sdk/volume_ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/libopenstorage/openstorage/api"
"github.com/libopenstorage/openstorage/pkg/auth"
"github.com/libopenstorage/openstorage/pkg/correlation"
policy "github.com/libopenstorage/openstorage/pkg/storagepolicy"
"github.com/libopenstorage/openstorage/pkg/util"
"github.com/libopenstorage/openstorage/volume"
Expand All @@ -53,7 +54,7 @@ func (s *VolumeServer) waitForVolumeReady(ctx context.Context, id string) (*api.
func() (bool, error) {
var err error
// Get the latest status from the volume
v, err = util.VolumeFromName(s.driver(ctx), id)
v, err = util.VolumeFromName(correlation.TODO(), s.driver(ctx), id)
if err != nil {
return false, status.Errorf(codes.Internal, err.Error())
}
Expand Down Expand Up @@ -88,7 +89,7 @@ func (s *VolumeServer) waitForVolumeRemoved(ctx context.Context, id string) erro
250*time.Millisecond, // period
func() (bool, error) {
// Get the latest status from the volume
if _, err := util.VolumeFromName(s.driver(ctx), id); err != nil {
if _, err := util.VolumeFromName(correlation.TODO(), s.driver(ctx), id); err != nil {
// Removed
return false, nil
}
Expand All @@ -108,7 +109,7 @@ func (s *VolumeServer) create(

// Check if the volume has already been created or is in process of creation
volName := locator.GetName()
v, err := util.VolumeFromName(s.driver(ctx), volName)
v, err := util.VolumeFromName(ctx, s.driver(ctx), volName)
// If the volume is still there but it is being delete, then wait until it is removed
if err == nil && v.GetState() == api.VolumeState_VOLUME_STATE_DELETED {
if err = s.waitForVolumeRemoved(ctx, volName); err != nil {
Expand Down Expand Up @@ -155,7 +156,7 @@ func (s *VolumeServer) create(
var id string
if len(source.GetParent()) != 0 {
// Get parent volume information
parent, err := util.VolumeFromName(s.driver(ctx), source.Parent)
parent, err := util.VolumeFromName(correlation.TODO(), s.driver(ctx), source.Parent)
if err != nil {
return "", status.Errorf(
codes.NotFound,
Expand Down Expand Up @@ -501,7 +502,7 @@ func (s *VolumeServer) Inspect(
}
v = vols[0]
} else {
vols, err := s.driver(ctx).Inspect([]string{req.GetVolumeId()})
vols, err := s.driver(ctx).Inspect(correlation.TODO(), []string{req.GetVolumeId()})
if err == kvdb.ErrNotFound || (err == nil && len(vols) == 0) {
return nil, status.Errorf(
codes.NotFound,
Expand Down Expand Up @@ -754,7 +755,7 @@ func (s *VolumeServer) Stats(
return nil, err
}

stats, err := s.driver(ctx).Stats(req.GetVolumeId(), !req.GetNotCumulative())
stats, err := s.driver(ctx).Stats(ctx, req.GetVolumeId(), !req.GetNotCumulative())
if err != nil {
return nil, status.Errorf(
codes.Internal,
Expand Down
Loading
Loading