diff --git a/cmd/main.go b/cmd/main.go
index 9f96e2d0..97a93a69 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -221,21 +221,49 @@ func runCoreService(ctx context.Context, config *config.File, databaseClient dat
return nil
}
+// findModuleByID find and returns the specified worker ID in all components.
+func findModuleByID(configFile *config.File, workerID string) (*config.Module, error) {
+ // find the module in a specific component list
+ findInComponent := func(components []*config.Module) (*config.Module, bool) {
+ for _, module := range components {
+ if strings.EqualFold(module.ID, workerID) {
+ return module, true
+ }
+ }
+
+ return nil, false
+ }
+
+ // Search in decentralized components
+ if module, found := findInComponent(configFile.Component.Decentralized); found {
+ return module, nil
+ }
+
+ // Search in federated components
+ if module, found := findInComponent(configFile.Component.Federated); found {
+ return module, nil
+ }
+
+ if module, found := findInComponent([]*config.Module{configFile.Component.RSS}); found {
+ return module, nil
+ }
+
+ return nil, fmt.Errorf("undefined module %s", workerID)
+}
+
func runWorker(ctx context.Context, configFile *config.File, databaseClient database.Client, streamClient stream.Client, redisClient rueidis.Client) error {
workerID, err := flags.GetString(flag.KeyWorkerID)
if err != nil {
return fmt.Errorf("invalid worker id: %w", err)
}
- module, found := lo.Find(configFile.Component.Decentralized, func(module *config.Module) bool {
- return strings.EqualFold(module.ID, workerID)
- })
-
- if !found {
- return fmt.Errorf("undefined module %s", workerID)
+ module, err := findModuleByID(configFile, workerID)
+ if err != nil {
+ return fmt.Errorf("find module by id: %w", err)
}
server, err := indexer.NewServer(ctx, module, databaseClient, streamClient, redisClient)
+
if err != nil {
return fmt.Errorf("new indexer server: %w", err)
}
diff --git a/config/config_test.go b/config/config_test.go
index 47f1c098..ff32feb7 100644
--- a/config/config_test.go
+++ b/config/config_test.go
@@ -35,6 +35,10 @@ endpoints:
url: https://rpc.ankr.com/eth
http_headers:
user-agent: rss3-node
+ mastodon:
+ url: https://0.0.0.0:9092/
+ http_headers:
+ user-agent: rss3-node
database:
driver: postgres
partition: true
@@ -60,15 +64,21 @@ observability:
endpoint: localhost:4318
component:
rss:
- network: rss
- worker: rsshub
- endpoint: https://rsshub.app/
- parameters:
- authentication:
- username: user
- password: pass
- access_key: abc
- access_code: def
+ network: rss
+ worker: rsshub
+ endpoint: https://rsshub.app/
+ parameters:
+ authentication:
+ username: user
+ password: pass
+ access_key: abc
+ access_code: def
+ federated:
+ network: mastodon
+ worker: mastodon
+ endpoint: mastodon
+ parameters:
+ mastodon_kafka_topic: activitypub_events
decentralized:
- network: ethereum
worker: core
@@ -91,6 +101,12 @@ component:
"http_headers": {
"user-agent": "rss3-node"
}
+ },
+ "mastodon": {
+ "url": "https://0.0.0.0:9092/",
+ "http_headers": {
+ "user-agent": "rss3-node"
+ }
}
},
"discovery": {
@@ -136,41 +152,50 @@ component:
}
},
"component": {
- "rss":
- {
- "network": "rss",
- "worker": "rsshub",
- "endpoint": "https://rsshub.app/",
- "parameters": {
- "authentication": {
- "username": "user",
- "password": "pass",
- "access_key": "abc",
- "access_code": "def"
- }
- }
- },
- "decentralized": [
- {
- "network": "ethereum",
- "worker": "core",
- "endpoint": "ethereum",
- "parameters": {
- "block_start": 47370106,
- "block_target": 456
- }
- },
- {
- "network": "ethereum",
- "worker": "rss3",
- "endpoint": "https://rpc.ankr.com/eth",
- "parameters": {
- "block_start": 123,
- "concurrent_block_requests": 2
- }
+ "rss": {
+ "network": "rss",
+ "worker": "rsshub",
+ "endpoint": "https://rsshub.app/",
+ "parameters": {
+ "authentication": {
+ "username": "user",
+ "password": "pass",
+ "access_key": "abc",
+ "access_code": "def"
+ }
+ }
+ },
+ "federated": [
+ {
+ "network": "mastodon",
+ "worker": "mastodon",
+ "endpoint": "mastodon",
+ "parameters": {
+ "mastodon_kafka_topic": "activitypub_events"
}
- ]
- }
+ }
+ ],
+ "decentralized": [
+ {
+ "network": "ethereum",
+ "worker": "core",
+ "endpoint": "ethereum",
+ "parameters": {
+ "block_start": 47370106,
+ "block_target": 456
+ }
+ },
+ {
+ "network": "ethereum",
+ "worker": "rss3",
+ "endpoint": "https://rpc.ankr.com/eth",
+ "parameters": {
+ "block_start": 123,
+ "concurrent_block_requests": 2
+ }
+ }
+ ]
+}
}`
configExampleToml = `environment = "development"
@@ -184,6 +209,12 @@ url = "https://rpc.ankr.com/eth"
[endpoints.ethereum.http_headers]
user-agent = "rss3-node"
+[endpoints.mastodon]
+url = "https://0.0.0.0:9092/"
+
+ [endpoints.mastodon.http_headers]
+ user-agent = "rss3-node"
+
[discovery.server]
endpoint = "https://node.mydomain.com/"
global_indexer_endpoint = "https://gi.rss3.dev/"
@@ -227,6 +258,14 @@ password = "pass"
access_key = "abc"
access_code = "def"
+[[component.federated]]
+network = "mastodon"
+worker = "mastodon"
+endpoint = "mastodon"
+
+[component.federated.parameters]
+mastodon_kafka_topic = "activitypub_events"
+
[[component.decentralized]]
network = "ethereum"
worker = "core"
@@ -257,6 +296,12 @@ var configFileExpected = &File{
"user-agent": "rss3-node",
},
},
+ "mastodon": {
+ URL: "https://0.0.0.0:9092/",
+ HTTPHeaders: map[string]string{
+ "user-agent": "rss3-node",
+ },
+ },
},
Discovery: &Discovery{
Operator: &Operator{
@@ -286,7 +331,22 @@ var configFileExpected = &File{
},
},
},
- Federated: nil,
+ Federated: []*Module{
+ {
+ Network: network.Mastodon,
+ Worker: decentralized.Mastodon,
+ EndpointID: "mastodon",
+ Endpoint: Endpoint{
+ URL: "https://0.0.0.0:9092/",
+ HTTPHeaders: map[string]string{
+ "user-agent": "rss3-node",
+ },
+ },
+ Parameters: &Parameters{
+ "mastodon_kafka_topic": "activitypub_events",
+ },
+ },
+ },
Decentralized: []*Module{
{
Network: network.Ethereum,
@@ -540,6 +600,26 @@ func AssertConfig(t *testing.T, expect, got *File) {
assert.Equal(t, expect.Discovery, got.Discovery)
})
+ t.Run("federated", func(t *testing.T) {
+ for i, federated := range expect.Component.Federated {
+ func(_expect, got *Module) {
+ t.Run(fmt.Sprintf("federated-%d", i), func(t *testing.T) {
+ t.Parallel()
+ assert.Equal(t, _expect, got)
+ })
+ }(federated, got.Component.Federated[i])
+ }
+
+ for i, indexer := range got.Component.Federated {
+ func(_except, got *Module) {
+ t.Run(fmt.Sprintf("%s-%s", indexer.Network, indexer.Worker), func(t *testing.T) {
+ t.Parallel()
+ AssertIndexer(t, _except, got)
+ })
+ }(configFileExpected.Component.Federated[i], indexer)
+ }
+ })
+
t.Run("decentralized", func(t *testing.T) {
func(_expect, got *Module) {
t.Run("rss", func(t *testing.T) {
diff --git a/docs/openapi.json b/docs/openapi.json
index cb734d3b..8e41abc0 100644
--- a/docs/openapi.json
+++ b/docs/openapi.json
@@ -1290,7 +1290,7 @@
"description": "The platform on which activities occur.",
"type": "string",
"example": "Uniswap",
- "enum": ["1inch","AAVE","Aavegotchi","Arbitrum","Base","BendDAO","Cow","Crossbell","Curve","ENS","Farcaster","Highlight","IQWiki","KiwiStand","Lens","LiNEAR","Lido","Linear","LooksRare","Matters","Mirror","Nouns","OpenSea","Optimism","Paragraph","Paraswap","RSS3","SAVM","Stargate","Uniswap","Unknown","VSL"]
+ "enum": ["1inch","AAVE","Aavegotchi","Arbitrum","Base","BendDAO","Cow","Crossbell","Curve","ENS","Farcaster","Highlight","IQWiki","KiwiStand","Lens","LiNEAR","Lido","Linear","LooksRare","Mastodon","Matters","Mirror","Nouns","OpenSea","Optimism","Paragraph","Paraswap","Polymarket","RSS3","SAVM","Stargate","Uniswap","Unknown","VSL"]
},
"Timestamp": {
"description": "The timestamp of when the activity occurred.",
diff --git a/internal/database/client.go b/internal/database/client.go
index 8756821e..1e5a2f0e 100644
--- a/internal/database/client.go
+++ b/internal/database/client.go
@@ -21,6 +21,7 @@ type Client interface {
DatasetMirrorPost
DatasetFarcasterProfile
DatasetENSNamehash
+ DatasetMastodonHandle
LoadCheckpoint(ctx context.Context, id string, network network.Network, worker string) (*engine.Checkpoint, error)
LoadCheckpoints(ctx context.Context, id string, network network.Network, worker string) ([]*engine.Checkpoint, error)
@@ -58,6 +59,11 @@ type DatasetENSNamehash interface {
SaveDatasetENSNamehash(ctx context.Context, namehash *model.ENSNamehash) error
}
+type DatasetMastodonHandle interface {
+ LoadDatasetMastodonHandle(ctx context.Context, handle string) (*model.MastodonHandle, error)
+ SaveDatasetMastodonHandle(ctx context.Context, handle *model.MastodonHandle) error
+}
+
var _ goose.Logger = (*SugaredLogger)(nil)
type SugaredLogger struct {
diff --git a/internal/database/dialer/postgres/client.go b/internal/database/dialer/postgres/client.go
index 0e8bda43..1716ae26 100644
--- a/internal/database/dialer/postgres/client.go
+++ b/internal/database/dialer/postgres/client.go
@@ -275,7 +275,7 @@ func (c *client) SaveDatasetMirrorPost(ctx context.Context, post *mirror_model.D
return c.database.WithContext(ctx).Clauses(clauses...).Create(&value).Error
}
-// FindActivity finds a Activity by id.
+// FindActivity finds an Activity by id.
func (c *client) FindActivity(ctx context.Context, query model.ActivityQuery) (*activityx.Activity, *int, error) {
if c.partition {
return c.findActivityPartitioned(ctx, query)
@@ -372,6 +372,47 @@ func (c *client) SaveDatasetENSNamehash(ctx context.Context, namehash *model.ENS
return c.database.WithContext(ctx).Clauses(clauses...).Create(&value).Error
}
+// LoadDatasetMastodonHandle loads a Mastodon handle.
+func (c *client) LoadDatasetMastodonHandle(ctx context.Context, handle string) (*model.MastodonHandle, error) {
+ var value table.DatasetMastodonHandle
+
+ if err := c.database.WithContext(ctx).
+ Where("handle = ?", handle).
+ First(&value).
+ Error; err != nil {
+ if !errors.Is(err, gorm.ErrRecordNotFound) {
+ return nil, err
+ }
+
+ // Initialize a default handle.
+ value = table.DatasetMastodonHandle{
+ Handle: handle,
+ LastUpdated: time.Now(),
+ }
+ }
+
+ return value.Export()
+}
+
+// SaveDatasetMastodonHandle saves a Mastodon handle.
+func (c *client) SaveDatasetMastodonHandle(ctx context.Context, handle *model.MastodonHandle) error {
+ clauses := []clause.Expression{
+ clause.OnConflict{
+ Columns: []clause.Column{{Name: "handle"}},
+ DoUpdates: clause.Assignments(map[string]interface{}{
+ "last_updated": time.Now(),
+ }),
+ },
+ }
+
+ var value table.DatasetMastodonHandle
+ if err := value.Import(handle); err != nil {
+ return err
+ }
+
+ return c.database.WithContext(ctx).Clauses(clauses...).Create(&value).Error
+}
+
// Dial dials a database.
func Dial(ctx context.Context, dataSourceName string, partition bool) (database.Client, error) {
var err error
diff --git a/internal/database/dialer/postgres/migration/20240922233919_add_mastodon_handles.sql b/internal/database/dialer/postgres/migration/20240922233919_add_mastodon_handles.sql
new file mode 100644
index 00000000..b5e988ea
--- /dev/null
+++ b/internal/database/dialer/postgres/migration/20240922233919_add_mastodon_handles.sql
@@ -0,0 +1,12 @@
+-- +goose Up
+CREATE TABLE IF NOT EXISTS dataset_mastodon_handles (
+ handle VARCHAR(255) PRIMARY KEY,
+ last_updated TIMESTAMP NOT NULL
+);
+
+CREATE INDEX idx_mastodon_handles_last_updated ON dataset_mastodon_handles(last_updated);
+
+-- +goose Down
+-- +goose StatementBegin
+DROP TABLE IF EXISTS dataset_mastodon_handles;
+-- +goose StatementEnd
\ No newline at end of file
diff --git a/internal/database/dialer/postgres/table/dataset_mastodon_handle.go b/internal/database/dialer/postgres/table/dataset_mastodon_handle.go
new file mode 100644
index 00000000..72e97c78
--- /dev/null
+++ b/internal/database/dialer/postgres/table/dataset_mastodon_handle.go
@@ -0,0 +1,30 @@
+package table
+
+import (
+ "time"
+
+ "github.com/rss3-network/node/internal/database/model"
+)
+
+var _ model.MastodonHandleTransformer = (*DatasetMastodonHandle)(nil)
+
+type DatasetMastodonHandle struct {
+ Handle string `gorm:"column:handle;primaryKey"`
+ LastUpdated time.Time `gorm:"column:last_updated;not null"`
+}
+
+func (d *DatasetMastodonHandle) Import(handle *model.MastodonHandle) error {
+ d.Handle = handle.Handle
+ d.LastUpdated = handle.LastUpdated
+
+ return nil
+}
+
+func (d *DatasetMastodonHandle) Export() (*model.MastodonHandle, error) {
+ handle := model.MastodonHandle{
+ Handle: d.Handle,
+ LastUpdated: d.LastUpdated,
+ }
+
+ return &handle, nil
+}
diff --git a/internal/database/model/dataset_mastodon_handle.go b/internal/database/model/dataset_mastodon_handle.go
new file mode 100644
index 00000000..159e78af
--- /dev/null
+++ b/internal/database/model/dataset_mastodon_handle.go
@@ -0,0 +1,13 @@
+package model
+
+import "time"
+
+type MastodonHandleTransformer interface {
+ Import(handle *MastodonHandle) error
+ Export() (*MastodonHandle, error)
+}
+
+type MastodonHandle struct {
+ Handle string `json:"handle"`
+ LastUpdated time.Time `json:"last_updated"`
+}
diff --git a/internal/engine/source/activitypub/data_source.go b/internal/engine/source/activitypub/data_source.go
new file mode 100644
index 00000000..0174cea2
--- /dev/null
+++ b/internal/engine/source/activitypub/data_source.go
@@ -0,0 +1,241 @@
+package activitypub
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "runtime"
+ "time"
+
+ "github.com/avast/retry-go/v4"
+ "github.com/rss3-network/node/config"
+ "github.com/rss3-network/node/internal/database"
+ "github.com/rss3-network/node/internal/engine"
+ "github.com/rss3-network/node/provider/activitypub"
+ "github.com/rss3-network/node/provider/activitypub/mastodon"
+ "github.com/rss3-network/protocol-go/schema/network"
+ "github.com/samber/lo"
+ "github.com/sourcegraph/conc/pool"
+ "github.com/twmb/franz-go/pkg/kgo"
+ "go.uber.org/zap"
+)
+
+var _ engine.DataSource = (*dataSource)(nil)
+
+var DefaultStartTime int64
+
+// dataSource struct defines the fields for the data source
+type dataSource struct {
+ config *config.Module
+ databaseClient database.Client
+ mastodonClient mastodon.Client
+ option *Option
+ state State
+}
+
+// Network returns the network configuration from the config
+func (s *dataSource) Network() network.Network {
+ return s.config.Network
+}
+
+func (s *dataSource) State() json.RawMessage {
+ return lo.Must(json.Marshal(s.state))
+}
+
+// Start initializes the data source and starts consuming Kafka messages
+func (s *dataSource) Start(ctx context.Context, tasksChan chan<- *engine.Tasks, errorChan chan<- error) {
+ if err := s.initialize(); err != nil {
+ errorChan <- fmt.Errorf("initialize dataSource: %w", err)
+ return
+ }
+
+ // Start consuming Kafka messages in a goroutine
+ go func() {
+ if err := retryOperation(ctx, func(ctx context.Context) error {
+ return s.consumeKafkaMessages(ctx, tasksChan)
+ }); err != nil {
+ errorChan <- err
+ }
+ }()
+}
+
+// consumeKafkaMessages consumes messages from a Kafka topic and processes them
+func (s *dataSource) consumeKafkaMessages(ctx context.Context, tasksChan chan<- *engine.Tasks) error {
+ consumer := s.mastodonClient.GetKafkaConsumer()
+
+ // Create a pool to process messages concurrently
+ resultPool := pool.NewWithResults[*engine.Tasks]().WithMaxGoroutines(runtime.NumCPU())
+
+ // Loop to continuously process incoming messages
+ for {
+ // Start polling messages from the specified Kafka partition
+ fetches := consumer.PollFetches(ctx)
+ if errs := fetches.Errors(); len(errs) > 0 {
+ for _, e := range errs {
+ zap.L().Error("consumer poll fetch error", zap.Error(e.Err))
+ }
+
+ return fmt.Errorf("consumer poll fetch error: %v", errs)
+ }
+
+ fetches.EachRecord(func(record *kgo.Record) {
+ // Process each message in a separate goroutine for concurrent processing
+ resultPool.Go(func() *engine.Tasks {
+ // Process each message
+ tasks := s.processMessage(ctx, record)
+ if tasks != nil {
+ tasksChan <- tasks
+ }
+
+ return tasks
+ })
+ })
+
+ select {
+ // Check if the context is done
+ case <-ctx.Done():
+ resultPool.Wait()
+ return ctx.Err()
+ default:
+ }
+ }
+}
+
+// processMessage processes a single Kafka message
+func (s *dataSource) processMessage(ctx context.Context, record *kgo.Record) *engine.Tasks {
+ // Update the state with the last processed message count number (offset)
+ s.state.LastOffset = record.Offset
+
+ zap.L().Info("[activitypub/data_source.go] Consumed message",
+ zap.Int64("offset", record.Offset),
+ zap.String("value", string(record.Value)))
+
+ // Unmarshal the message and store it as an ActivityPub object
+ messageValue := string(record.Value)
+
+ var object activitypub.Object
+
+ if err := json.Unmarshal([]byte(messageValue), &object); err != nil {
+ zap.L().Error("failed to unmarshal message value", zap.Error(err))
+ return nil
+ }
+
+ // zap.L().Info("[activitypub/data_source.go] Unmarshal object",
+ // zap.Any("object", object))
+
+ // Build the corresponding message task for transformation
+ tasks := s.buildMastodonMessageTasks(ctx, object)
+
+ // Print the tasks for debugging
+ zap.L().Info("[activitypub/data_source.go] Generated tasks")
+
+ for _, task := range tasks.Tasks {
+ zap.L().Info("Task", zap.Any("task", task))
+ }
+
+ return tasks
+}
+
+// initialize sets up the Kafka consumer and Mastodon client
+func (s *dataSource) initialize() (err error) {
+ // Get the kafka topic parameter from the config.yaml file
+ kafkaTopic := s.option.KafkaTopic
+
+ // Create a new Client instance with required kafka parameters
+ client, err := mastodon.NewClient(s.config.Endpoint.URL, kafkaTopic)
+ if err != nil {
+ return fmt.Errorf("create activitypub client: %w", err)
+ }
+
+ s.mastodonClient = client
+
+ return nil
+}
+
+// retryOperation retries an operation with specified delay and backoff
+func retryOperation(ctx context.Context, operation func(ctx context.Context) error) error {
+ return retry.Do(
+ func() error {
+ return operation(ctx)
+ },
+ retry.Attempts(0),
+ retry.Delay(1*time.Second),
+ retry.DelayType(retry.BackOffDelay),
+ retry.OnRetry(func(n uint, err error) {
+ zap.L().Warn("retry farcaster dataSource", zap.Uint("retry", n), zap.Error(err))
+ }),
+ )
+}
+
+// buildMastodonMessageTasks processes the Kafka message and creates tasks for the engine
+func (s *dataSource) buildMastodonMessageTasks(_ context.Context, object activitypub.Object) *engine.Tasks {
+ var tasks engine.Tasks
+
+ // If the object is empty, return an empty task
+ if object.Type == "" {
+ return &tasks
+ }
+
+ // Filter the message based on type
+ // switch object.Type {
+ // case "Create":
+ // if note, ok := object.Object.(map[string]interface{}); ok {
+ // if noteType, exists := note["type"].(string); exists && noteType == "Note" {
+ // // Example: Create task for a Note object
+ // zap.L().Info("[buildMastodonMessageTasks] Object in Create type (for a Note Object)")
+ // }
+ // }
+ // case "Follow":
+ // // Process Follow type messages
+ // zap.L().Info("[buildMastodonMessageTasks] Object in Follow type")
+ // case "Like":
+ // // Process Like type messages
+ // zap.L().Info("[buildMastodonMessageTasks] Object in Like type")
+ // default:
+ // zap.L().Debug("unsupported message type", zap.String("type", object.Type))
+ // }
+
+ tasks.Tasks = append(tasks.Tasks, &Task{
+ Network: s.Network(),
+ Message: object,
+ })
+
+ return &tasks
+}
+
+// NewSource creates a new data source instance
+func NewSource(config *config.Module, checkpoint *engine.Checkpoint, databaseClient database.Client) (engine.DataSource, error) {
+ var (
+ state State
+
+ err error
+ )
+
+ // Initialize state from checkpoint.
+ if checkpoint != nil {
+ if err := json.Unmarshal(checkpoint.State, &state); err != nil {
+ return nil, err
+ }
+ }
+
+ // Create a new datasource instance
+ instance := dataSource{
+ databaseClient: databaseClient,
+ config: config,
+ state: state,
+ }
+
+ if instance.option, err = NewOption(config.Network, config.Parameters); err != nil {
+ return nil, fmt.Errorf("parse config: %w", err)
+ }
+
+ DefaultStartTime = instance.option.TimestampStart
+
+ zap.L().Info("apply option", zap.Any("option", instance.option))
+
+ return &instance, nil
+}
+
+func GetTimestampStart() int64 {
+ return DefaultStartTime
+}
diff --git a/internal/engine/source/activitypub/option.go b/internal/engine/source/activitypub/option.go
new file mode 100644
index 00000000..60c62c60
--- /dev/null
+++ b/internal/engine/source/activitypub/option.go
@@ -0,0 +1,38 @@
+package activitypub
+
+import (
+ "fmt"
+
+ "github.com/rss3-network/node/config"
+ "github.com/rss3-network/node/config/parameter"
+ "github.com/rss3-network/protocol-go/schema/network"
+)
+
+// Option represents the configuration options for the ActivityPub client.
+type Option struct {
+ KafkaTopic string `json:"mastodon_kafka_topic"`
+ TimestampStart int64 `json:"timestamp_start" mapstructure:"timestamp_start"`
+}
+
+// NewOption creates a new Option instance from the provided parameters.
+func NewOption(n network.Network, parameters *config.Parameters) (*Option, error) {
+ var option Option
+
+ fmt.Println("parameters is:", parameters)
+
+ if parameters == nil {
+ return &Option{
+ TimestampStart: parameter.CurrentNetworkStartBlock[n].Timestamp,
+ }, nil
+ }
+
+ if err := parameters.Decode(&option); err != nil {
+ return nil, err
+ }
+
+ if option.TimestampStart == 0 {
+ option.TimestampStart = parameter.CurrentNetworkStartBlock[n].Timestamp
+ }
+
+ return &option, nil
+}
diff --git a/internal/engine/source/activitypub/state.go b/internal/engine/source/activitypub/state.go
new file mode 100644
index 00000000..b6ac97e2
--- /dev/null
+++ b/internal/engine/source/activitypub/state.go
@@ -0,0 +1,6 @@
+package activitypub
+
+// State represents the state of the ActivityPub data source.
+type State struct {
+ LastOffset int64 `json:"last_offset"`
+}
diff --git a/internal/engine/source/activitypub/task.go b/internal/engine/source/activitypub/task.go
new file mode 100644
index 00000000..1970a4ce
--- /dev/null
+++ b/internal/engine/source/activitypub/task.go
@@ -0,0 +1,73 @@
+package activitypub
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/rss3-network/node/internal/engine"
+ "github.com/rss3-network/node/provider/activitypub"
+ activityx "github.com/rss3-network/protocol-go/schema/activity"
+ "github.com/rss3-network/protocol-go/schema/network"
+ "github.com/rss3-network/protocol-go/schema/typex"
+ "go.uber.org/zap"
+)
+
+var _ engine.Task = (*Task)(nil)
+
+type Task struct {
+ Network network.Network
+ Message activitypub.Object
+}
+
+func (t Task) ID() string {
+ return t.Message.ID
+}
+
+func (t Task) GetNetwork() network.Network {
+ return t.Network
+}
+
+func (t Task) GetTimestamp() uint64 {
+ // Get the defaultStartTime that was pulled from VSL NetworkParams
+ defaultStartTime := GetTimestampStart()
+
+ // Use default time if Published is empty
+ timeStr := t.Message.Published
+
+ if timeStr == "" {
+ return uint64(defaultStartTime)
+ }
+
+ parsedTime, err := time.Parse(time.RFC3339, timeStr)
+ if err != nil {
+ zap.L().Info("Error parsinig time")
+ return 0
+ }
+
+ // Convert the time.Time object to a Unix timestamp and cast to uint64
+ return uint64(parsedTime.Unix())
+}
+
+func (t Task) Validate() error {
+ return nil
+}
+
+func (t Task) BuildActivity(options ...activityx.Option) (*activityx.Activity, error) {
+ activity := activityx.Activity{
+ ID: t.ID(),
+ Network: t.Network,
+ Type: typex.Unknown,
+ Status: true,
+ Actions: make([]*activityx.Action, 0),
+ Timestamp: t.GetTimestamp(),
+ }
+
+ // Apply activity options.
+ for _, option := range options {
+ if err := option(&activity); err != nil {
+ return nil, fmt.Errorf("apply option: %w", err)
+ }
+ }
+
+ return &activity, nil
+}
diff --git a/internal/engine/source/factory.go b/internal/engine/source/factory.go
index 19c747f2..d67df282 100644
--- a/internal/engine/source/factory.go
+++ b/internal/engine/source/factory.go
@@ -7,6 +7,7 @@ import (
"github.com/rss3-network/node/config"
"github.com/rss3-network/node/internal/database"
"github.com/rss3-network/node/internal/engine"
+ "github.com/rss3-network/node/internal/engine/source/activitypub"
"github.com/rss3-network/node/internal/engine/source/arweave"
"github.com/rss3-network/node/internal/engine/source/ethereum"
"github.com/rss3-network/node/internal/engine/source/farcaster"
@@ -23,6 +24,8 @@ func New(config *config.Module, sourceFilter engine.DataSourceFilter, checkpoint
return arweave.NewSource(config, sourceFilter, checkpoint, redisClient)
case network.FarcasterSource:
return farcaster.NewSource(config, checkpoint, databaseClient)
+ case network.ActivityPubSource:
+ return activitypub.NewSource(config, checkpoint, databaseClient)
case network.NearSource:
return near.NewSource(config, sourceFilter, checkpoint, redisClient)
default:
diff --git a/internal/engine/worker/decentralized/contract/nouns/worker.go b/internal/engine/worker/decentralized/contract/nouns/worker.go
index 6721255d..5f04c525 100644
--- a/internal/engine/worker/decentralized/contract/nouns/worker.go
+++ b/internal/engine/worker/decentralized/contract/nouns/worker.go
@@ -104,9 +104,6 @@ func (w *worker) Transform(ctx context.Context, task engine.Task) (*activityx.Ac
err error
)
- // fmt.Println("log.Topics is: ", log.Topics[0].String())
- // fmt.Println("log.Address is: ", log.Address.String())
-
switch {
case w.matchNounsAuctionBid(log):
actions, err = w.handleNounsAuctionBid(ctx, ethereumTask, log)
diff --git a/internal/engine/worker/decentralized/core/worker.go b/internal/engine/worker/decentralized/core/worker.go
index 2c4e5829..c70aedb9 100644
--- a/internal/engine/worker/decentralized/core/worker.go
+++ b/internal/engine/worker/decentralized/core/worker.go
@@ -5,17 +5,21 @@ import (
"github.com/redis/rueidis"
"github.com/rss3-network/node/config"
+ "github.com/rss3-network/node/internal/database"
"github.com/rss3-network/node/internal/engine"
"github.com/rss3-network/node/internal/engine/worker/decentralized/core/arweave"
"github.com/rss3-network/node/internal/engine/worker/decentralized/core/ethereum"
"github.com/rss3-network/node/internal/engine/worker/decentralized/core/farcaster"
"github.com/rss3-network/node/internal/engine/worker/decentralized/core/near"
+ "github.com/rss3-network/node/internal/engine/worker/federated/activitypub/mastodon"
"github.com/rss3-network/protocol-go/schema/network"
)
// NewWorker creates a new core worker.
-func NewWorker(config *config.Module, redisClient rueidis.Client) (engine.Worker, error) {
+func NewWorker(config *config.Module, databaseClient database.Client, redisClient rueidis.Client) (engine.Worker, error) {
switch config.Network.Source() {
+ case network.ActivityPubSource:
+ return mastodon.NewWorker(databaseClient, redisClient)
case network.EthereumSource:
return ethereum.NewWorker(config, redisClient)
case network.ArweaveSource:
diff --git a/internal/engine/worker/decentralized/factory.go b/internal/engine/worker/decentralized/factory.go
index 373f7185..d6ccabcc 100644
--- a/internal/engine/worker/decentralized/factory.go
+++ b/internal/engine/worker/decentralized/factory.go
@@ -45,7 +45,7 @@ import (
func New(config *config.Module, databaseClient database.Client, redisClient rueidis.Client) (engine.Worker, error) {
if config.Worker == decentralized.Core {
- return core.NewWorker(config, redisClient)
+ return core.NewWorker(config, databaseClient, redisClient)
}
return newNonCoreWorker(config, databaseClient, redisClient)
diff --git a/internal/engine/worker/federated/activitypub/mastodon/worker.go b/internal/engine/worker/federated/activitypub/mastodon/worker.go
new file mode 100644
index 00000000..2bcc7c23
--- /dev/null
+++ b/internal/engine/worker/federated/activitypub/mastodon/worker.go
@@ -0,0 +1,474 @@
+package mastodon
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "net/url"
+ "strings"
+ "time"
+
+ "github.com/redis/rueidis"
+ "github.com/rss3-network/node/internal/database"
+ "github.com/rss3-network/node/internal/database/model"
+ "github.com/rss3-network/node/internal/engine"
+ source "github.com/rss3-network/node/internal/engine/source/activitypub"
+ "github.com/rss3-network/node/provider/activitypub"
+ "github.com/rss3-network/node/provider/activitypub/mastodon"
+ "github.com/rss3-network/node/provider/httpx"
+ "github.com/rss3-network/node/provider/redis"
+ "github.com/rss3-network/node/schema/worker/decentralized"
+ "github.com/rss3-network/protocol-go/schema"
+ activityx "github.com/rss3-network/protocol-go/schema/activity"
+ "github.com/rss3-network/protocol-go/schema/metadata"
+ "github.com/rss3-network/protocol-go/schema/network"
+ "github.com/rss3-network/protocol-go/schema/tag"
+ "github.com/rss3-network/protocol-go/schema/typex"
+ "go.uber.org/zap"
+)
+
+var _ engine.Worker = (*worker)(nil)
+
+type worker struct {
+ httpClient httpx.Client
+ databaseClient database.Client
+ redisClient rueidis.Client
+}
+
+func (w *worker) Name() string {
+ return decentralized.Mastodon.String()
+}
+
+func (w *worker) Platform() string {
+ return decentralized.PlatformMastodon.String()
+}
+
+func (w *worker) Network() []network.Network {
+ return []network.Network{
+ network.Mastodon,
+ }
+}
+
+func (w *worker) Tags() []tag.Tag {
+ return []tag.Tag{
+ tag.Social,
+ }
+}
+
+func (w *worker) Types() []schema.Type {
+ return []schema.Type{
+ typex.SocialComment,
+ typex.SocialPost,
+ typex.SocialShare,
+ }
+}
+
+// Filter returns a source filter.
+func (w *worker) Filter() engine.DataSourceFilter {
+ return nil
+}
+
+// Transform processes the task and converts it into an activity.
+func (w *worker) Transform(ctx context.Context, task engine.Task) (*activityx.Activity, error) {
+ zap.L().Info("[mastodon/worker.go] reached Transform()")
+
+ activityPubTask, ok := task.(*source.Task)
+
+ if !ok {
+ return nil, fmt.Errorf("invalid task type: %T", task)
+ }
+
+ activity, err := task.BuildActivity(activityx.WithActivityPlatform(w.Platform()))
+
+ if err != nil {
+ return nil, fmt.Errorf("build activity: %w", err)
+ }
+
+ // Handle ActivityPub message.
+ switch activityPubTask.Message.Type {
+ case mastodon.MessageTypeCreate.String():
+ err := w.handleActivityPubCreate(ctx, activityPubTask.Message, activity)
+ if err != nil {
+ return nil, fmt.Errorf("error occurred in handleActivityPubCreate: %w", err)
+ }
+ case mastodon.MessageTypeAnnounce.String():
+ err := w.handleActivityPubAnnounce(ctx, activityPubTask.Message, activity)
+ if err != nil {
+ return nil, fmt.Errorf("error occurred in handleActivityPubAnnounce: %w", err)
+ }
+ case mastodon.MessageTypeLike.String():
+ err := w.handleActivityPubLike(ctx, activityPubTask.Message, activity)
+ if err != nil {
+ return nil, fmt.Errorf("error occurred in handleActivityPubLike: %w", err)
+ }
+ default:
+ zap.L().Info("unsupported type", zap.String("type", activityPubTask.Message.Type))
+ return nil, nil
+ }
+
+ return activity, nil
+}
+
+// handleActivityPubCreate handles mastodon post message.
+func (w *worker) handleActivityPubCreate(ctx context.Context, message activitypub.Object, activity *activityx.Activity) error {
+ noteObject, ok := message.Object.(map[string]interface{})
+ if !ok || noteObject[mastodon.Type] != mastodon.ObjectTypeNote {
+ zap.L().Info("unsupported object type for Create", zap.String("type", fmt.Sprintf("%T", message.Object)))
+ return fmt.Errorf("invalid object type for Create activity")
+ }
+
+ // Convert the map to an ActivityPub Note struct
+ var note activitypub.Note
+ if err := mapToStruct(noteObject, ¬e); err != nil {
+ return fmt.Errorf("failed to convert note object: %w", err)
+ }
+
+ timestamp := activity.Timestamp
+ // Build the Activity SocialPost object from the Note
+ post := w.buildPost(ctx, message, note, timestamp)
+ activity.Type = typex.SocialPost
+
+ currentUserHandle := convertURLToHandle(message.Actor)
+
+ // If the actor is from a relay server then we directly set it as the handle
+ if currentUserHandle == "" {
+ currentUserHandle = message.Actor
+ }
+
+ activity.From = currentUserHandle
+
+ activity.Platform = w.Platform()
+
+ toUserHandle := currentUserHandle
+
+ // Check if the Note is a reply to another post - activity SocialComment object
+ if parentID, ok := noteObject[mastodon.InReplyTo].(string); ok {
+ activity.Type = typex.SocialComment
+ post.Target = &metadata.SocialPost{
+ PublicationID: parentID,
+ }
+
+ toUserHandle = convertURLToHandle(parentID)
+ }
+
+ activity.To = toUserHandle
+
+ // Store the current user's unique handle
+ err := w.saveMastodonHandle(ctx, currentUserHandle)
+ if err != nil {
+ zap.L().Info("failed to save mastodon handle", zap.Error(err))
+ return err
+ }
+
+ // Generate main action
+ mainAction := w.createAction(activity.Type, currentUserHandle, toUserHandle, post)
+ activity.Actions = append(activity.Actions, mainAction)
+
+ // Generate additional actions for mentions
+ mentionActions := w.createMentionActions(currentUserHandle, note, post)
+ activity.Actions = append(activity.Actions, mentionActions...)
+
+ return nil
+}
+
+// saveMastodonHandle store the unique handles into the relevant DB table
+func (w *worker) saveMastodonHandle(ctx context.Context, handleString string) error {
+ handle := &model.MastodonHandle{
+ Handle: handleString,
+ LastUpdated: time.Now(),
+ }
+
+ if err := w.databaseClient.SaveDatasetMastodonHandle(ctx, handle); err != nil {
+ return fmt.Errorf("failed to save Mastodon handle: %w", err)
+ }
+
+ // Add handle update to Redis set
+ if err := redis.AddHandleUpdate(ctx, w.redisClient, handleString); err != nil {
+ return fmt.Errorf("failed to add handle update to Redis: %w", err)
+ }
+
+ return nil
+}
+
+// mapToStruct converts a map to a struct using JSON marshal and unmarshal
+func mapToStruct(m map[string]interface{}, v interface{}) error {
+ // Marshal the map to JSON
+ jsonData, err := json.Marshal(m)
+ if err != nil {
+ return fmt.Errorf("failed to marshal map: %w", err)
+ }
+
+ // Unmarshal the JSON data into the struct
+ if err := json.Unmarshal(jsonData, v); err != nil {
+ return fmt.Errorf("failed to unmarshal into struct: %w", err)
+ }
+
+ return nil
+}
+
+// handleActivityPubAnnounce handles Announce activities (shares/boosts) in ActivityPub.
+func (w *worker) handleActivityPubAnnounce(ctx context.Context, message activitypub.Object, activity *activityx.Activity) error {
+ activity.Type = typex.SocialShare
+
+ currentUserHandle := convertURLToHandle(message.Actor)
+
+ // If the actor is from a relay server then we directly set it as the handle
+ if currentUserHandle == "" {
+ currentUserHandle = message.Actor
+ }
+
+ activity.From = currentUserHandle
+ activity.To = currentUserHandle
+
+ // Extract object IDs from the message
+ objectIDs, err := extractObjectIDs(message.Object)
+ if err != nil {
+ zap.L().Info("unsupported object type for Announce", zap.String("type", fmt.Sprintf("%T", message.Object)))
+ return err
+ }
+
+ // Store the current user's unique handle
+ err = w.saveMastodonHandle(ctx, currentUserHandle)
+ if err != nil {
+ zap.L().Info("failed to save mastodon handle", zap.Error(err))
+ return err
+ }
+
+ // Iteratively create action for every announcement of the activity
+ for _, announcedID := range objectIDs {
+ toUserHandle := convertURLToHandle(announcedID)
+
+ // Create a SocialPost object with the Announced ID
+ post := &metadata.SocialPost{
+ Handle: toUserHandle,
+ ProfileID: message.Actor,
+ PublicationID: message.ID,
+ Timestamp: w.parseTimestamp(message.Published),
+ Target: &metadata.SocialPost{
+ PublicationID: announcedID,
+ },
+ }
+
+ // Create and add action to activity
+ action := w.createAction(activity.Type, currentUserHandle, toUserHandle, post)
+ activity.Actions = append(activity.Actions, action)
+ }
+
+ return nil
+}
+
+// handleActivityPubLike handles Like activities in ActivityPub.
+func (w *worker) handleActivityPubLike(ctx context.Context, message activitypub.Object, activity *activityx.Activity) error {
+ activity.Type = typex.SocialComment
+
+ currentUserHandle := convertURLToHandle(message.Actor)
+
+ // If the actor is from a relay server then we directly set it as the handle
+ if currentUserHandle == "" {
+ currentUserHandle = message.Actor
+ }
+
+ activity.From = currentUserHandle
+
+ // Extract object IDs from the message
+ objectIDs, err := extractObjectIDs(message.Object)
+ if err != nil {
+ zap.L().Info("unsupported object type for Like", zap.String("type", fmt.Sprintf("%T", message.Object)))
+ return err
+ }
+
+ // Store the current user's unique handle
+ err = w.saveMastodonHandle(ctx, currentUserHandle)
+ if err != nil {
+ zap.L().Info("failed to save mastodon handle", zap.Error(err))
+ return err
+ }
+
+ for _, likedID := range objectIDs {
+ // Create a SocialPost object with the Liked ID
+ post := &metadata.SocialPost{
+ ProfileID: message.Actor,
+ PublicationID: message.ID,
+ Timestamp: w.parseTimestamp(message.Published),
+ Target: &metadata.SocialPost{
+ PublicationID: likedID,
+ },
+ }
+
+ // Create and add action to activity
+ action := w.createAction(activity.Type, message.Actor, "", post)
+ activity.Actions = append(activity.Actions, action)
+ }
+
+ return nil
+}
+
+// createMentionActions generates actions for mentions within a note.
+func (w *worker) createMentionActions(from string, note activitypub.Note, post *metadata.SocialPost) []*activityx.Action {
+ var actions []*activityx.Action
+
+ actionType := typex.SocialShare
+ // Make mention actions for every tag in the activity
+ for _, mention := range note.Tag {
+ if mention.Type == mastodon.TagTypeMention {
+ mentionUserHandle := convertURLToHandle(mention.Href)
+ mentionAction := w.createAction(actionType, from, mentionUserHandle, post)
+ actions = append(actions, mentionAction)
+ }
+ }
+
+ return actions
+}
+
+// createAction creates an activity action.
+func (w *worker) createAction(actionType schema.Type, from, to string, metadata metadata.Metadata) *activityx.Action {
+ return &activityx.Action{
+ Type: actionType,
+ Platform: w.Platform(),
+ From: from,
+ To: to,
+ Metadata: metadata,
+ }
+}
+
+// buildPost constructs a SocialPost object from ActivityPub object and note
+func (w *worker) buildPost(ctx context.Context, obj activitypub.Object, note activitypub.Note, timestamp uint64) *metadata.SocialPost {
+ // Create a new SocialPost with the content, profile ID, publication ID, and timestamp
+ post := &metadata.SocialPost{
+ Body: note.Content,
+ ProfileID: obj.Actor,
+ PublicationID: note.ID,
+ Timestamp: timestamp,
+ Handle: convertURLToHandle(obj.Actor),
+ }
+ // Attach media to the post
+ w.buildPostMedia(ctx, post, obj.Attachment)
+ w.buildPostTags(post, note.Tag)
+
+ return post
+}
+
+// buildPostMedia attaches media information to the post
+func (w *worker) buildPostMedia(_ context.Context, post *metadata.SocialPost, attachments []activitypub.Attachment) {
+ // Iterate over the attachments and add each media in attachment object to the post
+ for _, attachment := range attachments {
+ post.Media = append(post.Media, metadata.Media{
+ Address: attachment.URL,
+ MimeType: attachment.Type,
+ })
+ }
+}
+
+// buildPostTags builds the Tags field in the metatdata
+func (w *worker) buildPostTags(post *metadata.SocialPost, tags []activitypub.Tag) {
+ for _, tag := range tags {
+ if tag.Type == mastodon.TagTypeHashtag {
+ post.Tags = append(post.Tags, tag.Name)
+ }
+ }
+}
+
+// Iterate over the attachments and add each media to the post
+func (w *worker) parseTimestamp(timestamp string) uint64 {
+ t, err := time.Parse(time.RFC3339, timestamp)
+ if err != nil {
+ return 0
+ }
+
+ return uint64(t.Unix())
+}
+
+// extractObjectIDs used to extract Object IDs for Announce and Like ActivityPub object
+func extractObjectIDs(object interface{}) ([]string, error) {
+ var ids []string
+
+ switch obj := object.(type) {
+ case string:
+ ids = append(ids, obj)
+ case map[string]interface{}:
+ var nestedObject activitypub.Object
+ if err := mapToStruct(obj, &nestedObject); err != nil {
+ return nil, fmt.Errorf("failed to convert nested object: %w", err)
+ }
+
+ ids = append(ids, nestedObject.ID)
+ case []interface{}:
+ for _, item := range obj {
+ switch item := item.(type) {
+ case string:
+ ids = append(ids, item)
+ case map[string]interface{}:
+ var nestedObject activitypub.Object
+ if err := mapToStruct(item, &nestedObject); err != nil {
+ return nil, fmt.Errorf("failed to convert nested object: %w", err)
+ }
+
+ ids = append(ids, nestedObject.ID)
+ default:
+ return nil, fmt.Errorf("unsupported object type in array: %T", item)
+ }
+ }
+ default:
+ return nil, fmt.Errorf("unsupported object type: %T", obj)
+ }
+
+ return ids, nil
+}
+
+// convertURLToHandle converts a Mastodon URL into a user handle in the format '@username@domain'.
+func convertURLToHandle(statusID string) string {
+ parsedURL, err := url.Parse(statusID)
+ if err != nil {
+ return ""
+ }
+
+ // Handle ActivityPub actor URLs, in particular the URLs related to Relay servers
+ if strings.HasSuffix(parsedURL.Path, "/actor") {
+ username := "relay"
+ domain := parsedURL.Host
+
+ return fmt.Sprintf("@%s@%s", username, domain)
+ }
+
+ // Check if the path contains "@username" (e.g., "/@username/status")
+ if strings.HasPrefix(parsedURL.Path, "/@") {
+ parts := strings.Split(parsedURL.Path, "/")
+ if len(parts) < 2 {
+ return ""
+ }
+
+ username := strings.TrimPrefix(parts[1], "@")
+ domain := parsedURL.Host
+
+ return fmt.Sprintf("@%s@%s", username, domain)
+ }
+
+ // Fallback for other URL formats (like status URLs)
+ parts := strings.Split(parsedURL.Path, "/")
+ if len(parts) < 3 {
+ return ""
+ }
+
+ username := parts[2]
+
+ domain := parsedURL.Host
+
+ return fmt.Sprintf("@%s@%s", username, domain)
+}
+
+// NewWorker creates a new Mastodon worker instance
+func NewWorker(databaseClient database.Client, redisClient rueidis.Client) (engine.Worker, error) {
+ httpClient, err := httpx.NewHTTPClient()
+
+ worker := worker{
+ httpClient: httpClient,
+ databaseClient: databaseClient,
+ redisClient: redisClient,
+ }
+
+ if err != nil {
+ return nil, fmt.Errorf("new http client: %w", err)
+ }
+
+ return &worker, nil
+}
diff --git a/internal/engine/worker/federated/activitypub/mastodon/worker_test.go b/internal/engine/worker/federated/activitypub/mastodon/worker_test.go
new file mode 100644
index 00000000..4a697a21
--- /dev/null
+++ b/internal/engine/worker/federated/activitypub/mastodon/worker_test.go
@@ -0,0 +1,480 @@
+package mastodon
+
+import (
+ "context"
+ "crypto/rand"
+ "encoding/json"
+ "fmt"
+ "math/big"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/adrianbrad/psqldocker"
+ "github.com/orlangure/gnomock"
+ "github.com/orlangure/gnomock/preset/redis"
+ "github.com/redis/rueidis"
+ "github.com/rss3-network/node/config"
+ "github.com/rss3-network/node/internal/database"
+ "github.com/rss3-network/node/internal/database/dialer"
+ "github.com/rss3-network/node/internal/engine/source/activitypub"
+ message "github.com/rss3-network/node/provider/activitypub"
+ redisx "github.com/rss3-network/node/provider/redis"
+ "github.com/rss3-network/node/schema/worker/decentralized"
+ "github.com/rss3-network/protocol-go/schema/activity"
+ "github.com/rss3-network/protocol-go/schema/metadata"
+ "github.com/rss3-network/protocol-go/schema/network"
+ "github.com/rss3-network/protocol-go/schema/typex"
+ "github.com/samber/lo"
+ "github.com/stretchr/testify/require"
+)
+
+var (
+ setupOnce sync.Once
+ redisClient rueidis.Client
+ databaseClient database.Client
+)
+
+func setup(t *testing.T) {
+ setupOnce.Do(func() {
+ var err error
+
+ // Setup database client
+ databaseClient = setupDatabaseClient(t)
+
+ // Start Redis container with TLS
+ preset := redis.Preset(
+ redis.WithVersion("6.0.9"),
+ )
+
+ var container *gnomock.Container
+ for attempts := 0; attempts < 3; attempts++ {
+ container, err = gnomock.Start(preset)
+ if err == nil {
+ break
+ }
+ // If starting fails, wait a bit and try again
+ time.Sleep(time.Second * 2)
+ }
+ require.NoError(t, err, "Failed to start Redis container after multiple attempts")
+
+ t.Cleanup(func() {
+ require.NoError(t, gnomock.Stop(container))
+ })
+
+ // Connect to Redis without TLS
+ redisClient, err = redisx.NewClient(config.Redis{
+ Endpoint: container.DefaultAddress(),
+ TLS: config.RedisTLS{
+ Enabled: false,
+ CAFile: "/path/to/ca.crt",
+ CertFile: "/path/to/client.crt",
+ KeyFile: "/path/to/client.key",
+ InsecureSkipVerify: false,
+ },
+ })
+ require.NoError(t, err)
+ })
+}
+
+func setupDatabaseClient(t *testing.T) database.Client {
+ container, dataSourceName, err := createContainer(context.Background(), database.DriverPostgreSQL, true)
+ require.NoError(t, err)
+
+ t.Cleanup(func() {
+ require.NoError(t, container.Close())
+ })
+
+ // Dial the database
+ client, err := dialer.Dial(context.Background(), &config.Database{
+ Driver: database.DriverPostgreSQL,
+ URI: dataSourceName,
+ Partition: lo.ToPtr(true),
+ })
+ require.NoError(t, err)
+ require.NotNil(t, client)
+
+ // Migrate the database
+ require.NoError(t, client.Migrate(context.Background()))
+
+ return client
+}
+
+func createContainer(_ context.Context, driver database.Driver, _ bool) (container *psqldocker.Container, dataSourceName string, err error) {
+ switch driver {
+ case database.DriverPostgreSQL:
+ nBig, err := rand.Int(rand.Reader, big.NewInt(100000))
+ if err != nil {
+ return nil, "", fmt.Errorf("failed to generate secure random number: %w", err)
+ }
+
+ containerName := fmt.Sprintf("psql-test-container-%d", nBig.Int64())
+
+ c, err := psqldocker.NewContainer(
+ "user",
+ "password",
+ "test",
+ psqldocker.WithContainerName(containerName),
+ )
+ if err != nil {
+ return nil, "", fmt.Errorf("create psql container: %w", err)
+ }
+
+ return c, formatContainerURI(c), nil
+ default:
+ return nil, "", fmt.Errorf("unsupported driver: %s", driver)
+ }
+}
+
+func formatContainerURI(container *psqldocker.Container) string {
+ return fmt.Sprintf(
+ "postgres://user:password@%s:%s/%s?sslmode=disable",
+ "127.0.0.1",
+ container.Port(),
+ "test",
+ )
+}
+
+func TestWorker(t *testing.T) {
+ t.Parallel()
+ setup(t)
+
+ type arguments struct {
+ task *activitypub.Task
+ }
+
+ testcases := []struct {
+ name string
+ arguments arguments
+ want *activity.Activity
+ wantError require.ErrorAssertionFunc
+ }{
+ {
+ name: "Create A Note",
+ arguments: arguments{
+ task: &activitypub.Task{
+ Network: network.Mastodon,
+ Message: message.Object{
+ Context: []interface{}{
+ "https://www.w3.org/ns/activitystreams",
+ },
+ ID: "https://airwaves.social/users/VicRB/statuses/112836523057177095",
+ Type: "Create",
+ Actor: "https://airwaves.social/users/VicRB",
+ Published: "2024-07-23T15:31:43Z",
+ Object: map[string]interface{}{
+ "type": "Note",
+ "id": "https://airwaves.social/users/VicRB/statuses/112836523057177095",
+ "content": "
#VesselAlert #Vaixell ...
",
+ "published": "2024-07-23T15:31:43Z",
+ "to": []string{
+ "https://www.w3.org/ns/activitystreams#Public",
+ },
+ },
+ },
+ },
+ },
+ want: &activity.Activity{
+ ID: "https://airwaves.social/users/VicRB/statuses/112836523057177095",
+ Network: network.Mastodon,
+ Platform: decentralized.PlatformMastodon.String(),
+ From: "@VicRB@airwaves.social",
+ To: "@VicRB@airwaves.social",
+ Type: typex.SocialPost,
+ Status: true,
+ Actions: []*activity.Action{
+ {
+ Type: typex.SocialPost,
+ Platform: decentralized.PlatformMastodon.String(),
+ From: "@VicRB@airwaves.social",
+ To: "@VicRB@airwaves.social",
+ Metadata: &metadata.SocialPost{
+ PublicationID: "https://airwaves.social/users/VicRB/statuses/112836523057177095",
+ Body: "#VesselAlert #Vaixell ...
",
+ Handle: "@VicRB@airwaves.social",
+ ProfileID: "https://airwaves.social/users/VicRB",
+ Tags: nil,
+ Timestamp: 1721748703,
+ },
+ },
+ },
+ Timestamp: 1721748703,
+ },
+ wantError: require.NoError,
+ },
+ {
+ name: "Create A Comment",
+ arguments: arguments{
+ task: &activitypub.Task{
+ Network: network.Mastodon,
+ Message: message.Object{
+ Context: []interface{}{
+ "https://www.w3.org/ns/activitystreams",
+ },
+ ID: "https://beekeeping.ninja/users/Pagan_Animist/statuses/112840117527501203",
+ Type: "Create",
+ Published: "2024-07-24T06:45:51Z",
+ Actor: "https://beekeeping.ninja/users/Pagan_Animist",
+ Object: map[string]interface{}{
+ "type": "Note",
+ "id": "https://beekeeping.ninja/users/Pagan_Animist/statuses/112840117527501203",
+ "content": "@evedazzle
Can communities band together ...
Would they help?
I’m just thinking of your power grid and next time.
",
+ "inReplyTo": "https://mas.to/users/evedazzle/statuses/112802025232873362",
+ "published": "2024-07-24T06:45:51Z",
+ "to": []string{
+ "https://www.w3.org/ns/activitystreams#Public",
+ },
+ },
+ },
+ },
+ },
+ want: &activity.Activity{
+ ID: "https://beekeeping.ninja/users/Pagan_Animist/statuses/112840117527501203",
+ Network: network.Mastodon,
+ Platform: decentralized.PlatformMastodon.String(),
+ From: "@Pagan_Animist@beekeeping.ninja",
+ To: "@evedazzle@mas.to",
+ Type: typex.SocialComment,
+ Status: true,
+ Actions: []*activity.Action{
+ {
+ Type: typex.SocialComment,
+ Platform: decentralized.PlatformMastodon.String(),
+ From: "@Pagan_Animist@beekeeping.ninja",
+ To: "@evedazzle@mas.to",
+ Metadata: &metadata.SocialPost{
+ PublicationID: "https://beekeeping.ninja/users/Pagan_Animist/statuses/112840117527501203",
+ Body: "@evedazzle
Can communities band together ...
Would they help?
I’m just thinking of your power grid and next time.
",
+ Handle: "@Pagan_Animist@beekeeping.ninja",
+ ProfileID: "https://beekeeping.ninja/users/Pagan_Animist",
+ Target: &metadata.SocialPost{
+ PublicationID: "https://mas.to/users/evedazzle/statuses/112802025232873362",
+ },
+ Timestamp: 1721803551,
+ },
+ },
+ },
+ Timestamp: 1721803551,
+ },
+ wantError: require.NoError,
+ },
+ {
+ name: "Create A Comment (Includes 2 Mentions)",
+ arguments: arguments{
+ task: &activitypub.Task{
+ Network: network.Mastodon,
+ Message: message.Object{
+ Context: []interface{}{
+ "https://www.w3.org/ns/activitystreams",
+ },
+ ID: "https://epicure.social/users/Island_Martha/statuses/112840097961400438",
+ Type: "Create",
+ Actor: "https://epicure.social/users/Island_Martha",
+ Published: "2024-07-24T06:40:52Z",
+ Object: map[string]interface{}{
+ "type": "Note",
+ "id": "https://epicure.social/users/Island_Martha/statuses/112840097961400438",
+ "content": "@SpookieRobieTheCat @eunews
Or at very least restrict or remove their voting rights. Orban is not MAGA Mike
",
+ "inReplyTo": "https://mastodon.social/users/SpookieRobieTheCat/statuses/112840076342641439",
+ "to": []string{
+ "https://www.w3.org/ns/activitystreams#Public",
+ },
+ "tag": []map[string]string{
+ {
+ "type": "Mention",
+ "href": "https://mastodon.social/@SpookieRobieTheCat",
+ "name": "@SpookieRobieTheCat",
+ },
+ {
+ "type": "Mention",
+ "href": "https://mastodon.social/@eunews",
+ "name": "@eunews",
+ },
+ },
+ },
+ },
+ },
+ },
+ want: &activity.Activity{
+ ID: "https://epicure.social/users/Island_Martha/statuses/112840097961400438",
+ Network: network.Mastodon,
+ Platform: decentralized.PlatformMastodon.String(),
+ From: "@Island_Martha@epicure.social",
+ To: "@SpookieRobieTheCat@mastodon.social",
+ Type: typex.SocialComment,
+ Status: true,
+ Actions: []*activity.Action{
+ {
+ Type: typex.SocialComment,
+ Platform: decentralized.PlatformMastodon.String(),
+ From: "@Island_Martha@epicure.social",
+ To: "@SpookieRobieTheCat@mastodon.social",
+ Metadata: &metadata.SocialPost{
+ PublicationID: "https://epicure.social/users/Island_Martha/statuses/112840097961400438",
+ Body: "@SpookieRobieTheCat @eunews
Or at very least restrict or remove their voting rights. Orban is not MAGA Mike
",
+ Handle: "@Island_Martha@epicure.social",
+ ProfileID: "https://epicure.social/users/Island_Martha",
+ Target: &metadata.SocialPost{
+ PublicationID: "https://mastodon.social/users/SpookieRobieTheCat/statuses/112840076342641439",
+ },
+ Timestamp: 1721803252,
+ },
+ },
+ {
+ Type: typex.SocialShare,
+ Platform: decentralized.PlatformMastodon.String(),
+ From: "@Island_Martha@epicure.social",
+ To: "@SpookieRobieTheCat@mastodon.social",
+ Metadata: &metadata.SocialPost{
+ PublicationID: "https://epicure.social/users/Island_Martha/statuses/112840097961400438",
+ Body: "@SpookieRobieTheCat @eunews
Or at very least restrict or remove their voting rights. Orban is not MAGA Mike
",
+ Handle: "@Island_Martha@epicure.social",
+ ProfileID: "https://epicure.social/users/Island_Martha",
+ Target: &metadata.SocialPost{
+ PublicationID: "https://mastodon.social/users/SpookieRobieTheCat/statuses/112840076342641439",
+ },
+ Timestamp: 1721803252,
+ },
+ },
+ {
+ Type: typex.SocialShare,
+ Platform: decentralized.PlatformMastodon.String(),
+ From: "@Island_Martha@epicure.social",
+ To: "@eunews@mastodon.social",
+ Metadata: &metadata.SocialPost{
+ PublicationID: "https://epicure.social/users/Island_Martha/statuses/112840097961400438",
+ Body: "@SpookieRobieTheCat @eunews
Or at very least restrict or remove their voting rights. Orban is not MAGA Mike
",
+ Handle: "@Island_Martha@epicure.social",
+ ProfileID: "https://epicure.social/users/Island_Martha",
+ Target: &metadata.SocialPost{
+ PublicationID: "https://mastodon.social/users/SpookieRobieTheCat/statuses/112840076342641439",
+ },
+ Timestamp: 1721803252,
+ },
+ },
+ },
+ Timestamp: 1721803252,
+ },
+ wantError: require.NoError,
+ },
+ {
+ name: "Announce(Share)",
+ arguments: arguments{
+ task: &activitypub.Task{
+ Network: network.Mastodon,
+ Message: message.Object{
+ Context: []interface{}{
+ "https://www.w3.org/ns/activitystreams",
+ },
+ ID: "https://relay.an.exchange/activities/d93bf6f6-832d-49d0-b841-3654d8da0b79",
+ Type: "Announce",
+ Actor: "https://relay.an.exchange/actor",
+ Published: "2024-07-22T00:00:00Z",
+ Object: map[string]interface{}{
+ "type": "Note",
+ "id": "https://cr8r.gg/users/SarraceniaWilds#announces/112250337669855051/undo",
+ },
+ },
+ },
+ },
+ want: &activity.Activity{
+ ID: "https://relay.an.exchange/activities/d93bf6f6-832d-49d0-b841-3654d8da0b79",
+ Network: network.Mastodon,
+ Platform: decentralized.PlatformMastodon.String(),
+ From: "@relay@relay.an.exchange",
+ To: "@relay@relay.an.exchange",
+ Type: typex.SocialShare,
+ Status: true,
+ Actions: []*activity.Action{
+ {
+ Type: typex.SocialShare,
+ Platform: decentralized.PlatformMastodon.String(),
+ From: "@relay@relay.an.exchange",
+ To: "@SarraceniaWilds@cr8r.gg",
+ Metadata: &metadata.SocialPost{
+ PublicationID: "https://relay.an.exchange/activities/d93bf6f6-832d-49d0-b841-3654d8da0b79",
+ ProfileID: "https://relay.an.exchange/actor",
+ Handle: "@SarraceniaWilds@cr8r.gg",
+ Target: &metadata.SocialPost{
+ PublicationID: "https://cr8r.gg/users/SarraceniaWilds#announces/112250337669855051/undo",
+ },
+ Timestamp: 1721606400,
+ },
+ },
+ },
+ Timestamp: 1721606400,
+ },
+ wantError: require.NoError,
+ },
+ {
+ name: "Like",
+ arguments: arguments{
+ task: &activitypub.Task{
+ Network: network.Mastodon,
+ Message: message.Object{
+ Context: []interface{}{
+ "https://www.w3.org/ns/activitystreams",
+ },
+ ID: "https://mock.social/activities/like123",
+ Type: "Like",
+ Actor: "https://beekeeping.ninja/users/Pagan_Animist",
+ Published: "2024-07-22T00:00:00Z",
+ Object: map[string]interface{}{
+ "type": "Note",
+ "id": "https://mock.social/notes/note123",
+ },
+ },
+ },
+ },
+ want: &activity.Activity{
+ ID: "https://mock.social/activities/like123",
+ Network: network.Mastodon,
+ Platform: decentralized.PlatformMastodon.String(),
+ From: "@Pagan_Animist@beekeeping.ninja",
+ To: "",
+ Type: typex.SocialComment,
+ Status: true,
+ Actions: []*activity.Action{
+ {
+ Type: typex.SocialComment,
+ Platform: decentralized.PlatformMastodon.String(),
+ From: "https://beekeeping.ninja/users/Pagan_Animist",
+ To: "",
+ Metadata: &metadata.SocialPost{
+ PublicationID: "https://mock.social/activities/like123",
+ ProfileID: "https://beekeeping.ninja/users/Pagan_Animist",
+ Target: &metadata.SocialPost{
+ PublicationID: "https://mock.social/notes/note123",
+ },
+ Timestamp: 1721606400,
+ },
+ },
+ },
+ Timestamp: 1721606400,
+ },
+ wantError: require.NoError,
+ },
+ }
+ for _, testcase := range testcases {
+ testcase := testcase
+
+ t.Run(testcase.name, func(t *testing.T) {
+ t.Parallel()
+
+ ctx := context.Background()
+
+ instance, err := NewWorker(databaseClient, redisClient)
+ require.NoError(t, err)
+
+ activity, err := instance.Transform(ctx, testcase.arguments.task)
+ testcase.wantError(t, err)
+
+ data, err := json.MarshalIndent(activity, "", "\x20\x20")
+ require.NoError(t, err)
+
+ t.Log(string(data))
+
+ require.Equal(t, testcase.want, activity)
+ })
+ }
+}
diff --git a/internal/engine/worker/federated/factory.go b/internal/engine/worker/federated/factory.go
new file mode 100644
index 00000000..bdac632a
--- /dev/null
+++ b/internal/engine/worker/federated/factory.go
@@ -0,0 +1,21 @@
+package worker
+
+import (
+ "fmt"
+
+ "github.com/redis/rueidis"
+ "github.com/rss3-network/node/config"
+ "github.com/rss3-network/node/internal/database"
+ "github.com/rss3-network/node/internal/engine"
+ "github.com/rss3-network/node/internal/engine/worker/federated/activitypub/mastodon"
+ "github.com/rss3-network/node/schema/worker/decentralized"
+)
+
+func New(config *config.Module, databaseClient database.Client, redisClient rueidis.Client) (engine.Worker, error) {
+ switch config.Worker {
+ case decentralized.Mastodon:
+ return mastodon.NewWorker(databaseClient, redisClient)
+ default:
+ return nil, fmt.Errorf("[federated/factory.go] unsupported worker %s", config.Worker)
+ }
+}
diff --git a/internal/node/component/federated/component.go b/internal/node/component/federated/component.go
new file mode 100644
index 00000000..f6093d7c
--- /dev/null
+++ b/internal/node/component/federated/component.go
@@ -0,0 +1,139 @@
+package federated
+
+import (
+ "context"
+ "fmt"
+ "sync"
+
+ cb "github.com/emirpasic/gods/queues/circularbuffer"
+ "github.com/labstack/echo/v4"
+ "github.com/redis/rueidis"
+ "github.com/rss3-network/node/config"
+ "github.com/rss3-network/node/internal/constant"
+ "github.com/rss3-network/node/internal/database"
+ "github.com/rss3-network/node/internal/node/component"
+ "github.com/rss3-network/node/internal/node/component/middleware"
+ "github.com/rss3-network/node/provider/ethereum/etherface"
+ "github.com/samber/lo"
+ "go.opentelemetry.io/otel"
+ "go.opentelemetry.io/otel/attribute"
+ "go.opentelemetry.io/otel/metric"
+ "go.opentelemetry.io/otel/trace"
+ "go.uber.org/zap"
+)
+
+type Component struct {
+ config *config.File
+ counter metric.Int64Counter
+ databaseClient database.Client
+ etherfaceClient etherface.Client
+ redisClient rueidis.Client
+}
+
+const Name = "federated"
+const MaxRecentRequests = 10
+
+var (
+ RecentRequests *cb.Queue
+ recentRequestsMutex sync.RWMutex
+)
+
+func (c *Component) Name() string {
+ return Name
+}
+
+var _ component.Component = (*Component)(nil)
+
+func NewComponent(_ context.Context, apiServer *echo.Echo, config *config.File, databaseClient database.Client, redisClient rueidis.Client) component.Component {
+ RecentRequests = cb.New(MaxRecentRequests)
+
+ c := &Component{
+ config: config,
+ databaseClient: databaseClient,
+ redisClient: redisClient,
+ }
+
+ group := apiServer.Group(fmt.Sprintf("/%s", Name))
+
+ // Add middleware for bearer token authentication
+ group.Use(middleware.BearerAuth(config.Discovery.Server.AccessToken))
+
+ group.GET("/tx/:id", c.GetActivity)
+ group.GET("/:account", c.GetAccountActivities)
+ group.GET("/network/:network", c.GetNetworkActivities)
+ group.GET("/platform/:platform", c.GetPlatformActivities)
+ group.POST("/accounts", c.BatchGetAccountsActivities)
+ group.GET("/handles", c.GetAllActiveHandles)
+ group.GET("/handles/updated", c.GetUpdatedHandles)
+
+ if err := c.InitMeter(); err != nil {
+ panic(err)
+ }
+
+ // Initialize etherface client, an optional dependency
+ etherfaceClient, err := etherface.NewEtherfaceClient()
+ if err != nil {
+ zap.L().Warn("failed to initialize etherface client", zap.Any("error", err))
+ } else {
+ c.etherfaceClient = etherfaceClient
+ }
+
+ return c
+}
+
+func (c *Component) InitMeter() (err error) {
+ meter := otel.GetMeterProvider().Meter(constant.Name)
+
+ if c.counter, err = meter.Int64Counter(c.Name()); err != nil {
+ return fmt.Errorf("failed to init meter for component %s: %w", c.Name(), err)
+ }
+
+ return nil
+}
+
+func (c *Component) CollectMetric(ctx context.Context, path, value string) {
+ measurementOption := metric.WithAttributes(
+ attribute.String("component", c.Name()),
+ attribute.String("path", path),
+ attribute.String("value", value),
+ )
+
+ c.counter.Add(ctx, int64(1), measurementOption)
+}
+
+func (c *Component) CollectTrace(ctx context.Context, path, value string) {
+ spanStartOptions := []trace.SpanStartOption{
+ trace.WithSpanKind(trace.SpanKindServer),
+ trace.WithAttributes(
+ attribute.String("path", path),
+ attribute.String("value", value),
+ ),
+ }
+
+ _, span := otel.Tracer("").Start(ctx, "federated API Query", spanStartOptions...)
+ defer span.End()
+}
+
+func addRecentRequest(path string) {
+ recentRequestsMutex.Lock()
+ defer recentRequestsMutex.Unlock()
+
+ RecentRequests.Enqueue(path)
+}
+
+// GetRecentRequest returns the filtered recent requests.
+func GetRecentRequest() []string {
+ recentRequestsMutex.RLock()
+ defer recentRequestsMutex.RUnlock()
+
+ // Convert queue to slice
+ values := RecentRequests.Values()
+
+ // Filter out empty strings and convert to []string
+ filteredRequests := lo.FilterMap(values, func(item interface{}, _ int) (string, bool) {
+ str, ok := item.(string)
+ return str, ok && str != ""
+ })
+
+ return filteredRequests
+}
diff --git a/internal/node/component/federated/data.go b/internal/node/component/federated/data.go
new file mode 100644
index 00000000..64cdf2cd
--- /dev/null
+++ b/internal/node/component/federated/data.go
@@ -0,0 +1,71 @@
+package federated
+
+import (
+ "context"
+ "fmt"
+ "strings"
+
+ "github.com/rss3-network/node/internal/database/model"
+ "github.com/rss3-network/node/provider/redis"
+ activityx "github.com/rss3-network/protocol-go/schema/activity"
+ networkx "github.com/rss3-network/protocol-go/schema/network"
+ "github.com/samber/lo"
+)
+
+func (c *Component) getActivity(ctx context.Context, request model.ActivityQuery) (*activityx.Activity, *int, error) {
+ return c.databaseClient.FindActivity(ctx, request)
+}
+
+func (c *Component) getActivities(ctx context.Context, request model.ActivitiesQuery) ([]*activityx.Activity, string, error) {
+ activities, err := c.databaseClient.FindActivities(ctx, request)
+ if err != nil {
+ return nil, "", fmt.Errorf("failed to find activities: %w", err)
+ }
+
+ last, exist := lo.Last(activities)
+ if exist {
+ return activities, c.transformCursor(ctx, last), nil
+ }
+
+ return nil, "", nil
+}
+
+func (c *Component) getCursor(ctx context.Context, cursor *string) (*activityx.Activity, error) {
+ if cursor == nil {
+ return nil, nil
+ }
+
+ str := strings.Split(*cursor, ":")
+ if len(str) != 2 {
+ return nil, fmt.Errorf("invalid cursor")
+ }
+
+ network, err := networkx.NetworkString(str[1])
+ if err != nil {
+ return nil, fmt.Errorf("invalid cursor: %w", err)
+ }
+
+ data, _, err := c.getActivity(ctx, model.ActivityQuery{ID: lo.ToPtr(str[0]), Network: lo.ToPtr(network)})
+ if err != nil {
+ return nil, fmt.Errorf("failed to get cursor: %w", err)
+ }
+
+ return data, nil
+}
+
+func (c *Component) transformCursor(_ context.Context, activity *activityx.Activity) string {
+ if activity == nil {
+ return ""
+ }
+
+ return fmt.Sprintf("%s:%s", activity.ID, activity.Network)
+}
+
+// redis data retrieval:
+func (c *Component) getAllHandles(ctx context.Context) ([]string, error) {
+ return redis.GetAllHandles(ctx, c.redisClient)
+}
+
+func (c *Component) getUpdatedHandles(ctx context.Context, since uint64) ([]string, error) {
+ return redis.GetRecentHandleUpdates(ctx, c.redisClient, since)
+}
diff --git a/internal/node/component/federated/handler_activity.go b/internal/node/component/federated/handler_activity.go
new file mode 100644
index 00000000..ce9c3947
--- /dev/null
+++ b/internal/node/component/federated/handler_activity.go
@@ -0,0 +1,308 @@
+package federated
+
+import (
+ "context"
+ "fmt"
+ "net/http"
+ "strconv"
+
+ "github.com/creasty/defaults"
+ "github.com/labstack/echo/v4"
+ "github.com/rss3-network/node/common/http/response"
+ "github.com/rss3-network/node/internal/database/model"
+ "github.com/rss3-network/node/schema/worker/decentralized"
+ "github.com/rss3-network/protocol-go/schema"
+ activityx "github.com/rss3-network/protocol-go/schema/activity"
+ "github.com/rss3-network/protocol-go/schema/network"
+ "github.com/rss3-network/protocol-go/schema/tag"
+ "github.com/rss3-network/protocol-go/schema/typex"
+ "github.com/samber/lo"
+ lop "github.com/samber/lo/parallel"
+ "go.uber.org/zap"
+)
+
+func (c *Component) GetActivity(ctx echo.Context) error {
+ var request ActivityRequest
+
+ if err := ctx.Bind(&request); err != nil {
+ return response.BadRequestError(ctx, err)
+ }
+
+ if err := defaults.Set(&request); err != nil {
+ return response.BadRequestError(ctx, err)
+ }
+
+ if err := ctx.Validate(&request); err != nil {
+ return response.ValidationFailedError(ctx, err)
+ }
+
+ go c.CollectTrace(ctx.Request().Context(), ctx.Request().RequestURI, request.ID)
+
+ go c.CollectMetric(ctx.Request().Context(), ctx.Request().RequestURI, request.ID)
+
+ addRecentRequest(ctx.Request().RequestURI)
+
+ query := model.ActivityQuery{
+ ID: lo.ToPtr(request.ID),
+ ActionLimit: request.ActionLimit,
+ ActionPage: request.ActionPage,
+ }
+
+ activity, page, err := c.getActivity(ctx.Request().Context(), query)
+ if err != nil {
+ zap.L().Error("GetActivity InternalError", zap.Error(err))
+ return response.InternalError(ctx)
+ }
+
+ // query etherface for the transaction
+ if c.etherfaceClient != nil && activity != nil && activity.Type == typex.Unknown && activity.Calldata != nil {
+ activity.Calldata.ParsedFunction, _ = c.etherfaceClient.Lookup(ctx.Request().Context(), activity.Calldata.FunctionHash)
+ }
+
+ // transform the activity such as adding related urls
+ result, err := c.TransformActivity(ctx.Request().Context(), activity)
+ if err != nil {
+ zap.L().Error("TransformActivity InternalError", zap.Error(err))
+ return response.InternalError(ctx)
+ }
+
+ return ctx.JSON(http.StatusOK, ActivityResponse{
+ Data: result,
+ Meta: lo.Ternary(page == nil, nil, &MetaTotalPages{
+ TotalPages: lo.FromPtr(page),
+ }),
+ })
+}
+
+func (c *Component) GetAccountActivities(ctx echo.Context) (err error) {
+ var request AccountActivitiesRequest
+
+ if err = ctx.Bind(&request); err != nil {
+ return response.BadRequestError(ctx, err)
+ }
+
+ if request.Type, err = c.parseTypes(ctx.QueryParams()["type"], request.Tag); err != nil {
+ return response.BadRequestError(ctx, err)
+ }
+
+ if err = defaults.Set(&request); err != nil {
+ return response.BadRequestError(ctx, err)
+ }
+
+ if err = ctx.Validate(&request); err != nil {
+ return response.ValidationFailedError(ctx, err)
+ }
+
+ go c.CollectTrace(ctx.Request().Context(), ctx.Request().RequestURI, request.Account)
+
+ go c.CollectMetric(ctx.Request().Context(), ctx.Request().RequestURI, request.Account)
+
+ addRecentRequest(ctx.Request().RequestURI)
+
+ cursor, err := c.getCursor(ctx.Request().Context(), request.Cursor)
+ if err != nil {
+ zap.L().Error("getCursor InternalError", zap.Error(err))
+ return response.InternalError(ctx)
+ }
+
+ databaseRequest := model.ActivitiesQuery{
+ Cursor: cursor,
+ StartTimestamp: request.SinceTimestamp,
+ EndTimestamp: request.UntilTimestamp,
+ Owner: lo.ToPtr(request.Account),
+ Limit: request.Limit,
+ ActionLimit: request.ActionLimit,
+ Status: request.Status,
+ Direction: request.Direction,
+ Network: lo.Uniq(request.Network),
+ Tags: lo.Uniq(request.Tag),
+ Types: lo.Uniq(request.Type),
+ Platforms: lo.Uniq(request.Platform),
+ }
+
+ activities, last, err := c.getActivities(ctx.Request().Context(), databaseRequest)
+ if err != nil {
+ zap.L().Error("getActivities InternalError", zap.Error(err))
+ return response.InternalError(ctx)
+ }
+
+ return ctx.JSON(http.StatusOK, ActivitiesResponse{
+ Data: c.TransformActivities(ctx.Request().Context(), activities),
+ Meta: lo.Ternary(len(activities) < databaseRequest.Limit, nil, &MetaCursor{
+ Cursor: last,
+ }),
+ })
+}
+
+// BatchGetAccountsActivities returns the activities of multiple accounts in a single request
+func (c *Component) BatchGetAccountsActivities(ctx echo.Context) (err error) {
+ var request AccountsActivitiesRequest
+
+ if err = ctx.Bind(&request); err != nil {
+ return response.BadRequestError(ctx, err)
+ }
+
+ types, err := c.parseTypes(request.Type, request.Tag)
+ if err != nil {
+ return response.BadRequestError(ctx, err)
+ }
+
+ if err = defaults.Set(&request); err != nil {
+ return response.BadRequestError(ctx, err)
+ }
+
+ if err = ctx.Validate(&request); err != nil {
+ return response.ValidationFailedError(ctx, err)
+ }
+
+ go c.CollectTrace(ctx.Request().Context(), ctx.Request().RequestURI, strconv.Itoa(len(request.Accounts)))
+
+ go c.CollectMetric(ctx.Request().Context(), ctx.Request().RequestURI, strconv.Itoa(len(request.Accounts)))
+
+ addRecentRequest(ctx.Request().RequestURI)
+
+ cursor, err := c.getCursor(ctx.Request().Context(), request.Cursor)
+ if err != nil {
+ zap.L().Error("getCursor InternalError", zap.Error(err))
+ return response.InternalError(ctx)
+ }
+
+ databaseRequest := model.ActivitiesQuery{
+ Cursor: cursor,
+ StartTimestamp: request.SinceTimestamp,
+ EndTimestamp: request.UntilTimestamp,
+ Owners: lo.Uniq(lo.Map(request.Accounts, func(account string, _ int) string {
+ return account
+ })),
+ Limit: request.Limit,
+ ActionLimit: request.ActionLimit,
+ Status: request.Status,
+ Direction: request.Direction,
+ Network: lo.Uniq(request.Network),
+ Tags: lo.Uniq(request.Tag),
+ Types: lo.Uniq(types),
+ Platforms: lo.Uniq(request.Platform),
+ }
+
+ activities, last, err := c.getActivities(ctx.Request().Context(), databaseRequest)
+ if err != nil {
+ zap.L().Error("getActivities InternalError", zap.Error(err))
+ return response.InternalError(ctx)
+ }
+
+ return ctx.JSON(http.StatusOK, ActivitiesResponse{
+ Data: c.TransformActivities(ctx.Request().Context(), activities),
+ Meta: lo.Ternary(len(activities) < databaseRequest.Limit, nil, &MetaCursor{
+ Cursor: last,
+ }),
+ })
+}
+
+func (c *Component) TransformActivities(ctx context.Context, activities []*activityx.Activity) []*activityx.Activity {
+ results := make([]*activityx.Activity, len(activities))
+
+ // iterate over the activities
+ // 1. transform the activity such as adding related urls and filling the author url
+ // 2. query etherface for the transaction to get parsed function name
+ lop.ForEach(activities, func(_ *activityx.Activity, index int) {
+ result, err := c.TransformActivity(ctx, activities[index])
+ if err != nil {
+ zap.L().Error("failed to load activity", zap.Error(err))
+
+ return
+ }
+
+ // query etherface to get the parsed function name
+ if c.etherfaceClient != nil && result.Type == typex.Unknown && result.Calldata != nil {
+ result.Calldata.ParsedFunction, _ = c.etherfaceClient.Lookup(ctx, result.Calldata.FunctionHash)
+ }
+
+ results[index] = result
+ })
+
+ return results
+}
+
+func (c *Component) parseTypes(types []string, tags []tag.Tag) ([]schema.Type, error) {
+ if len(tags) == 0 {
+ return nil, nil
+ }
+
+ schemaTypes := make([]schema.Type, 0)
+
+ for _, typex := range types {
+ var (
+ value schema.Type
+ err error
+ )
+
+ for _, tagx := range tags {
+ value, err = schema.ParseTypeFromString(tagx, typex)
+ if err == nil {
+ schemaTypes = append(schemaTypes, value)
+
+ break
+ }
+ }
+
+ if err != nil {
+ return nil, fmt.Errorf("invalid type: %s", typex)
+ }
+ }
+
+ return schemaTypes, nil
+}
+
+type ActivityRequest struct {
+ ID string `param:"id"`
+ ActionLimit int `query:"action_limit" validate:"min=1,max=20" default:"10"`
+ ActionPage int `query:"action_page" validate:"min=1" default:"1"`
+}
+
+type AccountActivitiesRequest struct {
+ Account string `param:"account" validate:"required"`
+ Limit int `query:"limit" validate:"min=1,max=100" default:"100"`
+ ActionLimit int `query:"action_limit" validate:"min=1,max=20" default:"10"`
+ Cursor *string `query:"cursor"`
+ SinceTimestamp *uint64 `query:"since_timestamp"`
+ UntilTimestamp *uint64 `query:"until_timestamp"`
+ Status *bool `query:"success"`
+ Direction *activityx.Direction `query:"direction"`
+ Network []network.Network `query:"network"`
+ Tag []tag.Tag `query:"tag"`
+ Type []schema.Type `query:"-"`
+ Platform []decentralized.Platform `query:"platform"`
+}
+
+type AccountsActivitiesRequest struct {
+ Accounts []string `json:"accounts" validate:"required"`
+ Limit int `json:"limit" validate:"min=1,max=100" default:"100"`
+ ActionLimit int `json:"action_limit" validate:"min=1,max=20" default:"10"`
+ Cursor *string `json:"cursor"`
+ SinceTimestamp *uint64 `json:"since_timestamp"`
+ UntilTimestamp *uint64 `json:"until_timestamp"`
+ Status *bool `json:"success"`
+ Direction *activityx.Direction `json:"direction"`
+ Network []network.Network `json:"network"`
+ Tag []tag.Tag `json:"tag"`
+ Type []string `json:"type"`
+ Platform []decentralized.Platform `json:"platform"`
+}
+
+type ActivityResponse struct {
+ Data *activityx.Activity `json:"data"`
+ Meta *MetaTotalPages `json:"meta"`
+}
+
+type ActivitiesResponse struct {
+ Data []*activityx.Activity `json:"data"`
+ Meta *MetaCursor `json:"meta,omitempty"`
+}
+
+type MetaTotalPages struct {
+ TotalPages int `json:"totalPages"`
+}
+
+type MetaCursor struct {
+ Cursor string `json:"cursor"`
+}
diff --git a/internal/node/component/federated/handler_handles.go b/internal/node/component/federated/handler_handles.go
new file mode 100644
index 00000000..06e58106
--- /dev/null
+++ b/internal/node/component/federated/handler_handles.go
@@ -0,0 +1,36 @@
+package federated
+
+import (
+ "net/http"
+ "strconv"
+
+ "github.com/labstack/echo/v4"
+ "github.com/rss3-network/node/common/http/response"
+)
+
+// GetAllActiveHandles retrieves all active handles
+func (c *Component) GetAllActiveHandles(ctx echo.Context) error {
+ handles, err := c.getAllHandles(ctx.Request().Context())
+ if err != nil {
+ return response.InternalError(ctx)
+ }
+
+ return ctx.JSON(http.StatusOK, map[string]interface{}{"handles": handles})
+}
+
+// GetUpdatedHandles retrieves handles updated since a given timestamp
+func (c *Component) GetUpdatedHandles(ctx echo.Context) error {
+ sinceStr := ctx.QueryParam("since")
+ since, err := strconv.ParseUint(sinceStr, 10, 64)
+
+ if err != nil {
+ return response.BadRequestError(ctx, err)
+ }
+
+ handles, err := c.getUpdatedHandles(ctx.Request().Context(), since)
+ if err != nil {
+ return response.InternalError(ctx)
+ }
+
+ return ctx.JSON(http.StatusOK, map[string]interface{}{"handles": handles})
+}
diff --git a/internal/node/component/federated/handler_network_activity.go b/internal/node/component/federated/handler_network_activity.go
new file mode 100644
index 00000000..c13640ca
--- /dev/null
+++ b/internal/node/component/federated/handler_network_activity.go
@@ -0,0 +1,93 @@
+package federated
+
+import (
+ "net/http"
+
+ "github.com/creasty/defaults"
+ "github.com/labstack/echo/v4"
+ "github.com/rss3-network/node/common/http/response"
+ "github.com/rss3-network/node/internal/database/model"
+ "github.com/rss3-network/node/schema/worker/decentralized"
+ "github.com/rss3-network/protocol-go/schema"
+ activityx "github.com/rss3-network/protocol-go/schema/activity"
+ "github.com/rss3-network/protocol-go/schema/network"
+ "github.com/rss3-network/protocol-go/schema/tag"
+ "github.com/samber/lo"
+ "go.uber.org/zap"
+)
+
+func (c *Component) GetNetworkActivities(ctx echo.Context) (err error) {
+ var request NetworkActivitiesRequest
+
+ if err := ctx.Bind(&request); err != nil {
+ return response.BadRequestError(ctx, err)
+ }
+
+ if request.Type, err = c.parseTypes(ctx.QueryParams()["type"], request.Tag); err != nil {
+ return response.BadRequestError(ctx, err)
+ }
+
+ if err := defaults.Set(&request); err != nil {
+ return response.BadRequestError(ctx, err)
+ }
+
+ if err := ctx.Validate(&request); err != nil {
+ return response.ValidationFailedError(ctx, err)
+ }
+
+ go c.CollectTrace(ctx.Request().Context(), ctx.Request().RequestURI, request.Network.String())
+
+ go c.CollectMetric(ctx.Request().Context(), ctx.Request().RequestURI, request.Network.String())
+
+ addRecentRequest(ctx.Request().RequestURI)
+
+ cursor, err := c.getCursor(ctx.Request().Context(), request.Cursor)
+ if err != nil {
+ zap.L().Error("getCursor InternalError", zap.Error(err))
+
+ return response.InternalError(ctx)
+ }
+
+ databaseRequest := model.ActivitiesQuery{
+ Cursor: cursor,
+ StartTimestamp: request.SinceTimestamp,
+ EndTimestamp: request.UntilTimestamp,
+ Limit: request.Limit,
+ ActionLimit: request.ActionLimit,
+ Status: request.Status,
+ Direction: request.Direction,
+ Network: []network.Network{request.Network},
+ Tags: lo.Uniq(request.Tag),
+ Types: lo.Uniq(request.Type),
+ Platforms: lo.Uniq(request.Platform),
+ }
+
+ activities, last, err := c.getActivities(ctx.Request().Context(), databaseRequest)
+ if err != nil {
+ zap.L().Error("getActivities InternalError", zap.Error(err))
+
+ return response.InternalError(ctx)
+ }
+
+ return ctx.JSON(http.StatusOK, ActivitiesResponse{
+ Data: c.TransformActivities(ctx.Request().Context(), activities),
+ Meta: lo.Ternary(len(activities) < databaseRequest.Limit, nil, &MetaCursor{
+ Cursor: last,
+ }),
+ })
+}
+
+type NetworkActivitiesRequest struct {
+ Network network.Network `param:"network" validate:"required"`
+
+ Limit int `query:"limit" validate:"min=1,max=100" default:"100"`
+ ActionLimit int `query:"action_limit" validate:"min=1,max=20" default:"10"`
+ Cursor *string `query:"cursor"`
+ SinceTimestamp *uint64 `query:"since_timestamp"`
+ UntilTimestamp *uint64 `query:"until_timestamp"`
+ Status *bool `query:"success"`
+ Direction *activityx.Direction `query:"direction"`
+ Tag []tag.Tag `query:"tag"`
+ Type []schema.Type `query:"-"`
+ Platform []decentralized.Platform `query:"platform"`
+}
diff --git a/internal/node/component/federated/handler_platform_activity.go b/internal/node/component/federated/handler_platform_activity.go
new file mode 100644
index 00000000..84ac7bf4
--- /dev/null
+++ b/internal/node/component/federated/handler_platform_activity.go
@@ -0,0 +1,93 @@
+package federated
+
+import (
+ "net/http"
+
+ "github.com/creasty/defaults"
+ "github.com/labstack/echo/v4"
+ "github.com/rss3-network/node/common/http/response"
+ "github.com/rss3-network/node/internal/database/model"
+ "github.com/rss3-network/node/schema/worker/decentralized"
+ "github.com/rss3-network/protocol-go/schema"
+ activityx "github.com/rss3-network/protocol-go/schema/activity"
+ "github.com/rss3-network/protocol-go/schema/network"
+ "github.com/rss3-network/protocol-go/schema/tag"
+ "github.com/samber/lo"
+ "go.uber.org/zap"
+)
+
+func (c *Component) GetPlatformActivities(ctx echo.Context) (err error) {
+ var request PlatformActivitiesRequest
+
+ if err := ctx.Bind(&request); err != nil {
+ return response.BadRequestError(ctx, err)
+ }
+
+ if request.Type, err = c.parseTypes(ctx.QueryParams()["type"], request.Tag); err != nil {
+ return response.BadRequestError(ctx, err)
+ }
+
+ if err := defaults.Set(&request); err != nil {
+ return response.BadRequestError(ctx, err)
+ }
+
+ if err := ctx.Validate(&request); err != nil {
+ return response.ValidationFailedError(ctx, err)
+ }
+
+ go c.CollectTrace(ctx.Request().Context(), ctx.Request().RequestURI, request.Platform.String())
+
+ go c.CollectMetric(ctx.Request().Context(), ctx.Request().RequestURI, request.Platform.String())
+
+ addRecentRequest(ctx.Request().RequestURI)
+
+ cursor, err := c.getCursor(ctx.Request().Context(), request.Cursor)
+ if err != nil {
+ zap.L().Error("getCursor InternalError", zap.Error(err))
+
+ return response.InternalError(ctx)
+ }
+
+ databaseRequest := model.ActivitiesQuery{
+ Cursor: cursor,
+ StartTimestamp: request.SinceTimestamp,
+ EndTimestamp: request.UntilTimestamp,
+ Limit: request.Limit,
+ ActionLimit: request.ActionLimit,
+ Status: request.Status,
+ Direction: request.Direction,
+ Network: lo.Uniq(request.Network),
+ Tags: lo.Uniq(request.Tag),
+ Types: lo.Uniq(request.Type),
+ Platforms: []decentralized.Platform{request.Platform},
+ }
+
+ activities, last, err := c.getActivities(ctx.Request().Context(), databaseRequest)
+ if err != nil {
+ zap.L().Error("getActivities InternalError", zap.Error(err))
+
+ return response.InternalError(ctx)
+ }
+
+ return ctx.JSON(http.StatusOK, ActivitiesResponse{
+ Data: c.TransformActivities(ctx.Request().Context(), activities),
+ Meta: lo.Ternary(len(activities) < databaseRequest.Limit, nil, &MetaCursor{
+ Cursor: last,
+ }),
+ })
+}
+
+type PlatformActivitiesRequest struct {
+ Platform decentralized.Platform `param:"platform" validate:"required"`
+
+ Limit int `query:"limit" validate:"min=1,max=100" default:"100"`
+ ActionLimit int `query:"action_limit" validate:"min=1,max=20" default:"10"`
+ Cursor *string `query:"cursor"`
+ SinceTimestamp *uint64 `query:"since_timestamp"`
+ UntilTimestamp *uint64 `query:"until_timestamp"`
+ Status *bool `query:"success"`
+ Direction *activityx.Direction `query:"direction"`
+ Tag []tag.Tag `query:"tag"`
+ Type []schema.Type `query:"-"`
+ Network []network.Network `query:"network"`
+}
diff --git a/internal/node/component/federated/transformer_activity.go b/internal/node/component/federated/transformer_activity.go
new file mode 100644
index 00000000..07646453
--- /dev/null
+++ b/internal/node/component/federated/transformer_activity.go
@@ -0,0 +1,37 @@
+package federated
+
+import (
+ "context"
+
+ activityx "github.com/rss3-network/protocol-go/schema/activity"
+ "github.com/rss3-network/protocol-go/schema/tag"
+ lop "github.com/samber/lo/parallel"
+ "go.uber.org/zap"
+)
+
+// TransformActivity should add related URLs to the activity based on action tag, network and platform
+func (c *Component) TransformActivity(ctx context.Context, activity *activityx.Activity) (*activityx.Activity, error) {
+ if activity == nil {
+ return nil, nil
+ }
+
+ // iterate over actions and transform them based on tag, network and platform
+ lop.ForEach(activity.Actions, func(actionPtr *activityx.Action, index int) {
+ action := *actionPtr
+
+ var err error
+
+ switch action.Tag {
+ case tag.Social:
+ *activity.Actions[index], err = c.TransformSocialType(ctx, activity.Network, activity.Platform, *actionPtr)
+ default:
+ activity.Actions[index] = actionPtr
+ }
+
+ if err != nil {
+ zap.L().Error("failed to transform action", zap.Error(err), zap.String("id", activity.ID))
+ }
+ })
+
+ return activity, nil
+}
diff --git a/internal/node/component/federated/transformer_social_type.go b/internal/node/component/federated/transformer_social_type.go
new file mode 100644
index 00000000..09a80278
--- /dev/null
+++ b/internal/node/component/federated/transformer_social_type.go
@@ -0,0 +1,69 @@
+package federated
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/rss3-network/protocol-go/schema/activity"
+ "github.com/rss3-network/protocol-go/schema/metadata"
+ "github.com/rss3-network/protocol-go/schema/network"
+ "github.com/rss3-network/protocol-go/schema/typex"
+ "github.com/samber/lo"
+ "go.uber.org/zap"
+)
+
+// TransformSocialType adds author url and note url to social actions based on type, network and platform
+func (c *Component) TransformSocialType(ctx context.Context, network network.Network, platform string, action activity.Action) (activity.Action, error) {
+ switch action.Type {
+ case typex.SocialPost, typex.SocialComment, typex.SocialShare, typex.SocialRevise, typex.SocialMint, typex.SocialDelete:
+ return c.TransformSocialPost(ctx, network, platform, action)
+ case typex.SocialProfile:
+ return c.TransformSocialProfile(ctx, platform, action)
+ case typex.SocialProxy:
+ return c.TransformSocialProxy(ctx, platform, action)
+ }
+
+ return action, nil
+}
+
+// TransformSocialPost adds author url and note url to social post action
+func (c *Component) TransformSocialPost(_ context.Context, _ network.Network, _ string, action activity.Action) (activity.Action, error) {
+ post, ok := action.Metadata.(*metadata.SocialPost)
+ if !ok {
+ zap.L().Error("invalid post metadata type", zap.Any("metadata", action.Metadata))
+
+ return activity.Action{}, fmt.Errorf("invalid post metadata type: %T", action.Metadata)
+ }
+
+ if lo.IsEmpty(post.Handle) {
+ post.Handle = action.From
+ }
+
+ action.Metadata = post
+
+ return action, nil
+}
+
+// TransformSocialProfile adds author url to social profile action
+func (c *Component) TransformSocialProfile(_ context.Context, _ string, action activity.Action) (activity.Action, error) {
+ _, ok := action.Metadata.(*metadata.SocialProfile)
+ if !ok {
+ zap.L().Error("invalid profile metadata type", zap.Any("metadata", action.Metadata))
+
+ return activity.Action{}, fmt.Errorf("invalid profile metadata type: %T", action.Metadata)
+ }
+
+ return action, nil
+}
+
+// TransformSocialProxy adds author url to social proxy action
+func (c *Component) TransformSocialProxy(_ context.Context, _ string, action activity.Action) (activity.Action, error) {
+ _, ok := action.Metadata.(*metadata.SocialProxy)
+ if !ok {
+ zap.L().Error("invalid proxy metadata type", zap.Any("metadata", action.Metadata))
+
+ return activity.Action{}, fmt.Errorf("invalid proxy metadata type: %T", action.Metadata)
+ }
+
+ return action, nil
+}
diff --git a/internal/node/component/info/handler_network.go b/internal/node/component/info/handler_network.go
index 82ded5c1..7fea2ba1 100644
--- a/internal/node/component/info/handler_network.go
+++ b/internal/node/component/info/handler_network.go
@@ -109,6 +109,8 @@ func (c *Component) GetWorkerConfig(ctx echo.Context) error {
var wid worker.Worker
switch nid.Source() {
+ case network.ActivityPubSource:
+ wid, err = decentralized.WorkerString(request.Worker) // make this case be seperated from the last one for reminder
case network.RSSSource:
wid, err = rss.WorkerString(request.Worker)
case network.EthereumSource, network.ArweaveSource, network.FarcasterSource, network.NearSource:
diff --git a/internal/node/component/info/network_config.go b/internal/node/component/info/network_config.go
index ab69f5c9..0c1bfa15 100644
--- a/internal/node/component/info/network_config.go
+++ b/internal/node/component/info/network_config.go
@@ -48,6 +48,7 @@ type Parameters struct {
BlockReceiptBatchSize *ConfigDetail `json:"block_receipts_batch_size,omitempty"`
APIKey *ConfigDetail `json:"api_key,omitempty"`
Authentication *Authentication `json:"authentication,omitempty"`
+ MastodonKafkaTopic *ConfigDetail `json:"mastodon_kafka_topic,omitempty"`
}
type workerConfig struct {
@@ -349,6 +350,9 @@ var NetworkToWorkersMap = map[network.Network][]worker.Worker{
decentralized.Core,
decentralized.LiNEAR,
},
+ network.Mastodon: {
+ decentralized.Mastodon,
+ },
}
// WorkerToConfigMap is a map of worker to config.
@@ -393,6 +397,9 @@ var WorkerToConfigMap = map[network.Source]map[worker.Worker]workerConfig{
decentralized.Core: defaultWorkerConfig(decentralized.Core, network.NearSource, nil),
decentralized.LiNEAR: defaultWorkerConfig(decentralized.LiNEAR, network.NearSource, nil),
},
+ network.ActivityPubSource: {
+ decentralized.Mastodon: defaultWorkerConfig(decentralized.Mastodon, network.ActivityPubSource, nil),
+ },
network.FarcasterSource: {
decentralized.Core: customWorkerConfig(decentralized.Core, network.FarcasterSource, &Parameters{
APIKey: &ConfigDetail{
diff --git a/internal/node/indexer/server.go b/internal/node/indexer/server.go
index 20420c0f..397b4849 100644
--- a/internal/node/indexer/server.go
+++ b/internal/node/indexer/server.go
@@ -14,7 +14,9 @@ import (
"github.com/rss3-network/node/internal/database"
"github.com/rss3-network/node/internal/engine"
"github.com/rss3-network/node/internal/engine/source"
- worker "github.com/rss3-network/node/internal/engine/worker/decentralized"
+ decentralizedWorker "github.com/rss3-network/node/internal/engine/worker/decentralized"
+ // worker "github.com/rss3-network/node/internal/engine/worker/decentralized"
+ federatedWorker "github.com/rss3-network/node/internal/engine/worker/federated"
"github.com/rss3-network/node/internal/node/monitor"
"github.com/rss3-network/node/internal/stream"
decentralizedx "github.com/rss3-network/node/schema/worker/decentralized"
@@ -282,11 +284,25 @@ func NewServer(ctx context.Context, config *config.Module, databaseClient databa
}
// Initialize worker.
- if instance.worker, err = worker.New(instance.config, databaseClient, instance.redisClient); err != nil {
- return nil, fmt.Errorf("new worker: %w", err)
+ switch config.Network.Source() {
+ case network.ArweaveSource, network.EthereumSource, network.FarcasterSource, network.RSSSource:
+ if instance.worker, err = decentralizedWorker.New(instance.config, databaseClient, instance.redisClient); err != nil {
+ return nil, fmt.Errorf("new decentralized worker: %w", err)
+ }
+ case network.ActivityPubSource:
+ if instance.worker, err = federatedWorker.New(instance.config, databaseClient, instance.redisClient); err != nil {
+ return nil, fmt.Errorf("new federated worker: %w", err)
+ }
+ default:
+ return nil, fmt.Errorf("unknown worker source: %s", config.Network.Source())
}
switch config.Network.Source() {
+ case network.ActivityPubSource:
+ instance.monitorClient, err = monitor.NewActivityPubClient(config.EndpointID, config.Parameters)
+ if err != nil {
+ return nil, fmt.Errorf("error occurred in creating new activitypub monitorClient: %w", err)
+ }
case network.ArweaveSource:
instance.monitorClient, err = monitor.NewArweaveClient()
if err != nil {
diff --git a/internal/node/monitor/client.go b/internal/node/monitor/client.go
index a08baa53..ecad2186 100644
--- a/internal/node/monitor/client.go
+++ b/internal/node/monitor/client.go
@@ -10,6 +10,7 @@ import (
"github.com/rss3-network/node/config"
"github.com/rss3-network/node/internal/node/component/rss"
+ "github.com/rss3-network/node/provider/activitypub/mastodon"
"github.com/rss3-network/node/provider/arweave"
"github.com/rss3-network/node/provider/ethereum"
"github.com/rss3-network/node/provider/farcaster"
@@ -26,6 +27,14 @@ type Client interface {
LatestState(ctx context.Context) (uint64, uint64, error)
}
+// activitypubClient is a client implementation for ActivityPub.
+type activitypubClient struct {
+ activitypubClient mastodon.Client // ToDo: (Note) Currently use Matodon Client for all ActivityPub Source.
+}
+
+// set a default client
+var _ Client = (*activitypubClient)(nil)
+
// ethereumClient is a client implementation for ethereum.
type ethereumClient struct {
ethereumClient ethereum.Client
@@ -258,3 +267,49 @@ func NewRssClient(endpoint string, param *config.Parameters) (Client, error) {
url: base.String(),
}, nil
}
+
+func (c *activitypubClient) CurrentState(_ CheckpointState) (uint64, uint64) {
+ return 0, 0
+}
+
+func (c *activitypubClient) TargetState(_ *config.Parameters) (uint64, uint64) {
+ return 0, 0
+}
+
+// LatestState returns the latest state of the Kafka consuming process
+func (c *activitypubClient) LatestState(ctx context.Context) (uint64, uint64, error) {
+ // Poll the Kafka consumer to verify its working state
+ fetches := c.activitypubClient.GetKafkaConsumer().PollFetches(ctx)
+ if errs := fetches.Errors(); len(errs) > 0 {
+ for _, e := range errs {
+ fmt.Printf("consumer poll fetch error: %v\n", e.Err)
+ }
+
+ return 0, 0, fmt.Errorf("consumer poll fetch error: %v", fetches.Errors())
+ }
+ // If no errors, assume the service is healthy
+ return 0, 0, nil
+}
+
+// NewActivityPubClient returns a new ActivityPub client.
+func NewActivityPubClient(endpoint string, param *config.Parameters) (Client, error) {
+ base, err := url.Parse(endpoint)
+ if err != nil {
+ return nil, fmt.Errorf("parse ActivityPub endpoint: %w", err)
+ }
+
+ // Retrieve kafkaTopic from the parameters
+ kafkaTopic := (*param)["mastodon_kafka_topic"].(string)
+
+ base.Path = path.Join(base.Path, kafkaTopic)
+
+ // Create a new activitypub(mastodon) client
+ mastodonClient, err := mastodon.NewClient(endpoint, kafkaTopic)
+ if err != nil {
+ return nil, fmt.Errorf("create ActivityPub client: %w", err)
+ }
+
+ return &activitypubClient{
+ activitypubClient: mastodonClient,
+ }, nil
+}
diff --git a/internal/node/node.go b/internal/node/node.go
index 80cf772c..97248414 100644
--- a/internal/node/node.go
+++ b/internal/node/node.go
@@ -17,6 +17,7 @@ import (
"github.com/rss3-network/node/internal/database"
"github.com/rss3-network/node/internal/node/component"
"github.com/rss3-network/node/internal/node/component/decentralized"
+ "github.com/rss3-network/node/internal/node/component/federated"
"github.com/rss3-network/node/internal/node/component/info"
"github.com/rss3-network/node/internal/node/component/rss"
"github.com/rss3-network/node/internal/node/middlewarex"
@@ -83,6 +84,11 @@ func NewCoreService(ctx context.Context, config *config.File, databaseClient dat
node.components = append(node.components, &decentralizedComponent)
}
+ if len(config.Component.Federated) > 0 {
+ federatedComponent := federated.NewComponent(ctx, apiServer, config, databaseClient, redisClient)
+ node.components = append(node.components, &federatedComponent)
+ }
+
// Generate openapi.json
var endpoint string
if config.Discovery != nil && config.Discovery.Server != nil {
diff --git a/provider/activitypub/mastodon/client.go b/provider/activitypub/mastodon/client.go
new file mode 100644
index 00000000..951d8075
--- /dev/null
+++ b/provider/activitypub/mastodon/client.go
@@ -0,0 +1,62 @@
+package mastodon
+
+import (
+ "fmt"
+ "net/http"
+ "time"
+
+ "github.com/go-playground/form/v4"
+ "github.com/twmb/franz-go/pkg/kgo"
+)
+
+const (
+ DefaultTimeout = 3 * time.Second
+ DefaultAttempts = 2
+)
+
+var _ Client = (*client)(nil)
+
+type Client interface {
+ GetKafkaConsumer() *kgo.Client
+}
+
+type client struct {
+ httpClient *http.Client
+ encoder *form.Encoder
+ attempts uint
+ kafkaConsumer *kgo.Client
+}
+
+func (c *client) GetKafkaConsumer() *kgo.Client {
+ return c.kafkaConsumer
+}
+
+func NewClient(endpoint string, kafkaTopic string) (Client, error) {
+ var (
+ instance = client{
+ httpClient: &http.Client{
+ Timeout: DefaultTimeout,
+ },
+ encoder: form.NewEncoder(),
+ attempts: DefaultAttempts,
+ }
+ )
+ // form the kafka broker endpoint
+ var kafkaBrokers = []string{endpoint}
+
+ // Create a new Kafka client
+ options := []kgo.Opt{
+ kgo.SeedBrokers(kafkaBrokers...),
+ kgo.ConsumeTopics(kafkaTopic),
+ kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()),
+ }
+ consumer, err := kgo.NewClient(options...)
+
+ if err != nil {
+ return nil, fmt.Errorf("create kafka consumer: %w", err)
+ }
+
+ instance.kafkaConsumer = consumer
+
+ return &instance, nil
+}
diff --git a/provider/activitypub/mastodon/type.go b/provider/activitypub/mastodon/type.go
new file mode 100644
index 00000000..43f6edb9
--- /dev/null
+++ b/provider/activitypub/mastodon/type.go
@@ -0,0 +1,24 @@
+package mastodon
+
+//go:generate go run --mod=mod github.com/dmarkham/enumer --values --type=MessageType --output type_message.go --trimprefix=MessageType
+type MessageType int
+
+const (
+ MessageTypeNone MessageType = iota // Invalid default value
+ MessageTypeCreate // Create ActivityPub message
+ MessageTypeAnnounce // Announce ActivityPub message
+ MessageTypeLike // Like ActivityPub message
+)
+
+const (
+ // ActivityPub object types
+ ObjectTypeNote = "Note"
+
+ InReplyTo = "inReplyTo"
+ To = "to"
+ Type = "type"
+
+ // ActivityPub tag types
+ TagTypeMention = "Mention"
+ TagTypeHashtag = "Hashtag"
+)
diff --git a/provider/activitypub/mastodon/type_message.go b/provider/activitypub/mastodon/type_message.go
new file mode 100644
index 00000000..eb7400fe
--- /dev/null
+++ b/provider/activitypub/mastodon/type_message.go
@@ -0,0 +1,90 @@
+// Code generated by "enumer --values --type=MessageType --output type_message.go --trimprefix=MessageType"; DO NOT EDIT.
+
+package mastodon
+
+import (
+ "fmt"
+ "strings"
+)
+
+const _MessageTypeName = "NoneCreateAnnounceLike"
+
+var _MessageTypeIndex = [...]uint8{0, 4, 10, 18, 22}
+
+const _MessageTypeLowerName = "nonecreateannouncelike"
+
+func (i MessageType) String() string {
+ if i < 0 || i >= MessageType(len(_MessageTypeIndex)-1) {
+ return fmt.Sprintf("MessageType(%d)", i)
+ }
+ return _MessageTypeName[_MessageTypeIndex[i]:_MessageTypeIndex[i+1]]
+}
+
+func (MessageType) Values() []string {
+ return MessageTypeStrings()
+}
+
+// An "invalid array index" compiler error signifies that the constant values have changed.
+// Re-run the stringer command to generate them again.
+func _MessageTypeNoOp() {
+ var x [1]struct{}
+ _ = x[MessageTypeNone-(0)]
+ _ = x[MessageTypeCreate-(1)]
+ _ = x[MessageTypeAnnounce-(2)]
+ _ = x[MessageTypeLike-(3)]
+}
+
+var _MessageTypeValues = []MessageType{MessageTypeNone, MessageTypeCreate, MessageTypeAnnounce, MessageTypeLike}
+
+var _MessageTypeNameToValueMap = map[string]MessageType{
+ _MessageTypeName[0:4]: MessageTypeNone,
+ _MessageTypeLowerName[0:4]: MessageTypeNone,
+ _MessageTypeName[4:10]: MessageTypeCreate,
+ _MessageTypeLowerName[4:10]: MessageTypeCreate,
+ _MessageTypeName[10:18]: MessageTypeAnnounce,
+ _MessageTypeLowerName[10:18]: MessageTypeAnnounce,
+ _MessageTypeName[18:22]: MessageTypeLike,
+ _MessageTypeLowerName[18:22]: MessageTypeLike,
+}
+
+var _MessageTypeNames = []string{
+ _MessageTypeName[0:4],
+ _MessageTypeName[4:10],
+ _MessageTypeName[10:18],
+ _MessageTypeName[18:22],
+}
+
+// MessageTypeString retrieves an enum value from the enum constants string name.
+// Throws an error if the param is not part of the enum.
+func MessageTypeString(s string) (MessageType, error) {
+ if val, ok := _MessageTypeNameToValueMap[s]; ok {
+ return val, nil
+ }
+
+ if val, ok := _MessageTypeNameToValueMap[strings.ToLower(s)]; ok {
+ return val, nil
+ }
+ return 0, fmt.Errorf("%s does not belong to MessageType values", s)
+}
+
+// MessageTypeValues returns all values of the enum
+func MessageTypeValues() []MessageType {
+ return _MessageTypeValues
+}
+
+// MessageTypeStrings returns a slice of all String values of the enum
+func MessageTypeStrings() []string {
+ strs := make([]string, len(_MessageTypeNames))
+ copy(strs, _MessageTypeNames)
+ return strs
+}
+
+// IsAMessageType returns "true" if the value is listed in the enum definition. "false" otherwise
+func (i MessageType) IsAMessageType() bool {
+ for _, v := range _MessageTypeValues {
+ if i == v {
+ return true
+ }
+ }
+ return false
+}
diff --git a/provider/activitypub/model.go b/provider/activitypub/model.go
new file mode 100644
index 00000000..b1cec6ce
--- /dev/null
+++ b/provider/activitypub/model.go
@@ -0,0 +1,46 @@
+package activitypub
+
+// Object represents a general ActivityPub object or activity.
+type Object struct {
+ Context interface{} `json:"@context,omitempty"`
+ ID string `json:"id"`
+ Type string `json:"type"`
+ Actor string `json:"actor,omitempty"`
+ Object interface{} `json:"object,omitempty"`
+ Target interface{} `json:"target,omitempty"`
+ Result interface{} `json:"result,omitempty"`
+ Origin interface{} `json:"origin,omitempty"`
+ Instrument interface{} `json:"instrument,omitempty"`
+ Published string `json:"published,omitempty"`
+ To []string `json:"to,omitempty"`
+ CC []string `json:"cc,omitempty"`
+ Bto []string `json:"bto,omitempty"`
+ Bcc []string `json:"bcc,omitempty"`
+ Attachment []Attachment `json:"attachment,omitempty"`
+ Tag []Tag `json:"tag,omitempty"`
+ Attributes map[string]interface{} `json:"attributes,omitempty"`
+}
+
+// Attachment represents an attachment to an ActivityPub object.
+type Attachment struct {
+ Type string `json:"type"`
+ URL string `json:"url"`
+}
+
+// Tag represents a tag in an ActivityPub object.
+type Tag struct {
+ Type string `json:"type"`
+ Name string `json:"name"`
+ Href string `json:"href"`
+}
+
+// Note represents a note object in ActivityPub.
+type Note struct {
+ ID string `json:"id"`
+ Type string `json:"type"`
+ Content string `json:"content"`
+ Published string `json:"published,omitempty"`
+ To []string `json:"to,omitempty"`
+ CC []string `json:"cc,omitempty"`
+ Tag []Tag `json:"tag,omitempty"`
+}
diff --git a/provider/redis/handle_updates.go b/provider/redis/handle_updates.go
new file mode 100644
index 00000000..44f10851
--- /dev/null
+++ b/provider/redis/handle_updates.go
@@ -0,0 +1,75 @@
+package redis
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/redis/rueidis"
+ "go.uber.org/zap"
+)
+
+const handleUpdatesKey = "handle_updates"
+
+// GetAllHandles retrieves all handles from the sorted set without filtering by score
+func GetAllHandles(ctx context.Context, client rueidis.Client) ([]string, error) {
+ // Check if the key exists
+ existsCmd := client.B().Exists().Key(handleUpdatesKey).Build()
+ exists, err := client.Do(ctx, existsCmd).AsInt64()
+
+ if err != nil {
+ zap.L().Error("failed to check if key exists", zap.Error(err))
+ return nil, fmt.Errorf("failed to check if key exists: %w", err)
+ }
+
+ if exists == 0 {
+ zap.L().Warn("handle updates key does not exist")
+ return []string{}, nil
+ }
+
+ // Get all members without scores
+ cmd := client.B().Zrange().Key(handleUpdatesKey).Min("0").Max("-1").Build()
+ result, err := client.Do(ctx, cmd).AsStrSlice()
+
+ if err != nil {
+ zap.L().Error("failed to get handles", zap.Error(err))
+ return nil, fmt.Errorf("failed to get handles: %w", err)
+ }
+
+ return result, nil
+}
+
+// AddHandleUpdate adds a handle to the sorted set with the current timestamp as score
+func AddHandleUpdate(ctx context.Context, client rueidis.Client, handle string) error {
+ fmt.Println("Reached AddHandleUpdate!")
+
+ err := client.Do(ctx, client.B().Set().Key("test_key").Value("test_value").Build()).Error()
+ if err != nil {
+ fmt.Println("Failed to connect to Redis")
+ } else {
+ fmt.Println("Connected to Redis successfully")
+ }
+
+ cmd := client.B().Zadd().Key(handleUpdatesKey).ScoreMember().ScoreMember(float64(time.Now().Unix()), handle).Build()
+
+ return client.Do(ctx, cmd).Error()
+}
+
+// GetRecentHandleUpdates retrieves handles updated since the given timestamp
+func GetRecentHandleUpdates(ctx context.Context, client rueidis.Client, since uint64) ([]string, error) {
+ cmd := client.B().Zrangebyscore().Key(handleUpdatesKey).Min(fmt.Sprintf("%d", since)).Max("+inf").Build()
+ result, err := client.Do(ctx, cmd).AsStrSlice()
+
+ if err != nil {
+ zap.L().Error("failed to get recent handle updates", zap.Uint64("since", since), zap.Error(err))
+ return nil, fmt.Errorf("failed to get recent handle updates: %w", err)
+ }
+
+ return result, nil
+}
+
+// RemoveOldHandleUpdates removes handles updated before the given time
+func RemoveOldHandleUpdates(ctx context.Context, client rueidis.Client, before time.Time) error {
+ cmd := client.B().Zremrangebyscore().Key(handleUpdatesKey).Min("-inf").Max(fmt.Sprintf("%d", before.Unix())).Build()
+ return client.Do(ctx, cmd).Error()
+}
diff --git a/schema/worker/decentralized/platform.go b/schema/worker/decentralized/platform.go
index 90a97fe5..17deaf0d 100644
--- a/schema/worker/decentralized/platform.go
+++ b/schema/worker/decentralized/platform.go
@@ -26,6 +26,7 @@ const (
PlatformLinea // Linear
PlatformLiNEAR // LiNEAR
PlatformLooksRare // LooksRare
+ PlatformMastodon // Mastodon
PlatformMatters // Matters
PlatformMirror // Mirror
PlatformNouns // Nouns
@@ -73,6 +74,7 @@ var ToPlatformMap = map[Worker]Platform{
Linea: PlatformLinea,
LiNEAR: PlatformLiNEAR,
Looksrare: PlatformLooksRare,
+ Mastodon: PlatformMastodon,
Matters: PlatformMatters,
Mirror: PlatformMirror,
Momoka: PlatformLens,
diff --git a/schema/worker/decentralized/platform_string.go b/schema/worker/decentralized/platform_string.go
index b583fbd9..ad7e979c 100644
--- a/schema/worker/decentralized/platform_string.go
+++ b/schema/worker/decentralized/platform_string.go
@@ -9,11 +9,11 @@ import (
"strings"
)
-const _PlatformName = "Unknown1inchAAVEAavegotchiArbitrumBaseBendDAOCowCrossbellCurveENSFarcasterHighlightIQWikiKiwiStandLensLidoLinearLiNEARLooksRareMattersMirrorNounsOpenSeaOptimismParagraphParaswapPolymarketRSS3SAVMStargateUniswapVSL"
+const _PlatformName = "Unknown1inchAAVEAavegotchiArbitrumBaseBendDAOCowCrossbellCurveENSFarcasterHighlightIQWikiKiwiStandLensLidoLinearLiNEARLooksRareMastodonMattersMirrorNounsOpenSeaOptimismParagraphParaswapPolymarketRSS3SAVMStargateUniswapVSL"
-var _PlatformIndex = [...]uint8{0, 7, 12, 16, 26, 34, 38, 45, 48, 57, 62, 65, 74, 83, 89, 98, 102, 106, 112, 118, 127, 134, 140, 145, 152, 160, 169, 177, 187, 191, 195, 203, 210, 213}
+var _PlatformIndex = [...]uint8{0, 7, 12, 16, 26, 34, 38, 45, 48, 57, 62, 65, 74, 83, 89, 98, 102, 106, 112, 118, 127, 135, 142, 148, 153, 160, 168, 177, 185, 195, 199, 203, 211, 218, 221}
-const _PlatformLowerName = "unknown1inchaaveaavegotchiarbitrumbasebenddaocowcrossbellcurveensfarcasterhighlightiqwikikiwistandlenslidolinearlinearlooksraremattersmirrornounsopenseaoptimismparagraphparaswappolymarketrss3savmstargateuniswapvsl"
+const _PlatformLowerName = "unknown1inchaaveaavegotchiarbitrumbasebenddaocowcrossbellcurveensfarcasterhighlightiqwikikiwistandlenslidolinearlinearlooksraremastodonmattersmirrornounsopenseaoptimismparagraphparaswappolymarketrss3savmstargateuniswapvsl"
func (i Platform) String() string {
if i >= Platform(len(_PlatformIndex)-1) {
@@ -50,22 +50,23 @@ func _PlatformNoOp() {
_ = x[PlatformLinea-(17)]
_ = x[PlatformLiNEAR-(18)]
_ = x[PlatformLooksRare-(19)]
- _ = x[PlatformMatters-(20)]
- _ = x[PlatformMirror-(21)]
- _ = x[PlatformNouns-(22)]
- _ = x[PlatformOpenSea-(23)]
- _ = x[PlatformOptimism-(24)]
- _ = x[PlatformParagraph-(25)]
- _ = x[PlatformParaswap-(26)]
- _ = x[PlatformPolymarket-(27)]
- _ = x[PlatformRSS3-(28)]
- _ = x[PlatformSAVM-(29)]
- _ = x[PlatformStargate-(30)]
- _ = x[PlatformUniswap-(31)]
- _ = x[PlatformVSL-(32)]
+ _ = x[PlatformMastodon-(20)]
+ _ = x[PlatformMatters-(21)]
+ _ = x[PlatformMirror-(22)]
+ _ = x[PlatformNouns-(23)]
+ _ = x[PlatformOpenSea-(24)]
+ _ = x[PlatformOptimism-(25)]
+ _ = x[PlatformParagraph-(26)]
+ _ = x[PlatformParaswap-(27)]
+ _ = x[PlatformPolymarket-(28)]
+ _ = x[PlatformRSS3-(29)]
+ _ = x[PlatformSAVM-(30)]
+ _ = x[PlatformStargate-(31)]
+ _ = x[PlatformUniswap-(32)]
+ _ = x[PlatformVSL-(33)]
}
-var _PlatformValues = []Platform{PlatformUnknown, Platform1Inch, PlatformAAVE, PlatformAavegotchi, PlatformArbitrum, PlatformBase, PlatformBendDAO, PlatformCow, PlatformCrossbell, PlatformCurve, PlatformENS, PlatformFarcaster, PlatformHighlight, PlatformIQWiki, PlatformKiwiStand, PlatformLens, PlatformLido, PlatformLinea, PlatformLiNEAR, PlatformLooksRare, PlatformMatters, PlatformMirror, PlatformNouns, PlatformOpenSea, PlatformOptimism, PlatformParagraph, PlatformParaswap, PlatformPolymarket, PlatformRSS3, PlatformSAVM, PlatformStargate, PlatformUniswap, PlatformVSL}
+var _PlatformValues = []Platform{PlatformUnknown, Platform1Inch, PlatformAAVE, PlatformAavegotchi, PlatformArbitrum, PlatformBase, PlatformBendDAO, PlatformCow, PlatformCrossbell, PlatformCurve, PlatformENS, PlatformFarcaster, PlatformHighlight, PlatformIQWiki, PlatformKiwiStand, PlatformLens, PlatformLido, PlatformLinea, PlatformLiNEAR, PlatformLooksRare, PlatformMastodon, PlatformMatters, PlatformMirror, PlatformNouns, PlatformOpenSea, PlatformOptimism, PlatformParagraph, PlatformParaswap, PlatformPolymarket, PlatformRSS3, PlatformSAVM, PlatformStargate, PlatformUniswap, PlatformVSL}
var _PlatformNameToValueMap = map[string]Platform{
_PlatformName[0:7]: PlatformUnknown,
@@ -108,32 +109,34 @@ var _PlatformNameToValueMap = map[string]Platform{
_PlatformLowerName[112:118]: PlatformLiNEAR,
_PlatformName[118:127]: PlatformLooksRare,
_PlatformLowerName[118:127]: PlatformLooksRare,
- _PlatformName[127:134]: PlatformMatters,
- _PlatformLowerName[127:134]: PlatformMatters,
- _PlatformName[134:140]: PlatformMirror,
- _PlatformLowerName[134:140]: PlatformMirror,
- _PlatformName[140:145]: PlatformNouns,
- _PlatformLowerName[140:145]: PlatformNouns,
- _PlatformName[145:152]: PlatformOpenSea,
- _PlatformLowerName[145:152]: PlatformOpenSea,
- _PlatformName[152:160]: PlatformOptimism,
- _PlatformLowerName[152:160]: PlatformOptimism,
- _PlatformName[160:169]: PlatformParagraph,
- _PlatformLowerName[160:169]: PlatformParagraph,
- _PlatformName[169:177]: PlatformParaswap,
- _PlatformLowerName[169:177]: PlatformParaswap,
- _PlatformName[177:187]: PlatformPolymarket,
- _PlatformLowerName[177:187]: PlatformPolymarket,
- _PlatformName[187:191]: PlatformRSS3,
- _PlatformLowerName[187:191]: PlatformRSS3,
- _PlatformName[191:195]: PlatformSAVM,
- _PlatformLowerName[191:195]: PlatformSAVM,
- _PlatformName[195:203]: PlatformStargate,
- _PlatformLowerName[195:203]: PlatformStargate,
- _PlatformName[203:210]: PlatformUniswap,
- _PlatformLowerName[203:210]: PlatformUniswap,
- _PlatformName[210:213]: PlatformVSL,
- _PlatformLowerName[210:213]: PlatformVSL,
+ _PlatformName[127:135]: PlatformMastodon,
+ _PlatformLowerName[127:135]: PlatformMastodon,
+ _PlatformName[135:142]: PlatformMatters,
+ _PlatformLowerName[135:142]: PlatformMatters,
+ _PlatformName[142:148]: PlatformMirror,
+ _PlatformLowerName[142:148]: PlatformMirror,
+ _PlatformName[148:153]: PlatformNouns,
+ _PlatformLowerName[148:153]: PlatformNouns,
+ _PlatformName[153:160]: PlatformOpenSea,
+ _PlatformLowerName[153:160]: PlatformOpenSea,
+ _PlatformName[160:168]: PlatformOptimism,
+ _PlatformLowerName[160:168]: PlatformOptimism,
+ _PlatformName[168:177]: PlatformParagraph,
+ _PlatformLowerName[168:177]: PlatformParagraph,
+ _PlatformName[177:185]: PlatformParaswap,
+ _PlatformLowerName[177:185]: PlatformParaswap,
+ _PlatformName[185:195]: PlatformPolymarket,
+ _PlatformLowerName[185:195]: PlatformPolymarket,
+ _PlatformName[195:199]: PlatformRSS3,
+ _PlatformLowerName[195:199]: PlatformRSS3,
+ _PlatformName[199:203]: PlatformSAVM,
+ _PlatformLowerName[199:203]: PlatformSAVM,
+ _PlatformName[203:211]: PlatformStargate,
+ _PlatformLowerName[203:211]: PlatformStargate,
+ _PlatformName[211:218]: PlatformUniswap,
+ _PlatformLowerName[211:218]: PlatformUniswap,
+ _PlatformName[218:221]: PlatformVSL,
+ _PlatformLowerName[218:221]: PlatformVSL,
}
var _PlatformNames = []string{
@@ -157,19 +160,20 @@ var _PlatformNames = []string{
_PlatformName[106:112],
_PlatformName[112:118],
_PlatformName[118:127],
- _PlatformName[127:134],
- _PlatformName[134:140],
- _PlatformName[140:145],
- _PlatformName[145:152],
- _PlatformName[152:160],
- _PlatformName[160:169],
- _PlatformName[169:177],
- _PlatformName[177:187],
- _PlatformName[187:191],
- _PlatformName[191:195],
- _PlatformName[195:203],
- _PlatformName[203:210],
- _PlatformName[210:213],
+ _PlatformName[127:135],
+ _PlatformName[135:142],
+ _PlatformName[142:148],
+ _PlatformName[148:153],
+ _PlatformName[153:160],
+ _PlatformName[160:168],
+ _PlatformName[168:177],
+ _PlatformName[177:185],
+ _PlatformName[185:195],
+ _PlatformName[195:199],
+ _PlatformName[199:203],
+ _PlatformName[203:211],
+ _PlatformName[211:218],
+ _PlatformName[218:221],
}
// PlatformString retrieves an enum value from the enum constants string name.
diff --git a/schema/worker/decentralized/worker.go b/schema/worker/decentralized/worker.go
index e8055300..7a50dcc1 100644
--- a/schema/worker/decentralized/worker.go
+++ b/schema/worker/decentralized/worker.go
@@ -27,6 +27,7 @@ const (
Linea // linea
LiNEAR // linear
Looksrare // looksrare
+ Mastodon // mastodon
Matters // matters
Mirror // mirror
Momoka // momoka
@@ -87,6 +88,7 @@ var ToTagsMap = map[Worker][]tag.Tag{
Linea: {tag.Exchange, tag.Transaction},
LiNEAR: {tag.Exchange, tag.Transaction},
Looksrare: {tag.Collectible},
+ Mastodon: {tag.Social},
Matters: {tag.Social},
Mirror: {tag.Social},
Momoka: {tag.Social},
diff --git a/schema/worker/decentralized/worker_string.go b/schema/worker/decentralized/worker_string.go
index 56f01e01..72d863c9 100644
--- a/schema/worker/decentralized/worker_string.go
+++ b/schema/worker/decentralized/worker_string.go
@@ -9,11 +9,11 @@ import (
"strings"
)
-const _WorkerName = "aaveaavegotchiarbitrumbasebenddaocorecowcrossbellcurveenshighlightiqwikikiwistandlenslidolinealinearlooksraremattersmirrormomokanouns1inchopenseaoptimismparagraphparaswappolymarketrss3savmstargateuniswapvsl"
+const _WorkerName = "aaveaavegotchiarbitrumbasebenddaocorecowcrossbellcurveenshighlightiqwikikiwistandlenslidolinealinearlooksraremastodonmattersmirrormomokanouns1inchopenseaoptimismparagraphparaswappolymarketrss3savmstargateuniswapvsl"
-var _WorkerIndex = [...]uint8{0, 4, 14, 22, 26, 33, 37, 40, 49, 54, 57, 66, 72, 81, 85, 89, 94, 100, 109, 116, 122, 128, 133, 138, 145, 153, 162, 170, 180, 184, 188, 196, 203, 206}
+var _WorkerIndex = [...]uint8{0, 4, 14, 22, 26, 33, 37, 40, 49, 54, 57, 66, 72, 81, 85, 89, 94, 100, 109, 117, 124, 130, 136, 141, 146, 153, 161, 170, 178, 188, 192, 196, 204, 211, 214}
-const _WorkerLowerName = "aaveaavegotchiarbitrumbasebenddaocorecowcrossbellcurveenshighlightiqwikikiwistandlenslidolinealinearlooksraremattersmirrormomokanouns1inchopenseaoptimismparagraphparaswappolymarketrss3savmstargateuniswapvsl"
+const _WorkerLowerName = "aaveaavegotchiarbitrumbasebenddaocorecowcrossbellcurveenshighlightiqwikikiwistandlenslidolinealinearlooksraremastodonmattersmirrormomokanouns1inchopenseaoptimismparagraphparaswappolymarketrss3savmstargateuniswapvsl"
func (i Worker) String() string {
i -= 1
@@ -49,24 +49,25 @@ func _WorkerNoOp() {
_ = x[Linea-(16)]
_ = x[LiNEAR-(17)]
_ = x[Looksrare-(18)]
- _ = x[Matters-(19)]
- _ = x[Mirror-(20)]
- _ = x[Momoka-(21)]
- _ = x[Nouns-(22)]
- _ = x[Oneinch-(23)]
- _ = x[OpenSea-(24)]
- _ = x[Optimism-(25)]
- _ = x[Paragraph-(26)]
- _ = x[Paraswap-(27)]
- _ = x[Polymarket-(28)]
- _ = x[RSS3-(29)]
- _ = x[SAVM-(30)]
- _ = x[Stargate-(31)]
- _ = x[Uniswap-(32)]
- _ = x[VSL-(33)]
+ _ = x[Mastodon-(19)]
+ _ = x[Matters-(20)]
+ _ = x[Mirror-(21)]
+ _ = x[Momoka-(22)]
+ _ = x[Nouns-(23)]
+ _ = x[Oneinch-(24)]
+ _ = x[OpenSea-(25)]
+ _ = x[Optimism-(26)]
+ _ = x[Paragraph-(27)]
+ _ = x[Paraswap-(28)]
+ _ = x[Polymarket-(29)]
+ _ = x[RSS3-(30)]
+ _ = x[SAVM-(31)]
+ _ = x[Stargate-(32)]
+ _ = x[Uniswap-(33)]
+ _ = x[VSL-(34)]
}
-var _WorkerValues = []Worker{Aave, Aavegotchi, Arbitrum, Base, BendDAO, Core, Cow, Crossbell, Curve, ENS, Highlight, IQWiki, KiwiStand, Lens, Lido, Linea, LiNEAR, Looksrare, Matters, Mirror, Momoka, Nouns, Oneinch, OpenSea, Optimism, Paragraph, Paraswap, Polymarket, RSS3, SAVM, Stargate, Uniswap, VSL}
+var _WorkerValues = []Worker{Aave, Aavegotchi, Arbitrum, Base, BendDAO, Core, Cow, Crossbell, Curve, ENS, Highlight, IQWiki, KiwiStand, Lens, Lido, Linea, LiNEAR, Looksrare, Mastodon, Matters, Mirror, Momoka, Nouns, Oneinch, OpenSea, Optimism, Paragraph, Paraswap, Polymarket, RSS3, SAVM, Stargate, Uniswap, VSL}
var _WorkerNameToValueMap = map[string]Worker{
_WorkerName[0:4]: Aave,
@@ -105,36 +106,38 @@ var _WorkerNameToValueMap = map[string]Worker{
_WorkerLowerName[94:100]: LiNEAR,
_WorkerName[100:109]: Looksrare,
_WorkerLowerName[100:109]: Looksrare,
- _WorkerName[109:116]: Matters,
- _WorkerLowerName[109:116]: Matters,
- _WorkerName[116:122]: Mirror,
- _WorkerLowerName[116:122]: Mirror,
- _WorkerName[122:128]: Momoka,
- _WorkerLowerName[122:128]: Momoka,
- _WorkerName[128:133]: Nouns,
- _WorkerLowerName[128:133]: Nouns,
- _WorkerName[133:138]: Oneinch,
- _WorkerLowerName[133:138]: Oneinch,
- _WorkerName[138:145]: OpenSea,
- _WorkerLowerName[138:145]: OpenSea,
- _WorkerName[145:153]: Optimism,
- _WorkerLowerName[145:153]: Optimism,
- _WorkerName[153:162]: Paragraph,
- _WorkerLowerName[153:162]: Paragraph,
- _WorkerName[162:170]: Paraswap,
- _WorkerLowerName[162:170]: Paraswap,
- _WorkerName[170:180]: Polymarket,
- _WorkerLowerName[170:180]: Polymarket,
- _WorkerName[180:184]: RSS3,
- _WorkerLowerName[180:184]: RSS3,
- _WorkerName[184:188]: SAVM,
- _WorkerLowerName[184:188]: SAVM,
- _WorkerName[188:196]: Stargate,
- _WorkerLowerName[188:196]: Stargate,
- _WorkerName[196:203]: Uniswap,
- _WorkerLowerName[196:203]: Uniswap,
- _WorkerName[203:206]: VSL,
- _WorkerLowerName[203:206]: VSL,
+ _WorkerName[109:117]: Mastodon,
+ _WorkerLowerName[109:117]: Mastodon,
+ _WorkerName[117:124]: Matters,
+ _WorkerLowerName[117:124]: Matters,
+ _WorkerName[124:130]: Mirror,
+ _WorkerLowerName[124:130]: Mirror,
+ _WorkerName[130:136]: Momoka,
+ _WorkerLowerName[130:136]: Momoka,
+ _WorkerName[136:141]: Nouns,
+ _WorkerLowerName[136:141]: Nouns,
+ _WorkerName[141:146]: Oneinch,
+ _WorkerLowerName[141:146]: Oneinch,
+ _WorkerName[146:153]: OpenSea,
+ _WorkerLowerName[146:153]: OpenSea,
+ _WorkerName[153:161]: Optimism,
+ _WorkerLowerName[153:161]: Optimism,
+ _WorkerName[161:170]: Paragraph,
+ _WorkerLowerName[161:170]: Paragraph,
+ _WorkerName[170:178]: Paraswap,
+ _WorkerLowerName[170:178]: Paraswap,
+ _WorkerName[178:188]: Polymarket,
+ _WorkerLowerName[178:188]: Polymarket,
+ _WorkerName[188:192]: RSS3,
+ _WorkerLowerName[188:192]: RSS3,
+ _WorkerName[192:196]: SAVM,
+ _WorkerLowerName[192:196]: SAVM,
+ _WorkerName[196:204]: Stargate,
+ _WorkerLowerName[196:204]: Stargate,
+ _WorkerName[204:211]: Uniswap,
+ _WorkerLowerName[204:211]: Uniswap,
+ _WorkerName[211:214]: VSL,
+ _WorkerLowerName[211:214]: VSL,
}
var _WorkerNames = []string{
@@ -156,21 +159,22 @@ var _WorkerNames = []string{
_WorkerName[89:94],
_WorkerName[94:100],
_WorkerName[100:109],
- _WorkerName[109:116],
- _WorkerName[116:122],
- _WorkerName[122:128],
- _WorkerName[128:133],
- _WorkerName[133:138],
- _WorkerName[138:145],
- _WorkerName[145:153],
- _WorkerName[153:162],
- _WorkerName[162:170],
- _WorkerName[170:180],
- _WorkerName[180:184],
- _WorkerName[184:188],
- _WorkerName[188:196],
- _WorkerName[196:203],
- _WorkerName[203:206],
+ _WorkerName[109:117],
+ _WorkerName[117:124],
+ _WorkerName[124:130],
+ _WorkerName[130:136],
+ _WorkerName[136:141],
+ _WorkerName[141:146],
+ _WorkerName[146:153],
+ _WorkerName[153:161],
+ _WorkerName[161:170],
+ _WorkerName[170:178],
+ _WorkerName[178:188],
+ _WorkerName[188:192],
+ _WorkerName[192:196],
+ _WorkerName[196:204],
+ _WorkerName[204:211],
+ _WorkerName[211:214],
}
// WorkerString retrieves an enum value from the enum constants string name.
diff --git a/schema/worker/worker.go b/schema/worker/worker.go
index f0d2176b..97dc8192 100644
--- a/schema/worker/worker.go
+++ b/schema/worker/worker.go
@@ -19,15 +19,18 @@ func HookFunc() mapstructure.DecodeHookFuncType {
t reflect.Type,
data interface{},
) (interface{}, error) {
- if f.Kind() != reflect.String || t.Kind() != reflect.TypeOf((*Worker)(nil)).Elem().Kind() {
- return data, nil
- }
+ // Only process if the target type is Worker and the source type is string
+ if f.Kind() == reflect.String && t.Kind() == reflect.TypeOf((*Worker)(nil)).Elem().Kind() {
+ workerStr := data.(string)
- // TODO: Implement the logic to determine the worker type
- if value := rss.GetValueByWorkerStr(data.(string)); value != 0 {
- return value, nil
+ // TODO: Implement the logic to determine the worker type
+ if value := rss.GetValueByWorkerStr(workerStr); value != 0 {
+ return value, nil
+ } else if value := decentralized.GetValueByWorkerStr(workerStr); value != 0 {
+ return value, nil
+ }
}
-
- return decentralized.GetValueByWorkerStr(data.(string)), nil
+ // For all other types, return data
+ return data, nil
}
}