Skip to content

Commit

Permalink
Add examples
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitrymomot committed Jan 19, 2024
1 parent d60cc6e commit ccf8e49
Show file tree
Hide file tree
Showing 4 changed files with 222 additions and 42 deletions.
145 changes: 103 additions & 42 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,67 +23,128 @@ go get github.com/dmitrymomot/asyncer

## Usage

In this example, we will create a simple task that prints a greeting message to the console.
Also, we will create a scheduler that will run the task every hour.
### Queued tasks

In this example, we will create a simple task that prints a greeting message to the console:

```go
package main

import (
"context"
"fmt"
"time"
"context"
"fmt"
"time"

"github.com/dmitrymomot/asyncer"
"github.com/dmitrymomot/asyncer"
"golang.org/x/sync/errgroup"
)

const redisAddr = "redis://localhost:6379/0"
const TestTaskName = "testTask1"
const (
redisAddr = "redis://localhost:6379/0"
TestTaskName = "queued_task"
)

type TestTaskPayload struct {
Name string
Name string
}

// test task handler function
func testTaskHandler(ctx context.Context, payload TestTaskPayload) error {
fmt.Printf("Hello, %s!\n", payload.Name)
return nil
fmt.Printf("Hello, %s!\n", payload.Name)
return nil
}

func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

eg, _ := errgroup.WithContext(ctx)

// Run a new queue server with redis as the broker.
eg.Go(asyncer.RunQueueServer(
ctx, redisAddr, nil,
// Register a handler for the task.
asyncer.HandlerFunc[TestTaskPayload](TestTaskName, testTaskHandler),
// ... add more handlers here ...
))

// Create a new enqueuer with redis as the broker.
enqueuer := asyncer.MustNewEnqueuer(redisAddr)
defer enqueuer.Close()

// Enqueue a task with payload.
// The task will be processed after immediately.
for i := 0; i < 10; i++ {
if err := enqueuer.EnqueueTask(ctx, TestTaskName, TestTaskPayload{
Name: fmt.Sprintf("Test %d", i),
}); err != nil {
panic(err)
}
time.Sleep(500 * time.Millisecond)
}

// Wait for the queue server to exit.
if err := eg.Wait(); err != nil {
panic(err)
}
}
```

### Scheduled tasks (Cron jobs)

Create a task that prints a greeting message to the console every 1 seconds:

```go
package main

import (
"context"
"fmt"
"time"

"github.com/dmitrymomot/asyncer"
"golang.org/x/sync/errgroup"
)

const (
redisAddr = "redis://localhost:6379/0"
TestTaskName = "scheduled_task"
)

type TestTaskPayload struct {
Name string
}

// test task handler function
func testTaskHandler(ctx context.Context) error {
fmt.Println("scheduled test task handler called at", time.Now().Format(time.RFC3339))
return nil
}

func main() {
// Create a new enqueuer with redis as the broker.
enqueuer := asyncer.MustNewEnqueuer(redisAddr)
defer enqueuer.Close()

// Enqueue a task with payload.
// The task will be processed after immediately.
if err := enqueuer.EnqueueTask(context.TODO(), TestTaskName, testTaskPayload{Name: "test"}); err != nil {
panic(err)
}

eg, _ := errgroup.WithContext(context.Background())

// Run a new queue server with redis as the broker.
eg.Go(asyncer.RunQueueServer(
redisAddr, nil,
// Register a handler for the task.
asyncer.HandlerFunc[TestTaskPayload](TestTaskName, testTaskHandler),
// ... add more handlers here ...
))

// Run a scheduler with redis as the broker.
// The scheduler will schedule tasks to be enqueued at a specified time.
eg, ctx := errgroup.WithContext(context.Background())

// Run a new queue server with redis as the broker.
eg.Go(asyncer.RunQueueServer(
ctx, redisAddr, nil,
// Register a handler for the task.
asyncer.ScheduledHandlerFunc(TestTaskName, testTaskHandler),
// ... add more handlers here ...
))

// Run a scheduler with redis as the broker.
// The scheduler will schedule tasks to be enqueued at a specified time.
eg.Go(asyncer.RunSchedulerServer(
redisAddr, nil,
// Schedule the testTask1 task to be enqueued every hour.
asyncer.ScheduledHandlerFunc[Payload]("@every 1h", TestTaskName),
// ... add more scheduled tasks here ...
ctx, redisAddr, nil,
// Schedule the scheduled_task task to be enqueued every 1 seconds.
asyncer.NewTaskScheduler("@every 1s", TestTaskName),
// ... add more scheduled tasks here ...
))
// Wait for the queue server to exit.
if err := eg.Wait(); err != nil {
panic(err)
}

// Wait for the queue server to exit.
if err := eg.Wait(); err != nil {
panic(err)
}
}
```

Expand Down
8 changes: 8 additions & 0 deletions example/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
version: "3"

services:
redis:
image: redis:alpine
container_name: asyncer-redis-example
ports:
- 6379:6379
60 changes: 60 additions & 0 deletions example/queue/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package main

import (
"context"
"fmt"
"time"

"github.com/dmitrymomot/asyncer"
"golang.org/x/sync/errgroup"
)

const (
redisAddr = "redis://localhost:6379/0"
TestTaskName = "queued_task"
)

type TestTaskPayload struct {
Name string
}

// test task handler function
func testTaskHandler(ctx context.Context, payload TestTaskPayload) error {
fmt.Printf("Hello, %s!\n", payload.Name)
return nil
}

func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

eg, _ := errgroup.WithContext(ctx)

// Run a new queue server with redis as the broker.
eg.Go(asyncer.RunQueueServer(
ctx, redisAddr, nil,
// Register a handler for the task.
asyncer.HandlerFunc[TestTaskPayload](TestTaskName, testTaskHandler),
// ... add more handlers here ...
))

// Create a new enqueuer with redis as the broker.
enqueuer := asyncer.MustNewEnqueuer(redisAddr)
defer enqueuer.Close()

// Enqueue a task with payload.
// The task will be processed after immediately.
for i := 0; i < 10; i++ {
if err := enqueuer.EnqueueTask(ctx, TestTaskName, TestTaskPayload{
Name: fmt.Sprintf("Test %d", i),
}); err != nil {
panic(err)
}
time.Sleep(500 * time.Millisecond)
}

// Wait for the queue server to exit.
if err := eg.Wait(); err != nil {
panic(err)
}
}
51 changes: 51 additions & 0 deletions example/scheduler/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package main

import (
"context"
"fmt"
"time"

"github.com/dmitrymomot/asyncer"
"golang.org/x/sync/errgroup"
)

const (
redisAddr = "redis://localhost:6379/0"
TestTaskName = "scheduled_task"
)

type TestTaskPayload struct {
Name string
}

// test task handler function
func testTaskHandler(ctx context.Context) error {
fmt.Println("scheduled test task handler called at", time.Now().Format(time.RFC3339))
return nil
}

func main() {
eg, ctx := errgroup.WithContext(context.Background())

// Run a new queue server with redis as the broker.
eg.Go(asyncer.RunQueueServer(
ctx, redisAddr, nil,
// Register a handler for the task.
asyncer.ScheduledHandlerFunc(TestTaskName, testTaskHandler),
// ... add more handlers here ...
))

// Run a scheduler with redis as the broker.
// The scheduler will schedule tasks to be enqueued at a specified time.
eg.Go(asyncer.RunSchedulerServer(
ctx, redisAddr, nil,
// Schedule the scheduled_task task to be enqueued every 1 seconds.
asyncer.NewTaskScheduler("@every 1s", TestTaskName),
// ... add more scheduled tasks here ...
))

// Wait for the queue server to exit.
if err := eg.Wait(); err != nil {
panic(err)
}
}

0 comments on commit ccf8e49

Please sign in to comment.