Skip to content

Commit

Permalink
Setup daily notifier and adapt worker to handle multiple scheduled jo…
Browse files Browse the repository at this point in the history
…bs as abstraction (#6)
  • Loading branch information
Flexicon authored Nov 26, 2023
1 parent 8422bae commit e2f0589
Show file tree
Hide file tree
Showing 7 changed files with 200 additions and 12 deletions.
8 changes: 6 additions & 2 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ database:

github:
user: flexicon
token: ""
token: ''

telegram:
bot_token: ''
chat_id: ''

fetch_worker:
schedule: "*/10 * * * *"
schedule: '*/10 * * * *'
37 changes: 37 additions & 0 deletions daily_notification.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package main

import (
"fmt"
"log"

"github.com/pkg/errors"
"gorm.io/gorm"
)

const (
DailyNotificationTemplate = `📢 *LateNightCommits Daily Notifier* 📊
Fetched a total of %d commits today\.
ℹ️ Check out the [stats page](https://latenightcommits.com/api/stats) for more\.`
)

func runDailyNotification(db *gorm.DB, notifier Notifier) error {
log.Println("📬 Running Daily Notification job...")

var sentToday int64
if err := db.Raw(
`SELECT COUNT(id) FROM commits WHERE created_at BETWEEN CURRENT_DATE AND (CURRENT_DATE + INTERVAL 1 DAY);`,
).Scan(&sentToday).Error; err != nil {
return errors.Wrap(err, "failed to retrieve daily fetched amount for notifier")
}

msg := fmt.Sprintf(DailyNotificationTemplate, sentToday)

if err := notifier.Notify(msg); err != nil {
return errors.Wrap(err, "failed to notify")
}

log.Println("Successfully sent daily notification!")
return nil
}
31 changes: 22 additions & 9 deletions fetch_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@ import (
"net/http"
"time"

"github.com/pkg/errors"
"github.com/robfig/cron/v3"
"github.com/spf13/viper"
"gorm.io/gorm"
)

func runFetchWorker(db *gorm.DB, api *GitHubAPI) error {
func runFetchWorker(db *gorm.DB, api *GitHubAPI, notifier Notifier) error {
log.Println("Running fetch_worker")
start := time.Now()

Expand All @@ -26,14 +24,29 @@ func runFetchWorker(db *gorm.DB, api *GitHubAPI) error {
}()

// Run fetch job on a schedule
scheduler := cron.New()
scheduler := NewScheduler()
jobs := []*JobDefinition{
{
Name: "daily_notifier",
Schedule: viper.GetString("daily_notifier.schedule"),
Run: func() error {
return runDailyNotification(db, notifier)
},
},
{
Name: "fetch_commits",
Schedule: viper.GetString("fetch_worker.schedule"),
Run: func() error {
return runFetchJob(db, api)
},
},
}

if _, err := scheduler.AddFunc(viper.GetString("fetch_worker.schedule"), func() {
if err := runFetchJob(db, api); err != nil {
log.Printf("Fetch job error: %v", err)
for _, job := range jobs {
log.Printf(`Scheduling %s worker to run on "%s"`, job.Name, job.Schedule)
if err := scheduler.AddJob(job); err != nil {
return err
}
}); err != nil {
return errors.Wrap(err, "failed to schedule fetch job worker")
}

scheduler.Run()
Expand Down
6 changes: 5 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,15 @@ func run() error {
}

api := NewGitHubAPI()
notifier := NewNotifier()

switch *mode {
case "fetch":
return runFetchJob(db, api)
case "fetch_worker":
return runFetchWorker(db, api)
return runFetchWorker(db, api, notifier)
case "send_daily_notification":
return runDailyNotification(db, notifier)
case "web":
return runWebApp(db)
default:
Expand Down Expand Up @@ -79,6 +82,7 @@ func ViperInit() error {
viper.SetDefault("port", 80)
viper.SetDefault("github.search_page_depth", 5)
viper.SetDefault("fetch_worker.schedule", "*/10 * * * *")
viper.SetDefault("daily_notifier.schedule", "55 23 * * *")

if err := viper.ReadInConfig(); err != nil {
if _, ok := err.(viper.ConfigFileNotFoundError); ok {
Expand Down
35 changes: 35 additions & 0 deletions notifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package main

import (
"log"

"github.com/spf13/viper"
)

// Notifier encapsulates the logic and infrastructure necessary to send
// notifications to external systems.
type Notifier interface {
Notify(msg string) error
}

// NoopNotifier simply performs a no-op instead of notifying any external system.
type NoopNotifier struct{}

// Notify returns nil immediately.
func (n *NoopNotifier) Notify(msg string) error {
return nil
}

// NewNotifier builds a notifier based on the current configuration, or a NoopNotifier
// if none configured.
func NewNotifier() Notifier {
token := viper.GetString("telegram.bot_token")
chatID := viper.GetString("telegram.chat_id")

if token == "" || chatID == "" {
log.Println("🟨 No notifier configured, using NoopNotifier.")
return &NoopNotifier{}
}

return NewTelegramNotifier(token, chatID)
}
43 changes: 43 additions & 0 deletions scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package main

import (
"fmt"
"log"

"github.com/pkg/errors"
"github.com/robfig/cron/v3"
)

// Scheduler handles running jobs on a given cron Schedule.
type Scheduler struct {
cron *cron.Cron
}

// NewScheduler returns a new Scheduler instance.
func NewScheduler() *Scheduler {
return &Scheduler{cron.New()}
}

// Run the cron scheduler, or no-op if already running.
func (s *Scheduler) Run() {
s.cron.Run()
}

// JobDefinition for working with the Scheduler.
type JobDefinition struct {
Name string
Schedule string
Run func() error
}

// AddJob adds a job to the Scheduler to be run on the given spec schedule and name.
func (s *Scheduler) AddJob(job *JobDefinition) error {
if _, err := s.cron.AddFunc(job.Schedule, func() {
if err := job.Run(); err != nil {
log.Printf("%s job error: %v", job.Name, err)
}
}); err != nil {
return errors.Wrap(err, fmt.Sprintf("failed to schedule %s worker", job.Name))
}
return nil
}
52 changes: 52 additions & 0 deletions telegram_notifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package main

import (
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
)

type TelegramNotifier struct {
token string
chatID string
http *http.Client
}

func NewTelegramNotifier(token, chatID string) *TelegramNotifier {
return &TelegramNotifier{
token: token,
chatID: chatID,
http: &http.Client{
Timeout: 5 * time.Second,
},
}
}

func (t *TelegramNotifier) Notify(msg string) error {
notifyURL := fmt.Sprintf("https://api.telegram.org/bot%s/sendMessage", t.token)
form := url.Values{}
form.Set("text", msg)
form.Set("chat_id", t.chatID)
form.Set("parse_mode", "MarkdownV2")

req, err := http.NewRequest(http.MethodPost, notifyURL, strings.NewReader(form.Encode()))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")

res, err := t.http.Do(req)
if err != nil {
return err
}
defer res.Body.Close()

if res.StatusCode != 200 {
body, _ := io.ReadAll(res.Body)
return fmt.Errorf("bad response %d from telegram: %s", res.StatusCode, string(body))
}
return nil
}

0 comments on commit e2f0589

Please sign in to comment.