Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(worker/activitypub): implement mastodon worker to transform AP objects #431

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,9 @@ func findModuleByID(configFile *config.File, workerID string) (*config.Module, e
return module, nil
}

if module, found := findInComponent(configFile.Component.RSS); found {
return module, nil
}
// if module, found := findInComponent(configFile.Component.RSS); found {
// return module, nil
// }

return nil, fmt.Errorf("undefined module %s", workerID)
}
Expand Down
169 changes: 125 additions & 44 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/rss3-network/node/schema/worker/decentralized"
"github.com/rss3-network/node/schema/worker/federated"
"github.com/rss3-network/node/schema/worker/rss"
"github.com/rss3-network/protocol-go/schema/network"
"github.com/samber/lo"
Expand All @@ -35,6 +36,10 @@ endpoints:
url: https://rpc.ankr.com/eth
http_headers:
user-agent: rss3-node
mastodon:
url: https://0.0.0.0:9092/
http_headers:
user-agent: rss3-node
database:
driver: postgres
partition: true
Expand All @@ -60,15 +65,21 @@ observability:
endpoint: localhost:4318
component:
rss:
network: rss
worker: rsshub
endpoint: https://rsshub.app/
parameters:
authentication:
username: user
password: pass
access_key: abc
access_code: def
network: rss
worker: rsshub
endpoint: https://rsshub.app/
parameters:
authentication:
username: user
password: pass
access_key: abc
access_code: def
federated:
network: mastodon
worker: mastodon
endpoint: mastodon
parameters:
mastodon_kafka_topic: activitypub_events
decentralized:
- network: ethereum
worker: core
Expand All @@ -91,6 +102,12 @@ component:
"http_headers": {
"user-agent": "rss3-node"
}
},
"mastodon": {
"url": "https://0.0.0.0:9092/",
"http_headers": {
"user-agent": "rss3-node"
}
}
},
"discovery": {
Expand Down Expand Up @@ -136,41 +153,50 @@ component:
}
},
"component": {
"rss":
{
"network": "rss",
"worker": "rsshub",
"endpoint": "https://rsshub.app/",
"parameters": {
"authentication": {
"username": "user",
"password": "pass",
"access_key": "abc",
"access_code": "def"
}
}
},
"decentralized": [
{
"network": "ethereum",
"worker": "core",
"endpoint": "ethereum",
"parameters": {
"block_start": 47370106,
"block_target": 456
}
},
{
"network": "ethereum",
"worker": "rss3",
"endpoint": "https://rpc.ankr.com/eth",
"parameters": {
"block_start": 123,
"concurrent_block_requests": 2
}
"rss": {
"network": "rss",
"worker": "rsshub",
"endpoint": "https://rsshub.app/",
"parameters": {
"authentication": {
"username": "user",
"password": "pass",
"access_key": "abc",
"access_code": "def"
}
}
},
"federated": [
{
"network": "mastodon",
"worker": "mastodon",
"endpoint": "mastodon",
"parameters": {
"mastodon_kafka_topic": "activitypub_events"
}
]
}
}
],
"decentralized": [
{
"network": "ethereum",
"worker": "core",
"endpoint": "ethereum",
"parameters": {
"block_start": 47370106,
"block_target": 456
}
},
{
"network": "ethereum",
"worker": "rss3",
"endpoint": "https://rpc.ankr.com/eth",
"parameters": {
"block_start": 123,
"concurrent_block_requests": 2
}
}
]
}
}`
configExampleToml = `environment = "development"

Expand All @@ -184,6 +210,12 @@ url = "https://rpc.ankr.com/eth"
[endpoints.ethereum.http_headers]
user-agent = "rss3-node"

[endpoints.mastodon]
url = "https://0.0.0.0:9092/"

[endpoints.mastodon.http_headers]
user-agent = "rss3-node"

[discovery.server]
endpoint = "https://node.mydomain.com/"
global_indexer_endpoint = "https://gi.rss3.dev/"
Expand Down Expand Up @@ -227,6 +259,14 @@ password = "pass"
access_key = "abc"
access_code = "def"

[[component.federated]]
network = "mastodon"
worker = "mastodon"
endpoint = "mastodon"

[component.federated.parameters]
mastodon_kafka_topic = "activitypub_events"

[[component.decentralized]]
network = "ethereum"
worker = "core"
Expand Down Expand Up @@ -257,6 +297,12 @@ var configFileExpected = &File{
"user-agent": "rss3-node",
},
},
"mastodon": {
URL: "https://0.0.0.0:9092/",
HTTPHeaders: map[string]string{
"user-agent": "rss3-node",
},
},
},
Discovery: &Discovery{
Operator: &Operator{
Expand Down Expand Up @@ -286,7 +332,22 @@ var configFileExpected = &File{
},
},
},
Federated: nil,
Federated: []*Module{
{
Network: network.Mastodon,
Worker: federated.Mastodon,
EndpointID: "mastodon",
Endpoint: Endpoint{
URL: "https://0.0.0.0:9092/",
HTTPHeaders: map[string]string{
"user-agent": "rss3-node",
},
},
Parameters: &Parameters{
"mastodon_kafka_topic": "activitypub_events",
},
},
},
Decentralized: []*Module{
{
Network: network.Ethereum,
Expand Down Expand Up @@ -540,6 +601,26 @@ func AssertConfig(t *testing.T, expect, got *File) {
assert.Equal(t, expect.Discovery, got.Discovery)
})

t.Run("federated", func(t *testing.T) {
for i, federated := range expect.Component.Federated {
func(_expect, got *Module) {
t.Run(fmt.Sprintf("federated-%d", i), func(t *testing.T) {
t.Parallel()
assert.Equal(t, _expect, got)
})
}(federated, got.Component.Federated[i])
}

for i, indexer := range got.Component.Federated {
func(_except, got *Module) {
t.Run(fmt.Sprintf("%s-%s", indexer.Network, indexer.Worker), func(t *testing.T) {
t.Parallel()
AssertIndexer(t, _except, got)
})
}(configFileExpected.Component.Federated[i], indexer)
}
})

t.Run("decentralized", func(t *testing.T) {
func(_expect, got *Module) {
t.Run("rss", func(t *testing.T) {
Expand Down
20 changes: 16 additions & 4 deletions internal/engine/source/activitypub/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package activitypub

import (
"fmt"
"strconv"
"time"

"github.com/rss3-network/node/internal/engine"
"github.com/rss3-network/node/provider/activitypub"
Expand All @@ -13,6 +13,9 @@ import (

var _ engine.Task = (*Task)(nil)

// TODO: should be pulled from VSL (NetworkParams contract)
var defaultStartTime = "2024-07-22T00:00:00Z"

type Task struct {
Network network.Network
Message activitypub.Object
Expand All @@ -27,10 +30,19 @@ func (t Task) GetNetwork() network.Network {
}

func (t Task) GetTimestamp() uint64 {
publishedTimeStamp := t.Message.Published
timestamp, _ := strconv.ParseUint(publishedTimeStamp, 10, 64)
// Use default time if Published is empty
timeStr := t.Message.Published
if timeStr == "" {
timeStr = defaultStartTime
}

return timestamp
parsedTime, err := time.Parse(time.RFC3339, timeStr)
if err != nil {
fmt.Println("Error parsing time:", err)
return 0
}
// Convert the time.Time object to a Unix timestamp and cast to uint64
return uint64(parsedTime.Unix())
}

func (t Task) Validate() error {
Expand Down
Loading
Loading