Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support SubscribeTopicEvents API #1078

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion demo/pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ You can run server/client demo with different component names.
It is worth noting that both server and client demo should set the same store name by param `-s`.
For example:
```shell
cd ${project_path}/demo/pubsub/server/
#################### Run pubsub demo with appcallback ####################
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Directory path in README needs correction

The README.md contains an incorrect path. The directory structure shows server/appcallback/ but the README instructs to cd into demo/pubsub/appcallback/. This should be updated to demo/pubsub/server/appcallback/ to match the actual directory structure.

  • Line 5: The section header is correct as it reflects the appcallback demo
  • Line 6: Path should be corrected from ${project_path}/demo/pubsub/appcallback/ to ${project_path}/demo/pubsub/server/appcallback/
🔗 Analysis chain

Verify the section header matches the directory structure

Let's verify that the "appcallback" section header accurately reflects the directory structure.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the existence of the appcallback directory and its structure
# Expected: Directory should exist and contain the mentioned subscriber component

# Check if the appcallback directory exists
fd -t d "appcallback$" demo/pubsub

# List contents to verify the subscriber component
fd . demo/pubsub/appcallback -t f

Length of output: 215


Script:

#!/bin/bash
# Let's verify the actual structure and content of the pubsub demo directory
# to understand the correct organization

# Check the structure of demo/pubsub directory
fd . demo/pubsub -t d

# Look at the README.md content to understand the context better
cat demo/pubsub/README.md

Length of output: 754

cd ${project_path}/demo/pubsub/appcallback/
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Address CI concerns with markdown shell commands

Based on the previous review comments, there are concerns about CI testing these markdown files. According to the documentation, adding new shell scripts directly might break CI.

Consider:

  1. Waiting for related PRs to be merged before adding this demo, as suggested in the previous discussion
  2. Ensuring the CI configuration is updated to handle these new paths
  3. Adding necessary test cases for the new demo path

# 1. start subscriber
go build -o subscriber
/.subscriber -s pub_subs_demo
Expand All @@ -16,5 +17,22 @@ go build -o layotto
cd ${project_path}/demo/pubsub/client/
go build -o publisher
./publisher -s pub_subs_demo

#################### Run pubsub demo with SubscribeTopicEvents ####################
# 1. start layotto
cd ${project_path}/cmd/layotto
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

因为 CI 里会运行这些作为 demo 的 markdown 文件(执行这些 markdown 文件里的 shell 脚本),这样直接添加新的shell 脚本可能让 CI 跑不通哈,详细说明见 https://mosn.io/layotto/docs/development/test-quickstart

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

目前看起来只能都合并完了,再加demo?
@seeflood

go build -o layotto
./layotto start -c ../../configs/config_standalone.json

cd ${project_path}/demo/pubsub/dynamic/
# 2. start subscriber
go build -o subscriber
/.subscriber -s pub_subs_demo

# 3. start publisher
cd ${project_path}/demo/pubsub/client/
go build -o publisher
./publisher -s pub_subs_demo


```
96 changes: 96 additions & 0 deletions demo/pubsub/server/dynamic/subscribe_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2021 Layotto Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package main

import (
"context"
"errors"
"flag"
"log"
"time"

"mosn.io/layotto/sdk/go-sdk/client"

runtimev1pb "mosn.io/layotto/spec/proto/runtime/v1"
)

var storeName string

func init() {
flag.StringVar(&storeName, "s", "", "set `storeName`")
}

func main() {
flag.Parse()
if storeName == "" {
panic("storeName is empty.")
}
testDynamicSub()
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve error handling in main function

Using panic for configuration validation is not a good practice. Consider returning an error and handling it gracefully.

-func main() {
+func run() error {
     flag.Parse()
     if storeName == "" {
-        panic("storeName is empty.")
+        return fmt.Errorf("storeName is required")
     }
-    testDynamicSub()
+    return testDynamicSub()
+}
+
+func main() {
+    if err := run(); err != nil {
+        log.Fatalf("Error: %v", err)
+    }
 }

Committable suggestion skipped: line range outside the PR's diff.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider improving error handling and adding graceful shutdown

The current implementation has several areas for improvement:

  1. The panic message could be more descriptive
  2. Consider implementing graceful shutdown handling
  3. The testDynamicSub() errors should be handled

Consider applying these improvements:

 func main() {
 	flag.Parse()
 	if storeName == "" {
-		panic("storeName is empty.")
+		log.Fatal("storeName is required: please provide it using the -s flag")
 	}
-	testDynamicSub()
+	
+	// Setup graceful shutdown
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	
+	// Handle shutdown signals
+	go func() {
+		sigCh := make(chan os.Signal, 1)
+		signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
+		<-sigCh
+		cancel()
+	}()
+	
+	if err := testDynamicSub(); err != nil {
+		log.Fatal("Failed to run subscription test:", err)
+	}
 }

Committable suggestion skipped: line range outside the PR's diff.


func testDynamicSub() {
cli, err := client.NewClient()

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Handle error from client creation

The error from NewClient() is not being checked.

 func testDynamicSub() {
     cli, err := client.NewClient()
+    if err != nil {
+        return fmt.Errorf("failed to create client: %w", err)
+    }

Committable suggestion skipped: line range outside the PR's diff.

🧰 Tools
🪛 golangci-lint

46-46: ineffectual assignment to err

(ineffassign)

// Use SubscribeWithHandler API to subscribe to a topic.
stop, err := cli.SubscribeWithHandler(context.Background(), client.SubscriptionRequest{
PubsubName: storeName,
Topic: "hello",
Metadata: nil,
}, eventHandler)

//Use Subscribe API to subscribe to a topic.
sub, err := cli.Subscribe(context.Background(), client.SubscriptionRequest{
PubsubName: storeName,
Topic: "topic1",
Metadata: nil,
})

if err != nil {
log.Fatalf("failed to subscribe to topic: %v", err)
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Improve error handling for subscriptions

The error from SubscribeWithHandler is not checked, and error handling is inconsistent between the two subscription methods.

-    stop, err := cli.SubscribeWithHandler(context.Background(), client.SubscriptionRequest{
+    stop, err := cli.SubscribeWithHandler(context.Background(), client.SubscriptionRequest{
         PubsubName: storeName,
         Topic:      "hello",
         Metadata:   nil,
     }, eventHandler)
+    if err != nil {
+        return fmt.Errorf("failed to subscribe with handler: %w", err)
+    }
 
     sub, err := cli.Subscribe(context.Background(), client.SubscriptionRequest{
         PubsubName: storeName,
         Topic:      "topic1",
         Metadata:   nil,
     })
     if err != nil {
-        log.Fatalf("failed to subscribe to topic: %v", err)
+        return fmt.Errorf("failed to subscribe to topic: %w", err)
     }

Committable suggestion skipped: line range outside the PR's diff.

🧰 Tools
🪛 golangci-lint

49-49: ineffectual assignment to err

(ineffassign)


msg, err := sub.Receive()
if err != nil {
log.Fatalf("Error receiving message: %v", err)
}
log.Printf(">>[Subscribe API]Received message\n")
log.Printf("event - PubsubName: %s, Topic: %s, ID: %s, Data: %s\n", msg.PubsubName, msg.Topic, msg.Id, msg.Data)

// Use _MUST_ always signal the result of processing the message, else the
// message will not be considered as processed and will be redelivered or
// dead lettered.
if err := msg.Success(); err != nil {
log.Fatalf("error sending message success: %v", err)
}

time.Sleep(time.Second * 10)

if err := errors.Join(stop(), sub.Close()); err != nil {

Check failure on line 82 in demo/pubsub/server/dynamic/subscribe_server.go

View workflow job for this annotation

GitHub Actions / Go CI Linter (./demo)

Join not declared by package errors (typecheck)
log.Fatal(err)
}

if err != nil {
panic(err)
}
cli.Close()
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix error handling and improve resource management

Several critical issues need to be addressed:

  1. Missing error checks for client creation and subscription
  2. Hardcoded sleep duration
  3. Redundant error check at the end
  4. Resource cleanup could be improved

Apply these fixes:

-func testDynamicSub() {
+func testDynamicSub() error {
 	cli, err := client.NewClient()
+	if err != nil {
+		return fmt.Errorf("failed to create client: %w", err)
+	}
+	defer cli.Close()
 
 	// Use SubscribeWithHandler API to subscribe to a topic.
 	stop, err := cli.SubscribeWithHandler(context.Background(), client.SubscriptionRequest{
 		PubsubName: storeName,
 		Topic:      "hello",
 		Metadata:   nil,
 	}, eventHandler)
+	if err != nil {
+		return fmt.Errorf("failed to subscribe with handler: %w", err)
+	}
+	defer stop()
 
 	//Use Subscribe API to subscribe to a topic.
 	sub, err := cli.Subscribe(context.Background(), client.SubscriptionRequest{
@@ -58,6 +58,7 @@
 		Topic:      "topic1",
 		Metadata:   nil,
 	})
+	defer sub.Close()
 
 	if err != nil {
 		log.Fatalf("failed to subscribe to topic: %v", err)
@@ -80,13 +81,7 @@
 	time.Sleep(time.Second * 10)
 
-	if err := errors.Join(stop(), sub.Close()); err != nil {
-		log.Fatal(err)
-	}
-
-	if err != nil {
-		panic(err)
-	}
-	cli.Close()
+	return nil
 }

Additionally, consider replacing the hardcoded sleep with a configurable duration or context-based cancellation.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func testDynamicSub() {
cli, err := client.NewClient()
// Use SubscribeWithHandler API to subscribe to a topic.
stop, err := cli.SubscribeWithHandler(context.Background(), client.SubscriptionRequest{
PubsubName: storeName,
Topic: "hello",
Metadata: nil,
}, eventHandler)
//Use Subscribe API to subscribe to a topic.
sub, err := cli.Subscribe(context.Background(), client.SubscriptionRequest{
PubsubName: storeName,
Topic: "topic1",
Metadata: nil,
})
if err != nil {
log.Fatalf("failed to subscribe to topic: %v", err)
}
msg, err := sub.Receive()
if err != nil {
log.Fatalf("Error receiving message: %v", err)
}
log.Printf(">>[Subscribe API]Received message\n")
log.Printf("event - PubsubName: %s, Topic: %s, ID: %s, Data: %s\n", msg.PubsubName, msg.Topic, msg.Id, msg.Data)
// Use _MUST_ always signal the result of processing the message, else the
// message will not be considered as processed and will be redelivered or
// dead lettered.
if err := msg.Success(); err != nil {
log.Fatalf("error sending message success: %v", err)
}
time.Sleep(time.Second * 10)
if err := errors.Join(stop(), sub.Close()); err != nil {
log.Fatal(err)
}
if err != nil {
panic(err)
}
cli.Close()
}
func testDynamicSub() error {
cli, err := client.NewClient()
if err != nil {
return fmt.Errorf("failed to create client: %w", err)
}
defer cli.Close()
// Use SubscribeWithHandler API to subscribe to a topic.
stop, err := cli.SubscribeWithHandler(context.Background(), client.SubscriptionRequest{
PubsubName: storeName,
Topic: "hello",
Metadata: nil,
}, eventHandler)
if err != nil {
return fmt.Errorf("failed to subscribe with handler: %w", err)
}
defer stop()
//Use Subscribe API to subscribe to a topic.
sub, err := cli.Subscribe(context.Background(), client.SubscriptionRequest{
PubsubName: storeName,
Topic: "topic1",
Metadata: nil,
})
defer sub.Close()
if err != nil {
log.Fatalf("failed to subscribe to topic: %v", err)
}
msg, err := sub.Receive()
if err != nil {
log.Fatalf("Error receiving message: %v", err)
}
log.Printf(">>[Subscribe API]Received message\n")
log.Printf("event - PubsubName: %s, Topic: %s, ID: %s, Data: %s\n", msg.PubsubName, msg.Topic, msg.Id, msg.Data)
// Use _MUST_ always signal the result of processing the message, else the
// message will not be considered as processed and will be redelivered or
// dead lettered.
if err := msg.Success(); err != nil {
log.Fatalf("error sending message success: %v", err)
}
time.Sleep(time.Second * 10)
return nil
}
🧰 Tools
🪛 golangci-lint

46-46: ineffectual assignment to err

(ineffassign)


49-49: ineffectual assignment to err

(ineffassign)


func eventHandler(request *runtimev1pb.TopicEventRequest) client.SubscriptionResponseStatus {
log.Printf(">>[SubscribeWithHandler API] Received message\n")
log.Printf("event - PubsubName: %s, Topic: %s, ID: %s, Data: %s\n", request.PubsubName, request.Topic, request.Id, request.Data)
return client.SubscriptionResponseStatusSuccess
}
9 changes: 8 additions & 1 deletion pkg/grpc/default_api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"context"
"errors"
"sync"
"sync/atomic"

"github.com/dapr/components-contrib/secretstores"

Expand Down Expand Up @@ -86,8 +87,12 @@
// app callback
AppCallbackConn *grpc.ClientConn
topicPerComponent map[string]TopicSubscriptions
streamer *streamer
// json
json jsoniter.API
json jsoniter.API
closed atomic.Bool

Check failure on line 93 in pkg/grpc/default_api/api.go

View workflow job for this annotation

GitHub Actions / Go CI Linter (.)

Bool not declared by package atomic (typecheck)
closeCh chan struct{}
wg sync.WaitGroup
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix atomic.Bool type and ensure proper field initialization

The atomic.Bool type usage is incorrect. Additionally, these concurrency-related fields need proper initialization in the constructor.

  1. Fix the atomic.Bool type:
-	closed  atomic.Bool
+	closed  atomic.Value  // For Go < 1.19
// OR
+	closed  atomic.Bool{} // For Go >= 1.19
  1. Ensure all fields are properly initialized in NewAPI:
 return &api{
   // ... existing fields ...
   json:    jsoniter.ConfigFastest,
+  streamer: newStreamer(),
+  closeCh:  make(chan struct{}),
+  closed:   atomic.Bool{},  // or atomic.Value for Go < 1.19
 }

Committable suggestion skipped: line range outside the PR's diff.

🧰 Tools
🪛 GitHub Check: Go CI Linter (.)

[failure] 93-93:
Bool not declared by package atomic (typecheck)

}

func (a *api) Init(conn *grpc.ClientConn) error {
Expand Down Expand Up @@ -147,7 +152,9 @@
sendToOutputBindingFn: sendToOutputBindingFn,
secretStores: secretStores,
json: jsoniter.ConfigFastest,
//closeCh: make(chan struct{}),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Uncomment the closeCh initialization.

The closeCh field is declared but its initialization is commented out. This channel is likely needed for proper shutdown signaling in the subscription implementation.

-//closeCh:                  make(chan struct{}),
+closeCh:                  make(chan struct{}),
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
//closeCh: make(chan struct{}),
closeCh: make(chan struct{}),

}

Comment on lines +153 to +155
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

⚠️ Potential issue

Based on the gathered information, I can now provide the final response:

Critical: Constructor must initialize streamer field

The streamer field requires initialization in the constructor as it's actively used in subscription handling:

  • The field is used in Subscribe operations without nil checks
  • Tests explicitly initialize it with subscribers map
  • The struct definition shows it contains critical synchronization primitives

Required initialization:

streamer: &streamer{
    subscribers: make(map[string]*conn),
},
🔗 Analysis chain

Critical: Constructor missing required field initializations

The constructor only initializes the json field but leaves other critical fields uninitialized:

  1. The commented out closeCh initialization suggests it's needed but was deliberately disabled
  2. Missing initialization for streamer and wg fields

Let's verify the usage of these fields in the codebase:


Let me gather more context about the struct definition and initialization patterns.


Let me gather more information about the streamer initialization and struct definition.


Let me check one more thing about the streamer field and its initialization in the constructor.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify field usage patterns that could lead to runtime issues

# Check for streamer usage without nil checks
echo "Checking streamer usage patterns..."
ast-grep --pattern 'a.streamer.$_'

# Check for closeCh usage
echo "Checking closeCh usage patterns..."
rg 'closeCh.*<-' 

# Check for WaitGroup usage
echo "Checking WaitGroup usage patterns..."
rg 'wg\.(Add|Done|Wait)'

Length of output: 5831


Script:

#!/bin/bash
# Check the struct definition and field types
echo "Checking struct definition..."
rg -A 10 "type API struct"

# Check initialization patterns
echo "Checking initialization patterns..."
rg "New.*API" -A 5

# Check if streamer is initialized elsewhere
echo "Checking streamer initialization..."
rg "streamer.*=.*new\("

Length of output: 58900


Script:

#!/bin/bash
# Check the struct definition and streamer field usage
echo "Checking API struct definition..."
rg -B 2 -A 10 "type api struct"

# Check streamer initialization in the codebase
echo "Checking streamer initialization patterns..."
rg "streamer.*=.*{" -A 3

# Check if streamer is initialized in tests
echo "Checking test initialization patterns..."
rg "apiForTest.streamer.*=.*" -A 3

Length of output: 2264


Script:

#!/bin/bash
# Check the streamer field in the struct and its initialization
echo "Checking streamer field and initialization..."
rg -B 5 -A 10 "type streamer struct"

# Check NewAPI implementation
echo "Checking NewAPI implementation..."
rg -B 5 -A 15 "func NewAPI\("

Length of output: 7913

}

func (a *api) SayHello(ctx context.Context, in *runtimev1pb.SayHelloRequest) (*runtimev1pb.SayHelloResponse, error) {
Expand Down
95 changes: 50 additions & 45 deletions pkg/grpc/default_api/api_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,56 +151,14 @@ func (a *api) getInterestedTopics() (map[string]TopicSubscriptions, error) {
}

func (a *api) publishMessageGRPC(ctx context.Context, msg *pubsub.NewMessage) error {
// 1. Unmarshal to cloudEvent model
var cloudEvent map[string]interface{}
err := a.json.Unmarshal(msg.Data, &cloudEvent)
if err != nil {
log.DefaultLogger.Debugf("[runtime]error deserializing cloud events proto: %s", err)
return err
}

// 2. Drop msg if the current cloud event has expired
if pubsub.HasExpired(cloudEvent) {
log.DefaultLogger.Warnf("[runtime]dropping expired pub/sub event %v as of %v", cloudEvent[pubsub.IDField].(string), cloudEvent[pubsub.ExpirationField].(string))
return nil
}

// 3. Convert to proto domain struct
envelope := &runtimev1pb.TopicEventRequest{
Id: cloudEvent[pubsub.IDField].(string),
Source: cloudEvent[pubsub.SourceField].(string),
DataContentType: cloudEvent[pubsub.DataContentTypeField].(string),
Type: cloudEvent[pubsub.TypeField].(string),
SpecVersion: cloudEvent[pubsub.SpecVersionField].(string),
Topic: msg.Topic,
PubsubName: msg.Metadata[Metadata_key_pubsubName],
}

// set data field
if data, ok := cloudEvent[pubsub.DataBase64Field]; ok && data != nil {
decoded, decodeErr := base64.StdEncoding.DecodeString(data.(string))
if decodeErr != nil {
log.DefaultLogger.Debugf("unable to base64 decode cloudEvent field data_base64: %s", decodeErr)
return err
}

envelope.Data = decoded
} else if data, ok := cloudEvent[pubsub.DataField]; ok && data != nil {
envelope.Data = nil

if contenttype.IsStringContentType(envelope.DataContentType) {
envelope.Data = []byte(data.(string))
} else if contenttype.IsJSONContentType(envelope.DataContentType) {
envelope.Data, _ = a.json.Marshal(data)
}
}
// TODO tracing

// 4. Call appcallback
envelope, cloudEvent, err := a.envelopeFromSubscriptionMessage(ctx, msg)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix ineffectual assignment to err variable.

The variable err assigned from envelopeFromSubscriptionMessage is not used, which may lead to ignoring potential errors. Consider handling the error or explicitly ignoring it.

Apply the following fix to handle the error:

 envelope, cloudEvent, err := a.envelopeFromSubscriptionMessage(ctx, msg)
+if err != nil {
+    return err
+}

Committable suggestion skipped: line range outside the PR's diff.

🧰 Tools
🪛 GitHub Check: Go CI Linter (.)

[failure] 156-156:
ineffectual assignment to err (ineffassign)

// Call appcallback
clientV1 := runtimev1pb.NewAppCallbackClient(a.AppCallbackConn)
res, err := clientV1.OnTopicEvent(ctx, envelope)

// 5. Check result
// Check result
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add error handling and nil checks.

The current implementation has two potential issues:

  1. The error from envelopeFromSubscriptionMessage is not checked
  2. Missing nil check for envelope which could be nil for expired messages

Apply this diff to fix both issues:

 envelope, cloudEvent, err := a.envelopeFromSubscriptionMessage(ctx, msg)
+if err != nil {
+    return err
+}
+if envelope == nil {
+    // Message has expired, skip processing
+    return nil
+}
 // Call appcallback
 clientV1 := runtimev1pb.NewAppCallbackClient(a.AppCallbackConn)
 res, err := clientV1.OnTopicEvent(ctx, envelope)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
envelope, cloudEvent, err := a.envelopeFromSubscriptionMessage(ctx, msg)
// Call appcallback
clientV1 := runtimev1pb.NewAppCallbackClient(a.AppCallbackConn)
res, err := clientV1.OnTopicEvent(ctx, envelope)
// 5. Check result
// Check result
envelope, cloudEvent, err := a.envelopeFromSubscriptionMessage(ctx, msg)
if err != nil {
return err
}
if envelope == nil {
// Message has expired, skip processing
return nil
}
// Call appcallback
clientV1 := runtimev1pb.NewAppCallbackClient(a.AppCallbackConn)
res, err := clientV1.OnTopicEvent(ctx, envelope)
// Check result

return retryStrategy(err, res, cloudEvent)
}

Expand Down Expand Up @@ -246,3 +204,50 @@ func listTopicSubscriptions(client runtimev1pb.AppCallbackClient, log log.ErrorL
}
return make([]*runtimev1pb.TopicSubscription, 0)
}

func (a *api) envelopeFromSubscriptionMessage(ctx context.Context, msg *pubsub.NewMessage) (*runtimev1pb.TopicEventRequest, map[string]interface{}, error) {
// 1. Unmarshal to cloudEvent model
var cloudEvent map[string]interface{}
err := a.json.Unmarshal(msg.Data, &cloudEvent)
if err != nil {
log.DefaultLogger.Debugf("[runtime]error deserializing cloud events proto: %s", err)
return nil, cloudEvent, err
}

// 2. Drop msg if the current cloud event has expired
if pubsub.HasExpired(cloudEvent) {
log.DefaultLogger.Warnf("[runtime]dropping expired pub/sub event %v as of %v", cloudEvent[pubsub.IDField].(string), cloudEvent[pubsub.ExpirationField].(string))
return nil, cloudEvent, nil
}

// 3. Convert to proto domain struct
envelope := &runtimev1pb.TopicEventRequest{
Id: cloudEvent[pubsub.IDField].(string),
Source: cloudEvent[pubsub.SourceField].(string),
DataContentType: cloudEvent[pubsub.DataContentTypeField].(string),
Type: cloudEvent[pubsub.TypeField].(string),
SpecVersion: cloudEvent[pubsub.SpecVersionField].(string),
Topic: msg.Topic,
PubsubName: msg.Metadata[Metadata_key_pubsubName],
}
Comment on lines +234 to +241
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add validation for required cloud event fields and safe type assertions.

The current implementation performs unsafe type assertions which could cause panics. Additionally, there's no validation for required cloud event fields.

Apply this diff to add validation and safe type assertions:

+    // Validate required fields
+    requiredFields := []string{pubsub.IDField, pubsub.SourceField, pubsub.DataContentTypeField, pubsub.TypeField, pubsub.SpecVersionField}
+    for _, field := range requiredFields {
+        if _, ok := cloudEvent[field]; !ok {
+            return nil, cloudEvent, fmt.Errorf("required field %s missing from cloud event", field)
+        }
+    }
+
+    // Safe type assertions
+    id, ok := cloudEvent[pubsub.IDField].(string)
+    if !ok {
+        return nil, cloudEvent, fmt.Errorf("invalid type for field %s: expected string", pubsub.IDField)
+    }
+    source, ok := cloudEvent[pubsub.SourceField].(string)
+    if !ok {
+        return nil, cloudEvent, fmt.Errorf("invalid type for field %s: expected string", pubsub.SourceField)
+    }
+    contentType, ok := cloudEvent[pubsub.DataContentTypeField].(string)
+    if !ok {
+        return nil, cloudEvent, fmt.Errorf("invalid type for field %s: expected string", pubsub.DataContentTypeField)
+    }
+    eventType, ok := cloudEvent[pubsub.TypeField].(string)
+    if !ok {
+        return nil, cloudEvent, fmt.Errorf("invalid type for field %s: expected string", pubsub.TypeField)
+    }
+    specVersion, ok := cloudEvent[pubsub.SpecVersionField].(string)
+    if !ok {
+        return nil, cloudEvent, fmt.Errorf("invalid type for field %s: expected string", pubsub.SpecVersionField)
+    }

     envelope := &runtimev1pb.TopicEventRequest{
-        Id:              cloudEvent[pubsub.IDField].(string),
-        Source:          cloudEvent[pubsub.SourceField].(string),
-        DataContentType: cloudEvent[pubsub.DataContentTypeField].(string),
-        Type:            cloudEvent[pubsub.TypeField].(string),
-        SpecVersion:     cloudEvent[pubsub.SpecVersionField].(string),
+        Id:              id,
+        Source:          source,
+        DataContentType: contentType,
+        Type:            eventType,
+        SpecVersion:     specVersion,
         Topic:           msg.Topic,
         PubsubName:      msg.Metadata[Metadata_key_pubsubName],
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Id: cloudEvent[pubsub.IDField].(string),
Source: cloudEvent[pubsub.SourceField].(string),
DataContentType: cloudEvent[pubsub.DataContentTypeField].(string),
Type: cloudEvent[pubsub.TypeField].(string),
SpecVersion: cloudEvent[pubsub.SpecVersionField].(string),
Topic: msg.Topic,
PubsubName: msg.Metadata[Metadata_key_pubsubName],
}
// Validate required fields
requiredFields := []string{pubsub.IDField, pubsub.SourceField, pubsub.DataContentTypeField, pubsub.TypeField, pubsub.SpecVersionField}
for _, field := range requiredFields {
if _, ok := cloudEvent[field]; !ok {
return nil, cloudEvent, fmt.Errorf("required field %s missing from cloud event", field)
}
}
// Safe type assertions
id, ok := cloudEvent[pubsub.IDField].(string)
if !ok {
return nil, cloudEvent, fmt.Errorf("invalid type for field %s: expected string", pubsub.IDField)
}
source, ok := cloudEvent[pubsub.SourceField].(string)
if !ok {
return nil, cloudEvent, fmt.Errorf("invalid type for field %s: expected string", pubsub.SourceField)
}
contentType, ok := cloudEvent[pubsub.DataContentTypeField].(string)
if !ok {
return nil, cloudEvent, fmt.Errorf("invalid type for field %s: expected string", pubsub.DataContentTypeField)
}
eventType, ok := cloudEvent[pubsub.TypeField].(string)
if !ok {
return nil, cloudEvent, fmt.Errorf("invalid type for field %s: expected string", pubsub.TypeField)
}
specVersion, ok := cloudEvent[pubsub.SpecVersionField].(string)
if !ok {
return nil, cloudEvent, fmt.Errorf("invalid type for field %s: expected string", pubsub.SpecVersionField)
}
envelope := &runtimev1pb.TopicEventRequest{
Id: id,
Source: source,
DataContentType: contentType,
Type: eventType,
SpecVersion: specVersion,
Topic: msg.Topic,
PubsubName: msg.Metadata[Metadata_key_pubsubName],
}


// set data field
if data, ok := cloudEvent[pubsub.DataBase64Field]; ok && data != nil {
decoded, decodeErr := base64.StdEncoding.DecodeString(data.(string))
if decodeErr != nil {
log.DefaultLogger.Debugf("unable to base64 decode cloudEvent field data_base64: %s", decodeErr)
return nil, cloudEvent, err
}
Comment on lines +245 to +249
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix error variable in base64 decode error handling.

The error handling for base64 decode returns the wrong error variable.

Apply this diff to fix the error handling:

         decoded, decodeErr := base64.StdEncoding.DecodeString(data.(string))
         if decodeErr != nil {
             log.DefaultLogger.Debugf("unable to base64 decode cloudEvent field data_base64: %s", decodeErr)
-            return nil, cloudEvent, err
+            return nil, cloudEvent, decodeErr
         }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
decoded, decodeErr := base64.StdEncoding.DecodeString(data.(string))
if decodeErr != nil {
log.DefaultLogger.Debugf("unable to base64 decode cloudEvent field data_base64: %s", decodeErr)
return nil, cloudEvent, err
}
decoded, decodeErr := base64.StdEncoding.DecodeString(data.(string))
if decodeErr != nil {
log.DefaultLogger.Debugf("unable to base64 decode cloudEvent field data_base64: %s", decodeErr)
return nil, cloudEvent, decodeErr
}


envelope.Data = decoded
} else if data, ok := cloudEvent[pubsub.DataField]; ok && data != nil {
envelope.Data = nil

if contenttype.IsStringContentType(envelope.DataContentType) {
envelope.Data = []byte(data.(string))
} else if contenttype.IsJSONContentType(envelope.DataContentType) {
envelope.Data, _ = a.json.Marshal(data)
}
Comment on lines +255 to +259
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add safe type assertions and handle JSON marshal errors.

The current implementation has unsafe type assertions for string data and ignores JSON marshal errors.

Apply this diff to add safe type assertions and handle marshal errors:

         if contenttype.IsStringContentType(envelope.DataContentType) {
-            envelope.Data = []byte(data.(string))
+            dataStr, ok := data.(string)
+            if !ok {
+                return nil, cloudEvent, fmt.Errorf("invalid type for data field: expected string")
+            }
+            envelope.Data = []byte(dataStr)
         } else if contenttype.IsJSONContentType(envelope.DataContentType) {
-            envelope.Data, _ = a.json.Marshal(data)
+            var err error
+            envelope.Data, err = a.json.Marshal(data)
+            if err != nil {
+                return nil, cloudEvent, fmt.Errorf("failed to marshal data field: %v", err)
+            }
         }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if contenttype.IsStringContentType(envelope.DataContentType) {
envelope.Data = []byte(data.(string))
} else if contenttype.IsJSONContentType(envelope.DataContentType) {
envelope.Data, _ = a.json.Marshal(data)
}
if contenttype.IsStringContentType(envelope.DataContentType) {
dataStr, ok := data.(string)
if !ok {
return nil, cloudEvent, fmt.Errorf("invalid type for data field: expected string")
}
envelope.Data = []byte(dataStr)
} else if contenttype.IsJSONContentType(envelope.DataContentType) {
var err error
envelope.Data, err = a.json.Marshal(data)
if err != nil {
return nil, cloudEvent, fmt.Errorf("failed to marshal data field: %v", err)
}
}

}
return envelope, cloudEvent, nil
}
Comment on lines +217 to +262
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add validation for required cloud event fields and safe type assertions

The current implementation lacks validation for required cloud event fields and performs unsafe type assertions, which could lead to panics if the data is not as expected.

Please add validation for required fields and use safe type assertions to enhance robustness. Consider applying the following changes:

+    // Validate required fields
+    requiredFields := []string{pubsub.IDField, pubsub.SourceField, pubsub.DataContentTypeField, pubsub.TypeField, pubsub.SpecVersionField}
+    for _, field := range requiredFields {
+        if _, ok := cloudEvent[field]; !ok {
+            return nil, cloudEvent, fmt.Errorf("required field %s missing from cloud event", field)
+        }
+    }
+
+    // Safe type assertions
+    id, ok := cloudEvent[pubsub.IDField].(string)
+    if !ok {
+        return nil, cloudEvent, fmt.Errorf("invalid type for field %s: expected string", pubsub.IDField)
+    }
+    source, ok := cloudEvent[pubsub.SourceField].(string)
+    if !ok {
+        return nil, cloudEvent, fmt.Errorf("invalid type for field %s: expected string", pubsub.SourceField)
+    }
+    contentType, ok := cloudEvent[pubsub.DataContentTypeField].(string)
+    if !ok {
+        return nil, cloudEvent, fmt.Errorf("invalid type for field %s: expected string", pubsub.DataContentTypeField)
+    }
+    eventType, ok := cloudEvent[pubsub.TypeField].(string)
+    if !ok {
+        return nil, cloudEvent, fmt.Errorf("invalid type for field %s: expected string", pubsub.TypeField)
+    }
+    specVersion, ok := cloudEvent[pubsub.SpecVersionField].(string)
+    if !ok {
+        return nil, cloudEvent, fmt.Errorf("invalid type for field %s: expected string", pubsub.SpecVersionField)
+    }
+
     envelope := &runtimev1pb.TopicEventRequest{
-        Id:              cloudEvent[pubsub.IDField].(string),
-        Source:          cloudEvent[pubsub.SourceField].(string),
-        DataContentType: cloudEvent[pubsub.DataContentTypeField].(string),
-        Type:            cloudEvent[pubsub.TypeField].(string),
-        SpecVersion:     cloudEvent[pubsub.SpecVersionField].(string),
+        Id:              id,
+        Source:          source,
+        DataContentType: contentType,
+        Type:            eventType,
+        SpecVersion:     specVersion,
         Topic:           msg.Topic,
         PubsubName:      msg.Metadata[Metadata_key_pubsubName],
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (a *api) envelopeFromSubscriptionMessage(ctx context.Context, msg *pubsub.NewMessage) (*runtimev1pb.TopicEventRequest, map[string]interface{}, error) {
// 1. Unmarshal to cloudEvent model
var cloudEvent map[string]interface{}
err := a.json.Unmarshal(msg.Data, &cloudEvent)
if err != nil {
log.DefaultLogger.Debugf("[runtime]error deserializing cloud events proto: %s", err)
return nil, cloudEvent, err
}
// 2. Drop msg if the current cloud event has expired
if pubsub.HasExpired(cloudEvent) {
log.DefaultLogger.Warnf("[runtime]dropping expired pub/sub event %v as of %v", cloudEvent[pubsub.IDField].(string), cloudEvent[pubsub.ExpirationField].(string))
return nil, cloudEvent, nil
}
// 3. Convert to proto domain struct
envelope := &runtimev1pb.TopicEventRequest{
Id: cloudEvent[pubsub.IDField].(string),
Source: cloudEvent[pubsub.SourceField].(string),
DataContentType: cloudEvent[pubsub.DataContentTypeField].(string),
Type: cloudEvent[pubsub.TypeField].(string),
SpecVersion: cloudEvent[pubsub.SpecVersionField].(string),
Topic: msg.Topic,
PubsubName: msg.Metadata[Metadata_key_pubsubName],
}
// set data field
if data, ok := cloudEvent[pubsub.DataBase64Field]; ok && data != nil {
decoded, decodeErr := base64.StdEncoding.DecodeString(data.(string))
if decodeErr != nil {
log.DefaultLogger.Debugf("unable to base64 decode cloudEvent field data_base64: %s", decodeErr)
return nil, cloudEvent, err
}
envelope.Data = decoded
} else if data, ok := cloudEvent[pubsub.DataField]; ok && data != nil {
envelope.Data = nil
if contenttype.IsStringContentType(envelope.DataContentType) {
envelope.Data = []byte(data.(string))
} else if contenttype.IsJSONContentType(envelope.DataContentType) {
envelope.Data, _ = a.json.Marshal(data)
}
}
return envelope, cloudEvent, nil
}
func (a *api) envelopeFromSubscriptionMessage(ctx context.Context, msg *pubsub.NewMessage) (*runtimev1pb.TopicEventRequest, map[string]interface{}, error) {
// 1. Unmarshal to cloudEvent model
var cloudEvent map[string]interface{}
err := a.json.Unmarshal(msg.Data, &cloudEvent)
if err != nil {
log.DefaultLogger.Debugf("[runtime]error deserializing cloud events proto: %s", err)
return nil, cloudEvent, err
}
// Validate required fields
requiredFields := []string{pubsub.IDField, pubsub.SourceField, pubsub.DataContentTypeField, pubsub.TypeField, pubsub.SpecVersionField}
for _, field := range requiredFields {
if _, ok := cloudEvent[field]; !ok {
return nil, cloudEvent, fmt.Errorf("required field %s missing from cloud event", field)
}
}
// Safe type assertions
id, ok := cloudEvent[pubsub.IDField].(string)
if !ok {
return nil, cloudEvent, fmt.Errorf("invalid type for field %s: expected string", pubsub.IDField)
}
source, ok := cloudEvent[pubsub.SourceField].(string)
if !ok {
return nil, cloudEvent, fmt.Errorf("invalid type for field %s: expected string", pubsub.SourceField)
}
contentType, ok := cloudEvent[pubsub.DataContentTypeField].(string)
if !ok {
return nil, cloudEvent, fmt.Errorf("invalid type for field %s: expected string", pubsub.DataContentTypeField)
}
eventType, ok := cloudEvent[pubsub.TypeField].(string)
if !ok {
return nil, cloudEvent, fmt.Errorf("invalid type for field %s: expected string", pubsub.TypeField)
}
specVersion, ok := cloudEvent[pubsub.SpecVersionField].(string)
if !ok {
return nil, cloudEvent, fmt.Errorf("invalid type for field %s: expected string", pubsub.SpecVersionField)
}
// 2. Drop msg if the current cloud event has expired
if pubsub.HasExpired(cloudEvent) {
log.DefaultLogger.Warnf("[runtime]dropping expired pub/sub event %v as of %v", cloudEvent[pubsub.IDField].(string), cloudEvent[pubsub.ExpirationField].(string))
return nil, cloudEvent, nil
}
// 3. Convert to proto domain struct
envelope := &runtimev1pb.TopicEventRequest{
Id: id,
Source: source,
DataContentType: contentType,
Type: eventType,
SpecVersion: specVersion,
Topic: msg.Topic,
PubsubName: msg.Metadata[Metadata_key_pubsubName],
}
// set data field
if data, ok := cloudEvent[pubsub.DataBase64Field]; ok && data != nil {
decoded, decodeErr := base64.StdEncoding.DecodeString(data.(string))
if decodeErr != nil {
log.DefaultLogger.Debugf("unable to base64 decode cloudEvent field data_base64: %s", decodeErr)
return nil, cloudEvent, err
}
envelope.Data = decoded
} else if data, ok := cloudEvent[pubsub.DataField]; ok && data != nil {
envelope.Data = nil
if contenttype.IsStringContentType(envelope.DataContentType) {
envelope.Data = []byte(data.(string))
} else if contenttype.IsJSONContentType(envelope.DataContentType) {
envelope.Data, _ = a.json.Marshal(data)
}
}
return envelope, cloudEvent, nil
}

Loading
Loading