Skip to content

Commit

Permalink
chore: Add new task handler for greeting messages to check running of…
Browse files Browse the repository at this point in the history
… more then one task handler
  • Loading branch information
dmitrymomot committed May 24, 2024
1 parent a6e7f5f commit 1d45aa7
Showing 1 changed file with 64 additions and 12 deletions.
76 changes: 64 additions & 12 deletions example/queue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,42 @@ package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"

"github.com/dmitrymomot/asyncer"
"golang.org/x/sync/errgroup"
)

const (
redisAddr = "redis://localhost:6379/0"
TestTaskName = "queued_task"
redisAddr = "redis://localhost:6379/0"
TestTaskName = "queued_task"
TestTaskName2 = "queued_task_2"
)

type TestTaskPayload struct {
Name string
}

type TestTaskPayload2 struct {
Greeting string
}

// test task handler function
func testTaskHandler(ctx context.Context, payload TestTaskPayload) error {
func testTaskHandler(_ context.Context, payload TestTaskPayload) error {
fmt.Printf("Hello, %s!\n", payload.Name)
return nil
}

// test task handler function
func testTaskHandler2(_ context.Context, payload TestTaskPayload2) error {
fmt.Printf("Hola, %s!\n", payload.Greeting)
return nil
}

func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -34,24 +49,61 @@ func main() {
eg.Go(asyncer.RunQueueServer(
ctx, redisAddr, nil,
// Register a handler for the task.
asyncer.HandlerFunc[TestTaskPayload](TestTaskName, testTaskHandler),
asyncer.HandlerFunc(TestTaskName, testTaskHandler),
asyncer.HandlerFunc(TestTaskName2, testTaskHandler2),
// ... add more handlers here ...
))

// Create a new enqueuer with redis as the broker.
enqueuer := asyncer.MustNewEnqueuer(redisAddr)
enqueuer := asyncer.MustNewEnqueuer(
redisAddr,
asyncer.WithTaskDeadline(10*time.Minute),
asyncer.WithMaxRetry(0),
)
defer enqueuer.Close()

// Enqueue a task with payload.
// The task will be processed after immediately.
for i := 0; i < 10; i++ {
if err := enqueuer.EnqueueTask(ctx, TestTaskName, TestTaskPayload{
Name: fmt.Sprintf("Test %d", i),
}); err != nil {
panic(err)
eg.Go(func() error {
var i int
ticker := time.NewTicker(1 * time.Second)
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
i++
if err := enqueuer.EnqueueTask(ctx, TestTaskName, TestTaskPayload{
Name: fmt.Sprintf("Test %d", i),
}); err != nil {
return err
}
if err := enqueuer.EnqueueTask(ctx, TestTaskName2, TestTaskPayload2{
Greeting: fmt.Sprintf("Greeter %d", i),
}); err != nil {
return err
}
}
}
time.Sleep(500 * time.Millisecond)
}
})

// Listen for signals to cancel the context.
// This will stop the routine and close the queue server.
eg.Go(func() error {
c := make(chan os.Signal, 1) // Create channel to signify a signal being sent
signal.Notify(c, os.Interrupt, syscall.SIGTERM) // When an interrupt or termination signal is sent, notify the channel

select {
case <-c:
log.Println("Shutting down...")
// Cancel the context
cancel()

return nil
case <-ctx.Done():
return nil
}
})

// Wait for the queue server to exit.
if err := eg.Wait(); err != nil {
Expand Down

0 comments on commit 1d45aa7

Please sign in to comment.