diff --git a/.env-sample b/.env-sample index 1e2bf90..7f26d1e 100644 --- a/.env-sample +++ b/.env-sample @@ -1,3 +1,4 @@ +# shellcheck disable=SC2034 #DB_TYPE="mysql" #DB_DSN="federation:federation@tcp(127.0.0.1:3306)/federation?charset=utf8mb4&parseTime=True&loc=Local" DB_TYPE="postgres" diff --git a/README.md b/README.md index 75a8a7f..e1b9da1 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ This is the Federation service for the Sublinks project. It's built using Go. -Together with the [Sublinks Core](https://github.com/sublinks/sublinks) and [Sublinks Frontend](https://github.com/sublinks/sublinks-frontend) it's creating a federated link aggregation and microblogging platform. +Together with the [Sublinks Core](https://github.com/sublinks/sublinks-api) and [Sublinks Frontend](https://github.com/sublinks/sublinks-frontend) it's creating a federated link aggregation and microblogging platform. ## Contributing diff --git a/cmd/federation.go b/cmd/federation.go index 4246595..dfc2d21 100644 --- a/cmd/federation.go +++ b/cmd/federation.go @@ -6,6 +6,7 @@ import ( "sublinks/sublinks-federation/internal/http" "sublinks/sublinks-federation/internal/log" "sublinks/sublinks-federation/internal/queue" + "sublinks/sublinks-federation/internal/service" "github.com/joho/godotenv" ) @@ -33,11 +34,17 @@ func main() { logger.Fatal("failed connecting to queue service", err) } defer q.Close() - q.Run(conn) + serviceManager := service.NewServiceManager( + service.NewUserService(conn), + service.NewCommunityService(conn), + service.NewPostService(conn), + service.NewCommentService(conn), + ) + q.Run(serviceManager) config := http.ServerConfig{ - Logger: logger, - Database: conn, - Queue: q, + Logger: logger, + Queue: q, + ServiceManager: serviceManager, } s := http.NewServer(config) s.RunServer() diff --git a/go.mod b/go.mod index f67c5c7..a1784e5 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/joho/godotenv v1.5.1 github.com/rabbitmq/amqp091-go v1.10.0 github.com/rs/zerolog v1.33.0 + golang.org/x/sync v0.5.0 golang.org/x/text v0.15.0 gorm.io/driver/mysql v1.5.6 gorm.io/driver/postgres v1.5.7 @@ -26,6 +27,5 @@ require ( github.com/mattn/go-isatty v0.0.20 // indirect go.uber.org/atomic v1.11.0 // indirect golang.org/x/crypto v0.20.0 // indirect - golang.org/x/sync v0.5.0 // indirect golang.org/x/sys v0.21.0 // indirect ) diff --git a/internal/http/activity.go b/internal/http/activity.go index 07a90b4..b79e36c 100644 --- a/internal/http/activity.go +++ b/internal/http/activity.go @@ -5,7 +5,6 @@ import ( "fmt" "net/http" "sublinks/sublinks-federation/internal/activitypub" - "sublinks/sublinks-federation/internal/model" "golang.org/x/text/cases" "golang.org/x/text/language" @@ -53,11 +52,6 @@ func (server *Server) getActivityHandler(w http.ResponseWriter, r *http.Request) } func (server *Server) GetPostActivityObject(id string) (*activitypub.Page, error) { - post := model.Post{UrlStub: id} - err := server.Database.Find(&post) - if err != nil { - server.Logger.Error("Error reading post", err) - return nil, err - } - return activitypub.ConvertPostToPage(&post), nil + post := server.ServiceManager.PostService().GetById(id) + return activitypub.ConvertPostToPage(post), nil } diff --git a/internal/http/community.go b/internal/http/community.go index aa9740f..37358dd 100644 --- a/internal/http/community.go +++ b/internal/http/community.go @@ -5,7 +5,6 @@ import ( "fmt" "net/http" "sublinks/sublinks-federation/internal/activitypub" - "sublinks/sublinks-federation/internal/model" "github.com/gorilla/mux" ) @@ -17,18 +16,17 @@ func (server *Server) SetupCommunityRoutes() { func (server *Server) getCommunityInfoHandler(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) server.Logger.Info(fmt.Sprintf("Looking up community %s", vars["community"])) - community := model.Actor{Username: vars["community"], ActorType: "Group"} - err := server.Database.Find(&community) - if err != nil { - server.Logger.Error("Error reading community", err) + community := server.ServiceManager.CommunityService().GetById(vars["community"]) + if community == nil { + server.Logger.Error("Community not found", nil) + w.WriteHeader(http.StatusNotFound) return } - - communityLd := activitypub.ConvertActorToGroup(&community) + communityLd := activitypub.ConvertActorToGroup(community) w.WriteHeader(http.StatusOK) w.Header().Add("content-type", "application/activity+json") content, _ := json.MarshalIndent(communityLd, "", " ") - _, err = w.Write(content) + _, err := w.Write(content) if err != nil { server.Logger.Error("Error writing response", err) } diff --git a/internal/http/post.go b/internal/http/post.go index 1fb913b..573264a 100644 --- a/internal/http/post.go +++ b/internal/http/post.go @@ -2,10 +2,8 @@ package http import ( "encoding/json" - "fmt" "net/http" "sublinks/sublinks-federation/internal/activitypub" - "sublinks/sublinks-federation/internal/model" "github.com/gorilla/mux" ) @@ -16,18 +14,13 @@ func (server *Server) SetupPostRoutes() { func (server *Server) getPostHandler(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) - post := model.Post{UrlStub: vars["postId"]} - err := server.Database.Find(&post) - if err != nil { - server.Logger.Error(fmt.Sprintf("Error reading post: %+v %s", post, err), err) - return - } - postLd := activitypub.ConvertPostToPage(&post) + post := server.ServiceManager.PostService().GetById(vars["postId"]) + postLd := activitypub.ConvertPostToPage(post) postLd.Context = activitypub.GetContext() w.WriteHeader(http.StatusOK) w.Header().Add("content-type", "application/activity+json") content, _ := json.MarshalIndent(postLd, "", " ") - _, err = w.Write(content) + _, err := w.Write(content) if err != nil { server.Logger.Error("Error writing response", err) } diff --git a/internal/http/server.go b/internal/http/server.go index e51f90e..e2d3be9 100644 --- a/internal/http/server.go +++ b/internal/http/server.go @@ -9,6 +9,7 @@ import ( "sublinks/sublinks-federation/internal/db" "sublinks/sublinks-federation/internal/log" "sublinks/sublinks-federation/internal/queue" + "sublinks/sublinks-federation/internal/service" "time" "github.com/gorilla/mux" @@ -19,22 +20,25 @@ type Server struct { log.Logger db.Database queue.Queue + ServiceManager *service.ServiceManager } type ServerConfig struct { log.Logger db.Database queue.Queue + ServiceManager *service.ServiceManager } func NewServer(config ServerConfig) *Server { r := mux.NewRouter() return &Server{ - Router: r, - Logger: config.Logger, - Database: config.Database, - Queue: config.Queue, + Router: r, + Logger: config.Logger, + Database: config.Database, + Queue: config.Queue, + ServiceManager: config.ServiceManager, } } diff --git a/internal/http/user.go b/internal/http/user.go index 566c321..67c7e8f 100644 --- a/internal/http/user.go +++ b/internal/http/user.go @@ -5,7 +5,6 @@ import ( "fmt" "net/http" "sublinks/sublinks-federation/internal/activitypub" - "sublinks/sublinks-federation/internal/model" "github.com/gorilla/mux" ) @@ -17,18 +16,18 @@ func (server *Server) SetupUserRoutes() { func (server *Server) getUserInfoHandler(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) server.Logger.Info(fmt.Sprintf("Looking up user %s", vars["user"])) - user := model.Actor{Username: vars["user"], ActorType: "Person"} - err := server.Database.Find(&user) - if err != nil { - server.Logger.Error("Error reading user", err) + user := server.ServiceManager.UserService().GetById(vars["user"]) + if user == nil { + server.Logger.Error("User not found", nil) + w.WriteHeader(http.StatusNotFound) return } - userLd := activitypub.ConvertActorToPerson(&user) + userLd := activitypub.ConvertActorToPerson(user) w.WriteHeader(http.StatusOK) w.Header().Add("content-type", "application/activity+json") content, _ := json.MarshalIndent(userLd, "", " ") - _, err = w.Write(content) + _, err := w.Write(content) if err != nil { server.Logger.Error("Error writing response", err) } diff --git a/internal/model/actor.go b/internal/model/actor.go index a5591f2..bff4ad5 100644 --- a/internal/model/actor.go +++ b/internal/model/actor.go @@ -1,9 +1,9 @@ package model type Actor struct { - ActorType string `json:"actor_type" gorm:"index"` - Id string `json:"id" gorm:"primarykey"` - Username string `json:"username"` + ActorType string `json:"actor_type" gorm:"primarykey"` + Id string `json:"actor_id" gorm:"primarykey"` + Username string `json:"display_name,omitempty" gorm:"not null"` Name string `json:"name,omitempty" gorm:"nullable"` Bio string `json:"bio"` MatrixUserId string `json:"matrix_user_id,omitempty"` diff --git a/internal/queue/consumer.go b/internal/queue/consumer.go index 1234abb..2c1ea5b 100644 --- a/internal/queue/consumer.go +++ b/internal/queue/consumer.go @@ -1,10 +1,10 @@ package queue import ( - "errors" "fmt" - "golang.org/x/sync/errgroup" "sublinks/sublinks-federation/internal/worker" + + "golang.org/x/sync/errgroup" ) type ConsumerQueue struct { @@ -23,7 +23,7 @@ func (q *RabbitQueue) createConsumer(queueData ConsumerQueue) error { return err } - for routingKey, _ := range queueData.RoutingKeys { + for routingKey := range queueData.RoutingKeys { err = channelRabbitMQ.QueueBind( queueData.QueueName, // queue name routingKey, // routing key @@ -59,7 +59,7 @@ func (q *RabbitQueue) StartConsumer(queueData ConsumerQueue) error { } messages, ok := q.consumers[queueData.QueueName] if !ok { - return errors.New("consumer not found") + return fmt.Errorf("consumer not found") } errGroup := new(errgroup.Group) @@ -67,7 +67,7 @@ func (q *RabbitQueue) StartConsumer(queueData ConsumerQueue) error { errGroup.Go(func() error { cbWorker, ok := queueData.RoutingKeys[message.RoutingKey] if !ok { - return errors.New(fmt.Sprintf("%s not implemented as valid routing key", message.RoutingKey)) + return fmt.Errorf("%s not implemented as valid routing key", message.RoutingKey) } err := cbWorker.Process(message.Body) @@ -75,14 +75,14 @@ func (q *RabbitQueue) StartConsumer(queueData ConsumerQueue) error { if err != nil { err = message.Acknowledger.Nack(message.DeliveryTag, false, true) if err != nil { - return errors.New(fmt.Sprintf("error nack'ing the message: %s", err.Error())) + return fmt.Errorf("error nack'ing the message: %s", err.Error()) } - return errors.New(fmt.Sprintf("error processing message body: %s", err.Error())) + return fmt.Errorf("error processing message body: %s", err.Error()) } err = message.Acknowledger.Ack(message.DeliveryTag, false) if err != nil { - return errors.New(fmt.Sprintf("error ack'ing the message: %s", err.Error())) + return fmt.Errorf("error ack'ing the message: %s", err.Error()) } return nil }) diff --git a/internal/queue/queue.go b/internal/queue/queue.go index c560b1b..37ca7ff 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -2,17 +2,16 @@ package queue import ( "os" - "sublinks/sublinks-federation/internal/repository" + "sublinks/sublinks-federation/internal/log" + "sublinks/sublinks-federation/internal/service" "sublinks/sublinks-federation/internal/worker" amqp "github.com/rabbitmq/amqp091-go" - "sublinks/sublinks-federation/internal/db" - "sublinks/sublinks-federation/internal/log" ) type Queue interface { Connect() error - Run(conn db.Database) + Run(serviceManager *service.ServiceManager) PublishMessage(queueName string, message string) error StartConsumer(queueData ConsumerQueue) error Status() map[string]map[string]bool @@ -49,20 +48,21 @@ func (q *RabbitQueue) Status() map[string]map[string]bool { return status } -func (q *RabbitQueue) Run(conn db.Database) { - q.processActors(conn) - q.processObjects(conn) +func (q *RabbitQueue) Run(serviceManager *service.ServiceManager) { + q.processActors(serviceManager) + q.processObjects(serviceManager) } -func (q *RabbitQueue) processActors(conn db.Database) { +func (q *RabbitQueue) processActors(serviceManager *service.ServiceManager) { actorCQ := ConsumerQueue{ QueueName: "actor_create_queue", Exchange: "federation", RoutingKeys: map[string]worker.Worker{ - ActorRoutingKey: &worker.ActorWorker{ - Logger: q.logger, - Repository: repository.NewRepository(conn), - }, + ActorRoutingKey: worker.NewActorWorker( + q.logger, + serviceManager.UserService(), + serviceManager.CommunityService(), + ), }, } @@ -72,19 +72,19 @@ func (q *RabbitQueue) processActors(conn db.Database) { } } -func (q *RabbitQueue) processObjects(conn db.Database) { +func (q *RabbitQueue) processObjects(serviceManager *service.ServiceManager) { queue := ConsumerQueue{ QueueName: "object_create_queue", Exchange: "federation", RoutingKeys: map[string]worker.Worker{ - PostRoutingKey: &worker.PostWorker{ - Logger: q.logger, - Repository: repository.NewRepository(conn), - }, - CommentRoutingKey: &worker.CommentWorker{ - Logger: q.logger, - Repository: repository.NewRepository(conn), - }, + PostRoutingKey: worker.NewPostWorker( + q.logger, + serviceManager.PostService(), + ), + CommentRoutingKey: worker.NewCommentWorker( + q.logger, + serviceManager.CommentService(), + ), }, } diff --git a/internal/repository/repository.go b/internal/repository/repository.go deleted file mode 100644 index 9d966bc..0000000 --- a/internal/repository/repository.go +++ /dev/null @@ -1,26 +0,0 @@ -package repository - -import ( - "sublinks/sublinks-federation/internal/db" -) - -type Repository interface { - Save(interface{}) error - Find(*interface{}, ...interface{}) error -} - -type RepositoryImpl struct { - db db.Database -} - -func NewRepository(db db.Database) Repository { - return &RepositoryImpl{db: db} -} - -func (repository *RepositoryImpl) Save(a interface{}) error { - return repository.db.Save(a) -} - -func (repository *RepositoryImpl) Find(a *interface{}, params ...interface{}) error { - return repository.db.Find(a, params...) -} diff --git a/internal/service/comments.go b/internal/service/comments.go new file mode 100644 index 0000000..e95e05b --- /dev/null +++ b/internal/service/comments.go @@ -0,0 +1,28 @@ +package service + +import ( + "sublinks/sublinks-federation/internal/db" + "sublinks/sublinks-federation/internal/model" +) + +type CommentService struct { + db db.Database +} + +func NewCommentService(db db.Database) *CommentService { + return &CommentService{db} +} + +func (p CommentService) GetById(id string) interface{} { + comment := &model.Comment{} + err := p.db.Find(comment, id) + if err != nil { + return comment + } + return nil +} + +func (p CommentService) Save(comment interface{}) bool { + err := p.db.Save(comment) + return err == nil +} diff --git a/internal/service/community.go b/internal/service/community.go new file mode 100644 index 0000000..592bb14 --- /dev/null +++ b/internal/service/community.go @@ -0,0 +1,29 @@ +package service + +import ( + "sublinks/sublinks-federation/internal/db" + "sublinks/sublinks-federation/internal/model" +) + +type CommunityService struct { + db db.Database +} + +func NewCommunityService(db db.Database) *CommunityService { + return &CommunityService{db} +} + +func (a CommunityService) GetById(id string) *model.Actor { + actor := &model.Actor{ActorType: "Group"} + a.load(actor, id) + return actor +} + +func (a CommunityService) load(actor *model.Actor, id string) { + _ = a.db.Find(actor, id) +} + +func (a CommunityService) Save(actor *model.Actor) bool { + err := a.db.Save(actor) + return err == nil +} diff --git a/internal/service/posts.go b/internal/service/posts.go new file mode 100644 index 0000000..b6c77f3 --- /dev/null +++ b/internal/service/posts.go @@ -0,0 +1,28 @@ +package service + +import ( + "sublinks/sublinks-federation/internal/db" + "sublinks/sublinks-federation/internal/model" +) + +type PostService struct { + db db.Database +} + +func NewPostService(db db.Database) *PostService { + return &PostService{db} +} + +func (p PostService) GetById(id string) *model.Post { + post := &model.Post{} + err := p.db.Find(post, id) + if err != nil { + return post + } + return nil +} + +func (p PostService) Save(post *model.Post) bool { + err := p.db.Save(post) + return err == nil +} diff --git a/internal/service/service.go b/internal/service/service.go new file mode 100644 index 0000000..deff114 --- /dev/null +++ b/internal/service/service.go @@ -0,0 +1,33 @@ +package service + +type ServiceManager struct { + userService *UserService + communityService *CommunityService + postService *PostService + commentService *CommentService +} + +func NewServiceManager(userService *UserService, communityService *CommunityService, postService *PostService, commentService *CommentService) *ServiceManager { + return &ServiceManager{ + userService: userService, + communityService: communityService, + postService: postService, + commentService: commentService, + } +} + +func (sm *ServiceManager) CommunityService() *CommunityService { + return sm.communityService +} + +func (sm *ServiceManager) PostService() *PostService { + return sm.postService +} + +func (sm *ServiceManager) UserService() *UserService { + return sm.userService +} + +func (sm *ServiceManager) CommentService() *CommentService { + return sm.commentService +} diff --git a/internal/service/user.go b/internal/service/user.go new file mode 100644 index 0000000..71db0a1 --- /dev/null +++ b/internal/service/user.go @@ -0,0 +1,32 @@ +package service + +import ( + "sublinks/sublinks-federation/internal/db" + "sublinks/sublinks-federation/internal/model" +) + +type UserService struct { + db db.Database +} + +func NewUserService(db db.Database) *UserService { + return &UserService{db} +} + +func (a UserService) GetById(id string) *model.Actor { + actor := model.Actor{ActorType: "Person"} + return a.Load(&actor, id) +} + +func (a UserService) Load(actor *model.Actor, id string) *model.Actor { + err := a.db.Find(actor, id) + if err != nil { + return actor + } + return nil +} + +func (a UserService) Save(actor *model.Actor) bool { + err := a.db.Save(actor) + return err == nil +} diff --git a/internal/worker/actor.go b/internal/worker/actor.go index be3bf79..20dafb2 100644 --- a/internal/worker/actor.go +++ b/internal/worker/actor.go @@ -2,14 +2,24 @@ package worker import ( "encoding/json" + "errors" "sublinks/sublinks-federation/internal/log" "sublinks/sublinks-federation/internal/model" - "sublinks/sublinks-federation/internal/repository" + "sublinks/sublinks-federation/internal/service" ) type ActorWorker struct { log.Logger - repository.Repository + userService *service.UserService + communityService *service.CommunityService +} + +func NewActorWorker(logger log.Logger, userService *service.UserService, communityService *service.CommunityService) *ActorWorker { + return &ActorWorker{ + Logger: logger, + userService: userService, + communityService: communityService, + } } func (w *ActorWorker) Process(msg []byte) error { @@ -19,10 +29,14 @@ func (w *ActorWorker) Process(msg []byte) error { w.Logger.Error("Error unmarshalling actor", err) return err } - err = w.Repository.Save(&actor) - if err != nil { - w.Logger.Error("Error saving actor", err) - return err + if actor.ActorType == "Group" && !w.communityService.Save(&actor) { + w.Logger.Error("Error saving actor (community)", nil) + return errors.New("Error saving actor (community)") + } + + if actor.ActorType == "Person" && !w.userService.Save(&actor) { + w.Logger.Error("Error saving actor (user)", nil) + return errors.New("Error saving actor (user)") } return nil } diff --git a/internal/worker/comment.go b/internal/worker/comment.go index 8529ab7..37f5d59 100644 --- a/internal/worker/comment.go +++ b/internal/worker/comment.go @@ -2,16 +2,24 @@ package worker import ( "encoding/json" + "errors" "os" "strings" "sublinks/sublinks-federation/internal/log" "sublinks/sublinks-federation/internal/model" - "sublinks/sublinks-federation/internal/repository" + "sublinks/sublinks-federation/internal/service" ) type CommentWorker struct { log.Logger - repository.Repository + service *service.CommentService +} + +func NewCommentWorker(logger log.Logger, service *service.CommentService) *CommentWorker { + return &CommentWorker{ + Logger: logger, + service: service, + } } func (w *CommentWorker) Process(msg []byte) error { @@ -23,10 +31,9 @@ func (w *CommentWorker) Process(msg []byte) error { w.Logger.Error("Error unmarshalling comment: %s", err) return err } - err = w.Repository.Save(comment) - if err != nil { - w.Logger.Error("Error saving comment: %s", err) - return err + if !w.service.Save(&comment) { + w.Logger.Error("Error saving comment", nil) + return errors.New("Error saving comment") } return nil } diff --git a/internal/worker/post.go b/internal/worker/post.go index 8998405..e82c753 100644 --- a/internal/worker/post.go +++ b/internal/worker/post.go @@ -6,12 +6,19 @@ import ( "strings" "sublinks/sublinks-federation/internal/log" "sublinks/sublinks-federation/internal/model" - "sublinks/sublinks-federation/internal/repository" + "sublinks/sublinks-federation/internal/service" ) type PostWorker struct { log.Logger - repository.Repository + service *service.PostService +} + +func NewPostWorker(logger log.Logger, service *service.PostService) *PostWorker { + return &PostWorker{ + Logger: logger, + service: service, + } } func (w *PostWorker) Process(msg []byte) error { @@ -23,9 +30,9 @@ func (w *PostWorker) Process(msg []byte) error { w.Logger.Error("Error unmarshalling post: %s", err) return err } - err = w.Repository.Save(post) - if err != nil { - w.Logger.Error("Error saving post: %s", err) + res := w.service.Save(&post) + if !res { + w.Logger.Error("Error saving post", nil) return err } return nil