
Adding Watch API and initial implementation #2281

Merged: 29 commits into master from PWX-29706-add-watch-volume-api on Jul 12, 2023

Conversation

dahuang-purestorage (Contributor) commented on Jun 21, 2023

What this PR does / why we need it:
This PR implements a Watch gRPC call on Volumes in libopenstorage.
In porx, the driver is expected to implement GetVolumeWatcher, which returns a Go channel that the libopenstorage server uses to relay volumes.
In libopenstorage, a list of watch connections keeps track of the current clients, and a goroutine constantly waits for a volume from porx. Once a volume is received on the channel, openstorage relays it to the watch clients.
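As a rough illustration of that relay path, the sketch below shows one possible shape of the fan-out. The type names, fields, and the drop-on-full policy are assumptions for illustration, not the exact code in this PR.

// Sketch only: fan-out of driver volume events to watch clients.
// Names and the full-channel policy are illustrative, not the PR's code.
package sdk

import (
	"github.com/libopenstorage/openstorage/api"
)

// watchConnection tracks one connected watch client.
type watchConnection struct {
	name         string
	eventChannel chan interface{} // events relayed to this client
}

// WatcherServer keeps the list of currently connected watch clients.
type WatcherServer struct {
	clients []*watchConnection
}

// relayVolumes waits for volumes coming from the driver (porx) and relays
// each one to every registered watch client.
func (w *WatcherServer) relayVolumes(volumeChannel <-chan *api.Volume) {
	for vol := range volumeChannel {
		for _, c := range w.clients {
			select {
			case c.eventChannel <- vol:
				// delivered to this client
			default:
				// client channel is full; the policy here (drop, queue,
				// disconnect) is one of the open questions in the review below
			}
		}
	}
}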

Which issue(s) this PR fixes (optional)
Closes # PWX-29706

Special notes for your reviewer:
TODO:

  • Add unit tests
  • Figure out the best place to start the watcher
  • Determine whether a generic channel can carry additional events/resources (nodes, disks, etc.), which would make it easier to add watch endpoints for them as well.

Testing:

  • Smoke tested with porx by creating, deleting, and attaching volumes. Snippet of working output:
grpcurl -plaintext -d '{"volume_event": {"labels": [{}]}}' 0.0.0.0:9020  openstorage.api.OpenStorageWatch.Watch | jq '.volumeEvent.volume.id + " " + .volumeEvent.volume.status + " " + .volumeEvent.volume.state'
"441214862979454928 VOLUME_STATUS_UP VOLUME_STATE_DETACHED"
"27660964167843585 VOLUME_STATUS_UP VOLUME_STATE_ATTACHED"
"27660964167843585 VOLUME_STATUS_UP VOLUME_STATE_ATTACHED"
"27660964167843585 VOLUME_STATUS_UP VOLUME_STATE_DELETED"
....

lpabon (Member) previously requested changes on Jun 26, 2023
api/api.proto Outdated
@@ -2512,6 +2512,15 @@ service OpenStorageVolume {
body: "*"
};
}

// Watch streams a list of volumes exist in the system.
rpc Watch(SdkVolumeWatchRequest)
Member:

This is good, but I would suggest creating a model of Watch for anything on the system, not in the Volumes service. That way we can watch for Pool, Disk, Volume, or any other type of event. Then the event message could use oneof for each type of storage event.
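For illustration, dispatching on such a oneof in the server handler might look like the sketch below. GetVolumeEvent matches the getter used later in this PR, while the node/disk cases are hypothetical and the stream type name (api.OpenStorageWatch_WatchServer) is assumed from standard protoc-gen-go-grpc naming; the usual grpc status/codes imports are assumed.

// Sketch only: dispatch on a generic oneof watch request.
func (w *WatcherServer) Watch(req *api.SdkWatchRequest, stream api.OpenStorageWatch_WatchServer) error {
	switch {
	case req.GetVolumeEvent() != nil:
		return w.volumeWatch(req.GetVolumeEvent(), stream)
	// case req.GetNodeEvent() != nil:  // hypothetical future event type
	// case req.GetDiskEvent() != nil:  // hypothetical future event type
	default:
		return status.Errorf(codes.InvalidArgument, "unsupported watch request type: %v", req)
	}
}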

api/api.proto Outdated
// openstorage API. Right now, we will only support Volumes.
service OpenStorageWatch{

// Watch streams of volumes exist in the system.
Member:

This comment should be changed since this is a generic event API

Member:

Please add more documentation here on how to use this. Docs can be written in markdown format and will be used on the website https://libopenstorage.github.io

@@ -114,11 +118,35 @@ func newFakeDriver(params map[string]string) (*driver, error) {
}
}
}

// go volumeGenerator(inst)
Contributor:

nit: remove commented code

api/api.proto Outdated
@@ -3763,6 +3781,39 @@ message SdkVolumeSnapshotScheduleUpdateRequest {
message SdkVolumeSnapshotScheduleUpdateResponse {
}

// Defines the request to watch a volume
message SdkWatchRequest {
// one of event type
ggriffiths (Contributor) commented on Jun 28, 2023:

To reiterate Luis's comments, maybe expand on some of these comments, since they are auto-generated into the API docs.

volumeEventType = "Volume"
)

type WathcerServer struct {
Contributor:

typo: WathcerServer -> WatcherServer

for event := range client.eventChannel {
var vol *api.Volume
if vol, ok = event.(*api.Volume); !ok {
continue
Contributor:

Log the invalid type here so we know if there are any issues.
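A minimal sketch of what that could look like, reusing the loop from the diff above; only the log line is new, and its wording is illustrative:

for event := range client.eventChannel {
	vol, ok := event.(*api.Volume)
	if !ok {
		// surface unexpected event types instead of silently skipping them
		logrus.Warnf("volume watcher received unexpected event type %T, skipping", event)
		continue
	}
	// ... relay vol to the stream as before
}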

api/api.proto Outdated
}


// OpenStorageWatcher is a service used to stream events in the system
Contributor:

nit: OpenStorageWatcher is a service that provides APIs for watching resources and receiving them as a stream of events.

api/api.proto Outdated
// OpenStorageWatcher is a service used to stream events in the system
service OpenStorageWatch{

// Watch streams of event exist in the system
Contributor:

nit: Watch on resources managed by the driver and receive them as a stream of events.

api/api.proto Outdated
// Defines the request to watch an openstorage volume event for the given label
// if the label is empty, it returns all the volume events
message SdkVolumeWatchRequest {
// Options during watch
Contributor:

nit: fix comment - labels to filter out the volumes to watch on

api/api.proto Outdated
rpc Watch(SdkWatchRequest)
returns (stream SdkWatchResponse) {
option(google.api.http) = {
post: "/v1/watch/watch"
Contributor:

I think this should be just /v1/watch. We are no longer watching specific resources, so we don't need the extra service-name prefix.

api/api.proto Outdated
// Name of volume
string name = 2;
// Volume labels
map<string, string> labels = 3;
Contributor:

Do we need to return the labels separately? The labels are already a part of Volume

Contributor (author):

There could be additional labels that were not part of the request.

Contributor:

Yeah, I get that, but the volume object you are returning on line 3814 already has all the labels (even the ones that were not requested).

if req.GetVolumeEvent() != nil {
return w.volumeWatch(req.GetVolumeEvent(), stream)
}
return nil
Contributor:

Should we return an error here instead since we were unable to parse the request and/or the object is unsupported?

// wait for driver to be initialized with an non-empty volume watcher
for {
if s.volumeServer.driver(ctx) == nil {
continue
Contributor:

This will spin continuously if invoked before the driver is initialized. Maybe add a sleep?

Contributor:

Please add a sleep here before merging the PR
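A minimal sketch of the suggested fix, assuming the context and time imports; the helper name, receiver, and the one-second poll interval are illustrative, while the driver lookup mirrors the diff above:

// Sketch only: bounded polling instead of a busy loop.
func (s *Server) waitForVolumeDriver(ctx context.Context) error {
	for s.volumeServer.driver(ctx) == nil {
		select {
		case <-ctx.Done():
			return ctx.Err() // stop waiting if the watcher context is cancelled
		case <-time.After(time.Second): // poll instead of spinning
		}
	}
	return nil
}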

api/server/sdk/watcher.go (resolved)
client := watchConnection{
name: uuid.New(),
eventType: volumeEventType,
eventChannel: make(chan interface{}, 2),
Contributor:

Why create a buffer of only 2? This could lead to issues where the channel is full for some reason (say the stream is blocked) and we keep spinning up goroutines on line 132 as part of the callback invocation.

It would be better to have an unbounded channel, so that we keep queueing the events and avoid goroutines waiting to send on this channel.

Contributor (author):

An unbuffered channel will wait on receiving the data too. AFAIK, an unbuffered channel is a size-1 channel that waits on a single event.

Contributor:

I was referring to an unbounded channel and not unbuffered. Essentially just an unbounded queue that will keep collecting the events.
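Go channels are always bounded, so an unbounded queue is usually built from a goroutine draining into a slice. A minimal sketch of that pattern (names are illustrative, not from this PR):

// Sketch only: an unbounded event queue between a producer and a consumer.
// The producer's sends into in are drained promptly; events accumulate in a
// slice, so the consumer can lag without blocking the producer.
func unboundedQueue(in <-chan interface{}) <-chan interface{} {
	out := make(chan interface{})
	go func() {
		defer close(out)
		var pending []interface{}
		for {
			// Only offer an element to out when something is pending;
			// a nil channel makes that select case inert.
			var send chan<- interface{}
			var next interface{}
			if len(pending) > 0 {
				send = out
				next = pending[0]
			}
			select {
			case ev, ok := <-in:
				if !ok {
					for _, e := range pending {
						out <- e // drain what is left, then exit
					}
					return
				}
				pending = append(pending, ev)
			case send <- next:
				pending = pending[1:]
			}
		}
	}()
	return out
}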


// wait only as long as context deadline allows
select {
case err := <-errChan:
Contributor:

Why not just do this here?

Suggested change
case err := <-errChan:
case err := <-group.Wait():

dahuang-purestorage (Contributor, author) commented on Jun 30, 2023:

We could, but we would need a loop to wait for it; group.Wait() returns an error instead of a channel. Doing it this way couples well with ctx.Done().
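For reference, the pattern being described, bridging the blocking group.Wait() into a channel so it can be selected alongside ctx.Done(), might look like this sketch (assumes golang.org/x/sync/errgroup; the helper name is illustrative):

// Sketch only: make group.Wait() selectable together with ctx.Done().
func waitWithContext(ctx context.Context, group *errgroup.Group) error {
	errChan := make(chan error, 1)
	go func() {
		errChan <- group.Wait() // Wait blocks, so run it in its own goroutine
	}()
	select {
	case err := <-errChan:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}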

continue
}

volumeChannel, _ := s.volumeServer.driver(ctx).GetVolumeWatcher(&api.VolumeLocator{}, make(map[string]string))
Contributor:

Should we check the error here?

continue
}
resp := convertApiVolumeToSdkReponse(vol)
if err := stream.Send(resp); err != nil {
Contributor (author):

Figure out if we can set a timeout whenever stream.Send fails.

lpabon (Member) left a comment:

👍🏼 I approve of the changes to the API. I'll leave the implementation checks to Grant and Aditya.

github-actions bot commented on Jul 7, 2023:

This PR is stale because it has been in review for 3 days with no activity.

adityadani (Contributor) left a comment:

Overall looks good. I have some pending questions on the PR. Also, do you think we should write client stubs for the streaming API as well, so that callers can simply use it with the correct server keepalive timeouts?
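For illustration, a caller-side stub for the streaming API could look roughly like the sketch below. The client constructor name follows standard protoc-gen-go-grpc naming for the OpenStorageWatch service, and the keepalive values and target address are placeholders rather than agreed defaults.

// Sketch only: a watch client with explicit keepalive settings.
package main

import (
	"context"
	"io"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/keepalive"

	"github.com/libopenstorage/openstorage/api"
)

func main() {
	conn, err := grpc.Dial("0.0.0.0:9020",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			Time:    30 * time.Second, // placeholder keepalive ping interval
			Timeout: 10 * time.Second, // placeholder keepalive timeout
		}),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := api.NewOpenStorageWatchClient(conn) // assumed generated constructor
	stream, err := client.Watch(context.Background(), &api.SdkWatchRequest{})
	if err != nil {
		log.Fatal(err)
	}
	for {
		resp, err := stream.Recv()
		if err == io.EOF {
			return
		}
		if err != nil {
			log.Fatal(err)
		}
		log.Printf("watch event: %v", resp)
	}
}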


@@ -142,6 +142,8 @@ type ServerConfig struct {

// Server is an implementation of the gRPC SDK interface
type Server struct {
ctx context.Context
Contributor:

nit: can we rename this as watcherCtx and watcherCtxCancel so that it doesn't get used anywhere else?

for event := range client.eventChannel {

// create a new context that will return error if execution took more than streamTimeout
timeoutCtx, timeoutCancelled := context.WithTimeout(ctx, streamTimeout)
Contributor:

I am not following how this timeoutCtx is being enforced. Isn't stream.Send() a blocking function?
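One way such a bound can actually be enforced (a sketch only: server-side Send has no per-call deadline in gRPC-Go, so the usual workaround runs it in a goroutine and stops waiting on timeout; the helper name is illustrative and the stream type, api.OpenStorageWatch_WatchServer, is an assumed generated name):

// Sketch only: stop waiting on a blocking stream.Send after a timeout.
// Note the Send itself keeps running in the background; we only give up waiting.
func sendWithTimeout(ctx context.Context, stream api.OpenStorageWatch_WatchServer,
	resp *api.SdkWatchResponse, timeout time.Duration) error {

	timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	done := make(chan error, 1)
	go func() {
		done <- stream.Send(resp)
	}()

	select {
	case err := <-done:
		return err
	case <-timeoutCtx.Done():
		return timeoutCtx.Err()
	}
}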

return err
}
if timeoutCtx.Err() != nil {
logrus.Warnf("context error: %v", timeoutCtx.Err())
Contributor:

nit: add more info in this log line


}
s.RUnlock()
case <-ctx.Done():
logrus.Infof("exiting volume watcher\n")
Contributor:

Don't need the \n. Also can you change logs to start with uppercase letters?

continue
}
resp := convertApiVolumeToSdkReponse(vol)
if err := stream.Send(resp); err != nil {
Contributor:

We need the timeoutCtx here too.

volume/volume.go (resolved)
adityadani (Contributor) left a comment:

lgtm

@@ -40,7 +40,7 @@ func (w *WatcherServer) Watch(req *api.SdkWatchRequest, stream api.OpenStorageWa
if req.GetVolumeEvent() != nil {
return w.volumeWatch(req.GetVolumeEvent(), stream)
}
return status.Errorf(codes.InvalidArgument, "invalid request type for watcher %v", req)
return status.Errorf(codes.InvalidArgument, "Invalid request type for watcher %v", req)
Contributor:

nit: errors should still be lower case

Signed-off-by: dahuang <[email protected]>
dahuang-purestorage merged commit 9b86347 into master on Jul 12, 2023
2 checks passed
dahuang-purestorage deleted the PWX-29706-add-watch-volume-api branch on July 12, 2023 at 18:03
piyush-nimbalkar pushed a commit to piyush-nimbalkar/openstorage that referenced this pull request Sep 21, 2023
* Adding Watch API and initial implementation
- new API introduced in api.proto
- new proto-driver function that returns a volume watcher, passing "modified/new/deleted" volumes through a Go channel
- implement a watcher goroutine that constantly waits for volumes on the Go channel fed by porx
- implement the pipeline that passes volume info from the watcher goroutine to all streaming clients

Tests:
UT passed
Local smoke test with Porx

Signed-off-by: dahuang <[email protected]>
piyush-nimbalkar pushed a commit that referenced this pull request Sep 21, 2023
(same commit message and sign-off as above)