Skip to content

Commit

Permalink
Merge pull request #89 from lazyguru/refactor-to-service
Browse files Browse the repository at this point in the history
  • Loading branch information
lazyguru authored Jun 8, 2024
2 parents 1072ff2 + d8ba86f commit 129c8fc
Show file tree
Hide file tree
Showing 21 changed files with 266 additions and 118 deletions.
1 change: 1 addition & 0 deletions .env-sample
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# shellcheck disable=SC2034
#DB_TYPE="mysql"
#DB_DSN="federation:federation@tcp(127.0.0.1:3306)/federation?charset=utf8mb4&parseTime=True&loc=Local"
DB_TYPE="postgres"
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

This is the Federation service for the Sublinks project. It's built using Go.

Together with the [Sublinks Core](https://github.com/sublinks/sublinks) and [Sublinks Frontend](https://github.com/sublinks/sublinks-frontend) it's creating a federated link aggregation and microblogging platform.
Together with the [Sublinks Core](https://github.com/sublinks/sublinks-api) and [Sublinks Frontend](https://github.com/sublinks/sublinks-frontend) it's creating a federated link aggregation and microblogging platform.

## Contributing

Expand Down
15 changes: 11 additions & 4 deletions cmd/federation.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sublinks/sublinks-federation/internal/http"
"sublinks/sublinks-federation/internal/log"
"sublinks/sublinks-federation/internal/queue"
"sublinks/sublinks-federation/internal/service"

"github.com/joho/godotenv"
)
Expand Down Expand Up @@ -33,11 +34,17 @@ func main() {
logger.Fatal("failed connecting to queue service", err)
}
defer q.Close()
q.Run(conn)
serviceManager := service.NewServiceManager(
service.NewUserService(conn),
service.NewCommunityService(conn),
service.NewPostService(conn),
service.NewCommentService(conn),
)
q.Run(serviceManager)
config := http.ServerConfig{
Logger: logger,
Database: conn,
Queue: q,
Logger: logger,
Queue: q,
ServiceManager: serviceManager,
}
s := http.NewServer(config)
s.RunServer()
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/joho/godotenv v1.5.1
github.com/rabbitmq/amqp091-go v1.10.0
github.com/rs/zerolog v1.33.0
golang.org/x/sync v0.5.0
golang.org/x/text v0.15.0
gorm.io/driver/mysql v1.5.6
gorm.io/driver/postgres v1.5.7
Expand All @@ -26,6 +27,5 @@ require (
github.com/mattn/go-isatty v0.0.20 // indirect
go.uber.org/atomic v1.11.0 // indirect
golang.org/x/crypto v0.20.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/sys v0.21.0 // indirect
)
10 changes: 2 additions & 8 deletions internal/http/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"net/http"
"sublinks/sublinks-federation/internal/activitypub"
"sublinks/sublinks-federation/internal/model"

"golang.org/x/text/cases"
"golang.org/x/text/language"
Expand Down Expand Up @@ -53,11 +52,6 @@ func (server *Server) getActivityHandler(w http.ResponseWriter, r *http.Request)
}

func (server *Server) GetPostActivityObject(id string) (*activitypub.Page, error) {
post := model.Post{UrlStub: id}
err := server.Database.Find(&post)
if err != nil {
server.Logger.Error("Error reading post", err)
return nil, err
}
return activitypub.ConvertPostToPage(&post), nil
post := server.ServiceManager.PostService().GetById(id)
return activitypub.ConvertPostToPage(post), nil
}
14 changes: 6 additions & 8 deletions internal/http/community.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"net/http"
"sublinks/sublinks-federation/internal/activitypub"
"sublinks/sublinks-federation/internal/model"

"github.com/gorilla/mux"
)
Expand All @@ -17,18 +16,17 @@ func (server *Server) SetupCommunityRoutes() {
func (server *Server) getCommunityInfoHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
server.Logger.Info(fmt.Sprintf("Looking up community %s", vars["community"]))
community := model.Actor{Username: vars["community"], ActorType: "Group"}
err := server.Database.Find(&community)
if err != nil {
server.Logger.Error("Error reading community", err)
community := server.ServiceManager.CommunityService().GetById(vars["community"])
if community == nil {
server.Logger.Error("Community not found", nil)
w.WriteHeader(http.StatusNotFound)
return
}

communityLd := activitypub.ConvertActorToGroup(&community)
communityLd := activitypub.ConvertActorToGroup(community)
w.WriteHeader(http.StatusOK)
w.Header().Add("content-type", "application/activity+json")
content, _ := json.MarshalIndent(communityLd, "", " ")
_, err = w.Write(content)
_, err := w.Write(content)
if err != nil {
server.Logger.Error("Error writing response", err)
}
Expand Down
13 changes: 3 additions & 10 deletions internal/http/post.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package http

import (
"encoding/json"
"fmt"
"net/http"
"sublinks/sublinks-federation/internal/activitypub"
"sublinks/sublinks-federation/internal/model"

"github.com/gorilla/mux"
)
Expand All @@ -16,18 +14,13 @@ func (server *Server) SetupPostRoutes() {

func (server *Server) getPostHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
post := model.Post{UrlStub: vars["postId"]}
err := server.Database.Find(&post)
if err != nil {
server.Logger.Error(fmt.Sprintf("Error reading post: %+v %s", post, err), err)
return
}
postLd := activitypub.ConvertPostToPage(&post)
post := server.ServiceManager.PostService().GetById(vars["postId"])
postLd := activitypub.ConvertPostToPage(post)
postLd.Context = activitypub.GetContext()
w.WriteHeader(http.StatusOK)
w.Header().Add("content-type", "application/activity+json")
content, _ := json.MarshalIndent(postLd, "", " ")
_, err = w.Write(content)
_, err := w.Write(content)
if err != nil {
server.Logger.Error("Error writing response", err)
}
Expand Down
12 changes: 8 additions & 4 deletions internal/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sublinks/sublinks-federation/internal/db"
"sublinks/sublinks-federation/internal/log"
"sublinks/sublinks-federation/internal/queue"
"sublinks/sublinks-federation/internal/service"
"time"

"github.com/gorilla/mux"
Expand All @@ -19,22 +20,25 @@ type Server struct {
log.Logger
db.Database
queue.Queue
ServiceManager *service.ServiceManager
}

type ServerConfig struct {
log.Logger
db.Database
queue.Queue
ServiceManager *service.ServiceManager
}

func NewServer(config ServerConfig) *Server {
r := mux.NewRouter()

return &Server{
Router: r,
Logger: config.Logger,
Database: config.Database,
Queue: config.Queue,
Router: r,
Logger: config.Logger,
Database: config.Database,
Queue: config.Queue,
ServiceManager: config.ServiceManager,
}
}

Expand Down
13 changes: 6 additions & 7 deletions internal/http/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"net/http"
"sublinks/sublinks-federation/internal/activitypub"
"sublinks/sublinks-federation/internal/model"

"github.com/gorilla/mux"
)
Expand All @@ -17,18 +16,18 @@ func (server *Server) SetupUserRoutes() {
func (server *Server) getUserInfoHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
server.Logger.Info(fmt.Sprintf("Looking up user %s", vars["user"]))
user := model.Actor{Username: vars["user"], ActorType: "Person"}
err := server.Database.Find(&user)
if err != nil {
server.Logger.Error("Error reading user", err)
user := server.ServiceManager.UserService().GetById(vars["user"])
if user == nil {
server.Logger.Error("User not found", nil)
w.WriteHeader(http.StatusNotFound)
return
}

userLd := activitypub.ConvertActorToPerson(&user)
userLd := activitypub.ConvertActorToPerson(user)
w.WriteHeader(http.StatusOK)
w.Header().Add("content-type", "application/activity+json")
content, _ := json.MarshalIndent(userLd, "", " ")
_, err = w.Write(content)
_, err := w.Write(content)
if err != nil {
server.Logger.Error("Error writing response", err)
}
Expand Down
6 changes: 3 additions & 3 deletions internal/model/actor.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package model

type Actor struct {
ActorType string `json:"actor_type" gorm:"index"`
Id string `json:"id" gorm:"primarykey"`
Username string `json:"username"`
ActorType string `json:"actor_type" gorm:"primarykey"`
Id string `json:"actor_id" gorm:"primarykey"`
Username string `json:"display_name,omitempty" gorm:"not null"`
Name string `json:"name,omitempty" gorm:"nullable"`
Bio string `json:"bio"`
MatrixUserId string `json:"matrix_user_id,omitempty"`
Expand Down
16 changes: 8 additions & 8 deletions internal/queue/consumer.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package queue

import (
"errors"
"fmt"
"golang.org/x/sync/errgroup"
"sublinks/sublinks-federation/internal/worker"

"golang.org/x/sync/errgroup"
)

type ConsumerQueue struct {
Expand All @@ -23,7 +23,7 @@ func (q *RabbitQueue) createConsumer(queueData ConsumerQueue) error {
return err
}

for routingKey, _ := range queueData.RoutingKeys {
for routingKey := range queueData.RoutingKeys {
err = channelRabbitMQ.QueueBind(
queueData.QueueName, // queue name
routingKey, // routing key
Expand Down Expand Up @@ -59,30 +59,30 @@ func (q *RabbitQueue) StartConsumer(queueData ConsumerQueue) error {
}
messages, ok := q.consumers[queueData.QueueName]
if !ok {
return errors.New("consumer not found")
return fmt.Errorf("consumer not found")
}

errGroup := new(errgroup.Group)
for message := range messages {
errGroup.Go(func() error {
cbWorker, ok := queueData.RoutingKeys[message.RoutingKey]
if !ok {
return errors.New(fmt.Sprintf("%s not implemented as valid routing key", message.RoutingKey))
return fmt.Errorf("%s not implemented as valid routing key", message.RoutingKey)
}

err := cbWorker.Process(message.Body)

if err != nil {
err = message.Acknowledger.Nack(message.DeliveryTag, false, true)
if err != nil {
return errors.New(fmt.Sprintf("error nack'ing the message: %s", err.Error()))
return fmt.Errorf("error nack'ing the message: %s", err.Error())
}
return errors.New(fmt.Sprintf("error processing message body: %s", err.Error()))
return fmt.Errorf("error processing message body: %s", err.Error())
}

err = message.Acknowledger.Ack(message.DeliveryTag, false)
if err != nil {
return errors.New(fmt.Sprintf("error ack'ing the message: %s", err.Error()))
return fmt.Errorf("error ack'ing the message: %s", err.Error())
}
return nil
})
Expand Down
42 changes: 21 additions & 21 deletions internal/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,16 @@ package queue

import (
"os"
"sublinks/sublinks-federation/internal/repository"
"sublinks/sublinks-federation/internal/log"
"sublinks/sublinks-federation/internal/service"
"sublinks/sublinks-federation/internal/worker"

amqp "github.com/rabbitmq/amqp091-go"
"sublinks/sublinks-federation/internal/db"
"sublinks/sublinks-federation/internal/log"
)

type Queue interface {
Connect() error
Run(conn db.Database)
Run(serviceManager *service.ServiceManager)
PublishMessage(queueName string, message string) error
StartConsumer(queueData ConsumerQueue) error
Status() map[string]map[string]bool
Expand Down Expand Up @@ -49,20 +48,21 @@ func (q *RabbitQueue) Status() map[string]map[string]bool {
return status
}

func (q *RabbitQueue) Run(conn db.Database) {
q.processActors(conn)
q.processObjects(conn)
func (q *RabbitQueue) Run(serviceManager *service.ServiceManager) {
q.processActors(serviceManager)
q.processObjects(serviceManager)
}

func (q *RabbitQueue) processActors(conn db.Database) {
func (q *RabbitQueue) processActors(serviceManager *service.ServiceManager) {
actorCQ := ConsumerQueue{
QueueName: "actor_create_queue",
Exchange: "federation",
RoutingKeys: map[string]worker.Worker{
ActorRoutingKey: &worker.ActorWorker{
Logger: q.logger,
Repository: repository.NewRepository(conn),
},
ActorRoutingKey: worker.NewActorWorker(
q.logger,
serviceManager.UserService(),
serviceManager.CommunityService(),
),
},
}

Expand All @@ -72,19 +72,19 @@ func (q *RabbitQueue) processActors(conn db.Database) {
}
}

func (q *RabbitQueue) processObjects(conn db.Database) {
func (q *RabbitQueue) processObjects(serviceManager *service.ServiceManager) {
queue := ConsumerQueue{
QueueName: "object_create_queue",
Exchange: "federation",
RoutingKeys: map[string]worker.Worker{
PostRoutingKey: &worker.PostWorker{
Logger: q.logger,
Repository: repository.NewRepository(conn),
},
CommentRoutingKey: &worker.CommentWorker{
Logger: q.logger,
Repository: repository.NewRepository(conn),
},
PostRoutingKey: worker.NewPostWorker(
q.logger,
serviceManager.PostService(),
),
CommentRoutingKey: worker.NewCommentWorker(
q.logger,
serviceManager.CommentService(),
),
},
}

Expand Down
26 changes: 0 additions & 26 deletions internal/repository/repository.go

This file was deleted.

Loading

0 comments on commit 129c8fc

Please sign in to comment.