Skip to content

Commit

Permalink
Added signal handlers to Poller & Subscriber
Browse files Browse the repository at this point in the history
Signed-off-by: stevef1uk <[email protected]>
  • Loading branch information
stevef1uk committed Jan 7, 2024
1 parent 1a61a05 commit 3f8a40a
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 11 deletions.
45 changes: 45 additions & 0 deletions examples/sagaexecutor/cmd/poller/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (

"net/http"
"os"
"os/signal"
"strings"
"syscall"

dapr "github.com/dapr/go-sdk/client"
"github.com/dapr/go-sdk/service/common"
Expand All @@ -23,6 +25,35 @@ var (
err error
)

func closeAll() {
client.Close()
the_service.CloseService()
}

func multiSignalHandler(signal os.Signal) {

switch signal {
case syscall.SIGHUP:
logger.Println("Signal:", signal.String())
closeAll()
os.Exit(0)
case syscall.SIGINT:
closeAll()
logger.Println("Signal:", signal.String())
os.Exit(0)
case syscall.SIGTERM:
logger.Println("Signal:", signal.String())
closeAll()
os.Exit(0)
case syscall.SIGQUIT:
closeAll()
logger.Println("Signal:", signal.String())
os.Exit(0)
default:
logger.Println("Unhandled/unknown signal")
}
}

func main() {
// create a Dapr service
s := daprd.NewService(address)
Expand All @@ -41,10 +72,24 @@ func main() {
the_service = service.NewService("")
defer the_service.CloseService()

sigchnl := make(chan os.Signal, 1)
signal.Notify(sigchnl, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM) //we can add more sycalls.SIGQUIT etc.
exitchnl := make(chan int)

go func() {
for {
s := <-sigchnl
multiSignalHandler(s)
}
}()

// start the service
if err := s.Start(); err != nil && err != http.ErrServerClosed {
logger.Fatalf("error starting service: %v", err)
}

exitcode := <-exitchnl
os.Exit(exitcode)
}

func sagaHandler(ctx context.Context, in *common.BindingEvent) (out []byte, err error) {
Expand Down
64 changes: 53 additions & 11 deletions examples/sagaexecutor/cmd/subscriber/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
//"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"

dapr "github.com/dapr/go-sdk/client"
Expand All @@ -28,15 +30,45 @@ type dataElement struct {
LogData utility.Start_stop `json:"logdata"`
}

var sub = &common.Subscription{
PubsubName: service.PubsubComponentName,
Topic: "Dummy-Not-Used",
Route: "/receivemessage",
}
var (
sub = &common.Subscription{
PubsubName: service.PubsubComponentName,
Topic: "Dummy-Not-Used",
Route: "/receivemessage",
}
sub_client dapr.Client
logger = log.New(os.Stdout, "", 0)
the_service service.Server
)

var sub_client dapr.Client
func closeAll() {
sub_client.Close()
the_service.CloseService()
}

var the_service service.Server
func multiSignalHandler(signal os.Signal) {

switch signal {
case syscall.SIGHUP:
logger.Println("Signal:", signal.String())
closeAll()
os.Exit(0)
case syscall.SIGINT:
closeAll()
logger.Println("Signal:", signal.String())
os.Exit(0)
case syscall.SIGTERM:
logger.Println("Signal:", signal.String())
closeAll()
os.Exit(0)
case syscall.SIGQUIT:
closeAll()
logger.Println("Signal:", signal.String())
os.Exit(0)
default:
logger.Println("Unhandled/unknown signal")
}
}

func main() {
var err error
Expand All @@ -61,6 +93,17 @@ func main() {
log.Fatalf("error adding topic subscription: %v", err)
}

sigchnl := make(chan os.Signal, 1)
signal.Notify(sigchnl, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM) //we can add more sycalls.SIGQUIT etc.
exitchnl := make(chan int)

go func() {
for {
s := <-sigchnl
multiSignalHandler(s)
}
}()

//log.Printf("Starting the server using port %s'n", appPort)
// Start the server
err = s.Start()
Expand All @@ -69,6 +112,9 @@ func main() {
log.Fatalf("error listenning: %v", err)
}
sub_client.Close()

exitcode := <-exitchnl
os.Exit(exitcode)
}

func storeMessage(client dapr.Client, m *utility.Start_stop) error {
Expand Down Expand Up @@ -104,11 +150,7 @@ func storeMessage(client dapr.Client, m *utility.Start_stop) error {
}

func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {

//fmt.Printf("eventHandler received:%v\n", e.Data)

//return false, err // Uncomment this to flush through queues if necessary for testing

var m map[string]interface{} = e.Data.(map[string]interface{})

fmt.Printf("eventHandler Ordering Key = %s\n", m["OrderingKey"].(string))
Expand Down

0 comments on commit 3f8a40a

Please sign in to comment.