diff --git a/docs/json-schema/comment.schema.json b/docs/json-schema/comment.schema.json new file mode 100644 index 0000000..679507c --- /dev/null +++ b/docs/json-schema/comment.schema.json @@ -0,0 +1,33 @@ +{ + "$id": "https://sublinks.org/comment.schema.json", + "title": "Comment", + "description": "A comment in a Sublinks Post attributable to an author.", + "type": "object", + "properties": { + "id": { + "description": "The unique identifier for the comment.", + "type": "string" + }, + "post_id": { + "description": "The unique identifier of Post comment is for.", + "type": "string" + }, + "content": { + "description": "The content of the comment.", + "type": "string" + }, + "author_id": { + "description": "The unique identifier for the actor that is the author of the comment. I.E. discuss.online/u/lazyguru", + "type": "string" + }, + "published": { + "description": "The date and time the comment was published.", + "type": "string" + }, + "nsfw": { + "description": "Whether the comment has sensitive (NSFW) content or not.", + "type": "boolean" + } + }, + "required": ["id", "post", "content", "author_id", "published"] +} diff --git a/internal/activitypub/common.go b/internal/activitypub/common.go index 9bedf75..392e475 100644 --- a/internal/activitypub/common.go +++ b/internal/activitypub/common.go @@ -9,3 +9,8 @@ type Link struct { Type string `json:"type"` // "Link" | "Image" Href string `json:"href"` // "https://enterprise.lemmy.ml/pictrs/image/eOtYb9iEiB.png" } + +type Language struct { + Identifier string `json:"identifier"` // "fr", + Name string `json:"name"` // "Français" +} diff --git a/internal/activitypub/note.go b/internal/activitypub/note.go new file mode 100644 index 0000000..17598d5 --- /dev/null +++ b/internal/activitypub/note.go @@ -0,0 +1,60 @@ +package activitypub + +import ( + "fmt" + "sublinks/sublinks-federation/internal/model" + "time" +) + +type Note struct { + Context *Context `json:"@context,omitempty"` + Id string `json:"id"` + Type string `json:"type"` + AttributedTo string `json:"attributedTo"` + To []string `json:"to"` + Cc []string `json:"cc"` + Audience string `json:"audience"` + InReplyTo string `json:"inReplyTo"` + Content string `json:"content"` + MediaType string `json:"mediaType"` + Source Source `json:"source,omitempty"` + Tag []Tag `json:"tag,omitempty"` + Distinguished bool `json:"distinguished,omitempty"` + Language Language `json:"language,omitempty"` + Published time.Time `json:"published"` + Updated time.Time `json:"updated"` +} + +type Tag struct { + Href string `json:"href"` + Type string `json:"type"` + Name string `json:"name"` +} + +func NewNote(commentUrl string, fromUser string, postUrl string, commentBody string, published time.Time) *Note { + return &Note{ + Id: commentUrl, + Type: "Note", + AttributedTo: fromUser, + To: []string{"https://www.w3.org/ns/activitystreams#Public"}, + Cc: []string{fromUser, commentUrl}, + Audience: commentUrl, + InReplyTo: postUrl, + Content: commentBody, + MediaType: "text/html", + Source: Source{ + Content: fmt.Sprintf("This is a comment on %s post", postUrl), + MediaType: "text/markdown", + }, + Language: Language{ + Identifier: "en", + Name: "English", + }, + Distinguished: false, + Published: published, + } +} + +func ConvertCommentToNote(c *model.Comment) *Note { + return NewNote(c.UrlStub, c.Author, c.Post, c.Content, c.Published) +} diff --git a/internal/activitypub/post.go b/internal/activitypub/post.go index cc19ea5..848c309 100644 --- a/internal/activitypub/post.go +++ b/internal/activitypub/post.go @@ -6,11 +6,6 @@ import ( "time" ) -type Language struct { - Identifier string `json:"identifier"` // "fr", - Name string `json:"name"` // "Français" -} - type Page struct { Context *Context `json:"@context,omitempty"` Id string `json:"id"` diff --git a/internal/http/comment.go b/internal/http/comment.go new file mode 100644 index 0000000..a6e410d --- /dev/null +++ b/internal/http/comment.go @@ -0,0 +1,34 @@ +package http + +import ( + "encoding/json" + "fmt" + "net/http" + "sublinks/sublinks-federation/internal/activitypub" + "sublinks/sublinks-federation/internal/model" + + "github.com/gorilla/mux" +) + +func (server *Server) SetupCommentRoutes() { + server.Router.HandleFunc("/comment/{commentId}", server.getCommentHandler).Methods("GET") +} + +func (server *Server) getCommentHandler(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + comment := model.Comment{UrlStub: vars["commentId"]} + err := server.Database.Find(&comment) + if err != nil { + server.Logger.Error(fmt.Sprintf("Error reading comment: %+v %s", comment, err), err) + return + } + commentLd := activitypub.ConvertCommentToNote(&comment) + commentLd.Context = activitypub.GetContext() + w.WriteHeader(http.StatusOK) + w.Header().Add("content-type", "application/activity+json") + content, _ := json.MarshalIndent(commentLd, "", " ") + _, err = w.Write(content) + if err != nil { + server.Logger.Error("Error writing response", err) + } +} diff --git a/internal/model/comment.go b/internal/model/comment.go new file mode 100644 index 0000000..6210caa --- /dev/null +++ b/internal/model/comment.go @@ -0,0 +1,13 @@ +package model + +import "time" + +type Comment struct { + Id string `json:"id" gorm:"primary_key"` + UrlStub string `json:"url_stub"` + Post string `json:"post_id"` + Author string `json:"author_id"` + Nsfw bool `json:"nsfw"` + Published time.Time `json:"published"` + Content string `json:"content"` +} diff --git a/internal/queue/consumer.go b/internal/queue/consumer.go index 9ce493e..1234abb 100644 --- a/internal/queue/consumer.go +++ b/internal/queue/consumer.go @@ -2,9 +2,17 @@ package queue import ( "errors" + "fmt" + "golang.org/x/sync/errgroup" "sublinks/sublinks-federation/internal/worker" ) +type ConsumerQueue struct { + Exchange string + QueueName string + RoutingKeys map[string]worker.Worker +} + func (q *RabbitQueue) createConsumer(queueData ConsumerQueue) error { channelRabbitMQ, err := q.Connection.Channel() if err != nil { @@ -14,14 +22,17 @@ func (q *RabbitQueue) createConsumer(queueData ConsumerQueue) error { if err != nil { return err } - err = channelRabbitMQ.QueueBind( - queueData.QueueName, // queue name - queueData.RoutingKey, // routing key - queueData.Exchange, // exchange - false, - nil) - if err != nil { - return err + + for routingKey, _ := range queueData.RoutingKeys { + err = channelRabbitMQ.QueueBind( + queueData.QueueName, // queue name + routingKey, // routing key + queueData.Exchange, // exchange + false, + nil) + if err != nil { + return err + } } // Subscribing to QueueService1 for getting messages. @@ -41,14 +52,7 @@ func (q *RabbitQueue) createConsumer(queueData ConsumerQueue) error { return nil } -type ConsumerQueue struct { - Exchange string - QueueName string - RoutingKey string -} - -// TODO: Implement a way to either pass a callback function or return messages/chan -func (q *RabbitQueue) StartConsumer(queueData ConsumerQueue, worker worker.Worker) error { +func (q *RabbitQueue) StartConsumer(queueData ConsumerQueue) error { err := q.createConsumer(queueData) if err != nil { return err @@ -57,21 +61,36 @@ func (q *RabbitQueue) StartConsumer(queueData ConsumerQueue, worker worker.Worke if !ok { return errors.New("consumer not found") } - go func() { - for message := range messages { - err := worker.Process(message.Body) + + errGroup := new(errgroup.Group) + for message := range messages { + errGroup.Go(func() error { + cbWorker, ok := queueData.RoutingKeys[message.RoutingKey] + if !ok { + return errors.New(fmt.Sprintf("%s not implemented as valid routing key", message.RoutingKey)) + } + + err := cbWorker.Process(message.Body) + if err != nil { err = message.Acknowledger.Nack(message.DeliveryTag, false, true) if err != nil { - panic(err) // I know this isn't good. Will need to fix it + return errors.New(fmt.Sprintf("error nack'ing the message: %s", err.Error())) } - continue + return errors.New(fmt.Sprintf("error processing message body: %s", err.Error())) } + err = message.Acknowledger.Ack(message.DeliveryTag, false) if err != nil { - panic(err) // I know this isn't good. Will need to fix it + return errors.New(fmt.Sprintf("error ack'ing the message: %s", err.Error())) } - } - }() + return nil + }) + } + + if err := errGroup.Wait(); err != nil { + return err + } + return nil } diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 7a14eb0..c560b1b 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -2,20 +2,19 @@ package queue import ( "os" - - "sublinks/sublinks-federation/internal/db" - "sublinks/sublinks-federation/internal/log" "sublinks/sublinks-federation/internal/repository" "sublinks/sublinks-federation/internal/worker" amqp "github.com/rabbitmq/amqp091-go" + "sublinks/sublinks-federation/internal/db" + "sublinks/sublinks-federation/internal/log" ) type Queue interface { Connect() error Run(conn db.Database) PublishMessage(queueName string, message string) error - StartConsumer(queueData ConsumerQueue, worker worker.Worker) error + StartConsumer(queueData ConsumerQueue) error Status() map[string]map[string]bool Close() } @@ -52,42 +51,46 @@ func (q *RabbitQueue) Status() map[string]map[string]bool { func (q *RabbitQueue) Run(conn db.Database) { q.processActors(conn) - q.processPosts(conn) + q.processObjects(conn) } func (q *RabbitQueue) processActors(conn db.Database) { actorCQ := ConsumerQueue{ - QueueName: "actor_create_queue", - Exchange: "federation", - RoutingKey: "actor.create", + QueueName: "actor_create_queue", + Exchange: "federation", + RoutingKeys: map[string]worker.Worker{ + ActorRoutingKey: &worker.ActorWorker{ + Logger: q.logger, + Repository: repository.NewRepository(conn), + }, + }, } - aw := worker.ActorWorker{ - Logger: q.logger, - Repository: repository.NewRepository(conn), - } - - err := q.StartConsumer(actorCQ, &aw) + err := q.StartConsumer(actorCQ) if err != nil { q.logger.Fatal("failed starting actor consumer", err) } } -func (q *RabbitQueue) processPosts(conn db.Database) { - postCQ := ConsumerQueue{ - QueueName: "post_queue", - Exchange: "federation", - RoutingKey: "post.create", - } - - aw := worker.PostWorker{ - Logger: q.logger, - Repository: repository.NewRepository(conn), +func (q *RabbitQueue) processObjects(conn db.Database) { + queue := ConsumerQueue{ + QueueName: "object_create_queue", + Exchange: "federation", + RoutingKeys: map[string]worker.Worker{ + PostRoutingKey: &worker.PostWorker{ + Logger: q.logger, + Repository: repository.NewRepository(conn), + }, + CommentRoutingKey: &worker.CommentWorker{ + Logger: q.logger, + Repository: repository.NewRepository(conn), + }, + }, } - err := q.StartConsumer(postCQ, &aw) + err := q.StartConsumer(queue) if err != nil { - q.logger.Fatal("failed starting post consumer", err) + q.logger.Fatal("failed starting object consumer", err) } } diff --git a/internal/queue/routing-keys.go b/internal/queue/routing-keys.go new file mode 100644 index 0000000..77f0f3e --- /dev/null +++ b/internal/queue/routing-keys.go @@ -0,0 +1,7 @@ +package queue + +const ( + ActorRoutingKey = "actor.create" + PostRoutingKey = "post.create" + CommentRoutingKey = "comment.create" +) diff --git a/internal/worker/comment.go b/internal/worker/comment.go new file mode 100644 index 0000000..8529ab7 --- /dev/null +++ b/internal/worker/comment.go @@ -0,0 +1,32 @@ +package worker + +import ( + "encoding/json" + "os" + "strings" + "sublinks/sublinks-federation/internal/log" + "sublinks/sublinks-federation/internal/model" + "sublinks/sublinks-federation/internal/repository" +) + +type CommentWorker struct { + log.Logger + repository.Repository +} + +func (w *CommentWorker) Process(msg []byte) error { + comment := model.Comment{} + err := json.Unmarshal(msg, &comment) + hostname := os.Getenv("HOSTNAME") + comment.UrlStub = strings.Replace(hostname, comment.Id, "", 1) + if err != nil { + w.Logger.Error("Error unmarshalling comment: %s", err) + return err + } + err = w.Repository.Save(comment) + if err != nil { + w.Logger.Error("Error saving comment: %s", err) + return err + } + return nil +}