Skip to content

Commit

Permalink
feat(worker/activitypub):implement additional support for complete Ac…
Browse files Browse the repository at this point in the history
…tivityPub Integration with full engine capability (#542)

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: pseudoyu <[email protected]>
Co-authored-by: polebug <[email protected]>
  • Loading branch information
4 people authored Sep 24, 2024
1 parent 32913ab commit 4aea9b7
Show file tree
Hide file tree
Showing 42 changed files with 2,979 additions and 188 deletions.
40 changes: 34 additions & 6 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,21 +221,49 @@ func runCoreService(ctx context.Context, config *config.File, databaseClient dat
return nil
}

// findModuleByID finds and returns the module whose ID matches workerID,
// compared case-insensitively. Component groups are searched in this order:
// decentralized, federated, then RSS. An error is returned when no module
// with the given ID exists in any group.
func findModuleByID(configFile *config.File, workerID string) (*config.Module, error) {
	// The RSS component is a single module pointer rather than a list, so it is
	// wrapped in a one-element slice to share the same search loop.
	groups := [][]*config.Module{
		configFile.Component.Decentralized,
		configFile.Component.Federated,
		{configFile.Component.RSS},
	}

	for _, components := range groups {
		for _, module := range components {
			// Skip nil entries: the RSS pointer may be nil when that component
			// is not configured, and dereferencing it would panic instead of
			// reporting an undefined module.
			if module == nil {
				continue
			}

			if strings.EqualFold(module.ID, workerID) {
				return module, nil
			}
		}
	}

	return nil, fmt.Errorf("undefined module %s", workerID)
}

func runWorker(ctx context.Context, configFile *config.File, databaseClient database.Client, streamClient stream.Client, redisClient rueidis.Client) error {
workerID, err := flags.GetString(flag.KeyWorkerID)
if err != nil {
return fmt.Errorf("invalid worker id: %w", err)
}

module, found := lo.Find(configFile.Component.Decentralized, func(module *config.Module) bool {
return strings.EqualFold(module.ID, workerID)
})

if !found {
return fmt.Errorf("undefined module %s", workerID)
module, err := findModuleByID(configFile, workerID)
if err != nil {
return fmt.Errorf("find module by id: %w", err)
}

server, err := indexer.NewServer(ctx, module, databaseClient, streamClient, redisClient)

if err != nil {
return fmt.Errorf("new indexer server: %w", err)
}
Expand Down
168 changes: 124 additions & 44 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ endpoints:
url: https://rpc.ankr.com/eth
http_headers:
user-agent: rss3-node
mastodon:
url: https://0.0.0.0:9092/
http_headers:
user-agent: rss3-node
database:
driver: postgres
partition: true
Expand All @@ -60,15 +64,21 @@ observability:
endpoint: localhost:4318
component:
rss:
network: rss
worker: rsshub
endpoint: https://rsshub.app/
parameters:
authentication:
username: user
password: pass
access_key: abc
access_code: def
network: rss
worker: rsshub
endpoint: https://rsshub.app/
parameters:
authentication:
username: user
password: pass
access_key: abc
access_code: def
federated:
network: mastodon
worker: mastodon
endpoint: mastodon
parameters:
mastodon_kafka_topic: activitypub_events
decentralized:
- network: ethereum
worker: core
Expand All @@ -91,6 +101,12 @@ component:
"http_headers": {
"user-agent": "rss3-node"
}
},
"mastodon": {
"url": "https://0.0.0.0:9092/",
"http_headers": {
"user-agent": "rss3-node"
}
}
},
"discovery": {
Expand Down Expand Up @@ -136,41 +152,50 @@ component:
}
},
"component": {
"rss":
{
"network": "rss",
"worker": "rsshub",
"endpoint": "https://rsshub.app/",
"parameters": {
"authentication": {
"username": "user",
"password": "pass",
"access_key": "abc",
"access_code": "def"
}
}
},
"decentralized": [
{
"network": "ethereum",
"worker": "core",
"endpoint": "ethereum",
"parameters": {
"block_start": 47370106,
"block_target": 456
}
},
{
"network": "ethereum",
"worker": "rss3",
"endpoint": "https://rpc.ankr.com/eth",
"parameters": {
"block_start": 123,
"concurrent_block_requests": 2
}
"rss": {
"network": "rss",
"worker": "rsshub",
"endpoint": "https://rsshub.app/",
"parameters": {
"authentication": {
"username": "user",
"password": "pass",
"access_key": "abc",
"access_code": "def"
}
}
},
"federated": [
{
"network": "mastodon",
"worker": "mastodon",
"endpoint": "mastodon",
"parameters": {
"mastodon_kafka_topic": "activitypub_events"
}
]
}
}
],
"decentralized": [
{
"network": "ethereum",
"worker": "core",
"endpoint": "ethereum",
"parameters": {
"block_start": 47370106,
"block_target": 456
}
},
{
"network": "ethereum",
"worker": "rss3",
"endpoint": "https://rpc.ankr.com/eth",
"parameters": {
"block_start": 123,
"concurrent_block_requests": 2
}
}
]
}
}`
configExampleToml = `environment = "development"
Expand All @@ -184,6 +209,12 @@ url = "https://rpc.ankr.com/eth"
[endpoints.ethereum.http_headers]
user-agent = "rss3-node"
[endpoints.mastodon]
url = "https://0.0.0.0:9092/"
[endpoints.mastodon.http_headers]
user-agent = "rss3-node"
[discovery.server]
endpoint = "https://node.mydomain.com/"
global_indexer_endpoint = "https://gi.rss3.dev/"
Expand Down Expand Up @@ -227,6 +258,14 @@ password = "pass"
access_key = "abc"
access_code = "def"
[[component.federated]]
network = "mastodon"
worker = "mastodon"
endpoint = "mastodon"
[component.federated.parameters]
mastodon_kafka_topic = "activitypub_events"
[[component.decentralized]]
network = "ethereum"
worker = "core"
Expand Down Expand Up @@ -257,6 +296,12 @@ var configFileExpected = &File{
"user-agent": "rss3-node",
},
},
"mastodon": {
URL: "https://0.0.0.0:9092/",
HTTPHeaders: map[string]string{
"user-agent": "rss3-node",
},
},
},
Discovery: &Discovery{
Operator: &Operator{
Expand Down Expand Up @@ -286,7 +331,22 @@ var configFileExpected = &File{
},
},
},
Federated: nil,
Federated: []*Module{
{
Network: network.Mastodon,
Worker: decentralized.Mastodon,
EndpointID: "mastodon",
Endpoint: Endpoint{
URL: "https://0.0.0.0:9092/",
HTTPHeaders: map[string]string{
"user-agent": "rss3-node",
},
},
Parameters: &Parameters{
"mastodon_kafka_topic": "activitypub_events",
},
},
},
Decentralized: []*Module{
{
Network: network.Ethereum,
Expand Down Expand Up @@ -540,6 +600,26 @@ func AssertConfig(t *testing.T, expect, got *File) {
assert.Equal(t, expect.Discovery, got.Discovery)
})

t.Run("federated", func(t *testing.T) {
for i, federated := range expect.Component.Federated {
func(_expect, got *Module) {
t.Run(fmt.Sprintf("federated-%d", i), func(t *testing.T) {
t.Parallel()
assert.Equal(t, _expect, got)
})
}(federated, got.Component.Federated[i])
}

for i, indexer := range got.Component.Federated {
func(_except, got *Module) {
t.Run(fmt.Sprintf("%s-%s", indexer.Network, indexer.Worker), func(t *testing.T) {
t.Parallel()
AssertIndexer(t, _except, got)
})
}(configFileExpected.Component.Federated[i], indexer)
}
})

t.Run("decentralized", func(t *testing.T) {
func(_expect, got *Module) {
t.Run("rss", func(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion docs/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -1290,7 +1290,7 @@
"description": "The platform on which activities occur.",
"type": "string",
"example": "Uniswap",
"enum": ["1inch","AAVE","Aavegotchi","Arbitrum","Base","BendDAO","Cow","Crossbell","Curve","ENS","Farcaster","Highlight","IQWiki","KiwiStand","Lens","LiNEAR","Lido","Linear","LooksRare","Matters","Mirror","Nouns","OpenSea","Optimism","Paragraph","Paraswap","RSS3","SAVM","Stargate","Uniswap","Unknown","VSL"]
"enum": ["1inch","AAVE","Aavegotchi","Arbitrum","Base","BendDAO","Cow","Crossbell","Curve","ENS","Farcaster","Highlight","IQWiki","KiwiStand","Lens","LiNEAR","Lido","Linear","LooksRare","Mastodon","Matters","Mirror","Nouns","OpenSea","Optimism","Paragraph","Paraswap","Polymarket","RSS3","SAVM","Stargate","Uniswap","Unknown","VSL"]
},
"Timestamp": {
"description": "The timestamp of when the activity occurred.",
Expand Down
6 changes: 6 additions & 0 deletions internal/database/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Client interface {
DatasetMirrorPost
DatasetFarcasterProfile
DatasetENSNamehash
DatasetMastodonHandle

LoadCheckpoint(ctx context.Context, id string, network network.Network, worker string) (*engine.Checkpoint, error)
LoadCheckpoints(ctx context.Context, id string, network network.Network, worker string) ([]*engine.Checkpoint, error)
Expand Down Expand Up @@ -58,6 +59,11 @@ type DatasetENSNamehash interface {
SaveDatasetENSNamehash(ctx context.Context, namehash *model.ENSNamehash) error
}

// DatasetMastodonHandle groups the storage operations for Mastodon handle
// records.
type DatasetMastodonHandle interface {
// LoadDatasetMastodonHandle loads the record for the given handle string.
LoadDatasetMastodonHandle(ctx context.Context, handle string) (*model.MastodonHandle, error)
// SaveDatasetMastodonHandle saves the given handle record.
SaveDatasetMastodonHandle(ctx context.Context, handle *model.MastodonHandle) error
}

var _ goose.Logger = (*SugaredLogger)(nil)

type SugaredLogger struct {
Expand Down
43 changes: 42 additions & 1 deletion internal/database/dialer/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func (c *client) SaveDatasetMirrorPost(ctx context.Context, post *mirror_model.D
return c.database.WithContext(ctx).Clauses(clauses...).Create(&value).Error
}

// FindActivity finds a Activity by id.
// FindActivity finds an Activity by id.
func (c *client) FindActivity(ctx context.Context, query model.ActivityQuery) (*activityx.Activity, *int, error) {
if c.partition {
return c.findActivityPartitioned(ctx, query)
Expand Down Expand Up @@ -372,6 +372,47 @@ func (c *client) SaveDatasetENSNamehash(ctx context.Context, namehash *model.ENS
return c.database.WithContext(ctx).Clauses(clauses...).Create(&value).Error
}

// LoadDatasetMastodonHandle fetches the stored record for the given handle.
// When no row matches, it does not fail: it returns a fresh record carrying
// the requested handle and the current time as its last-updated value. Any
// other database error is returned unchanged.
func (c *client) LoadDatasetMastodonHandle(ctx context.Context, handle string) (*model.MastodonHandle, error) {
	var record table.DatasetMastodonHandle

	err := c.database.WithContext(ctx).Where("handle = ?", handle).First(&record).Error

	switch {
	case err == nil:
		// Row found; record is already populated.
	case errors.Is(err, gorm.ErrRecordNotFound):
		// Unknown handle: fall back to a default, not-yet-persisted record.
		record = table.DatasetMastodonHandle{
			Handle:      handle,
			LastUpdated: time.Now(),
		}
	default:
		return nil, err
	}

	return record.Export()
}

// SaveDatasetMastodonHandle inserts the given handle record. If a row with the
// same handle already exists, the insert becomes an update that sets only
// last_updated, using the current time.
func (c *client) SaveDatasetMastodonHandle(ctx context.Context, handle *model.MastodonHandle) error {
	// Convert the model into its table representation before building the query.
	var row table.DatasetMastodonHandle
	if err := row.Import(handle); err != nil {
		return err
	}

	// Upsert keyed on the handle column; on conflict refresh the timestamp.
	upsert := clause.OnConflict{
		Columns: []clause.Column{{Name: "handle"}},
		DoUpdates: clause.Assignments(map[string]interface{}{
			"last_updated": time.Now(),
		}),
	}

	return c.database.WithContext(ctx).Clauses(upsert).Create(&row).Error
}

// Dial dials a database.
func Dial(ctx context.Context, dataSourceName string, partition bool) (database.Client, error) {
var err error
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- +goose Up
-- One row per Mastodon handle, with the time the row was last updated.
CREATE TABLE IF NOT EXISTS dataset_mastodon_handles (
    handle VARCHAR(255) PRIMARY KEY,
    last_updated TIMESTAMP NOT NULL
);

-- IF NOT EXISTS keeps this statement idempotent, consistent with the
-- CREATE TABLE IF NOT EXISTS in this migration; without it a re-run fails once the index exists.
CREATE INDEX IF NOT EXISTS idx_mastodon_handles_last_updated ON dataset_mastodon_handles(last_updated);

-- +goose Down
-- Dropping the table also drops its indexes.
-- +goose StatementBegin
DROP TABLE IF EXISTS dataset_mastodon_handles;
-- +goose StatementEnd
Loading

0 comments on commit 4aea9b7

Please sign in to comment.