Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add an HTTP CONNECT proxy for the blockchain node #636

Closed
wants to merge 11 commits into from
12 changes: 12 additions & 0 deletions cmd/masa-node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"github.com/masa-finance/masa-oracle/pkg/api"
"github.com/masa-finance/masa-oracle/pkg/config"
"github.com/masa-finance/masa-oracle/pkg/db"
"github.com/masa-finance/masa-oracle/pkg/network"
"github.com/masa-finance/masa-oracle/pkg/staking"
)

Expand Down Expand Up @@ -111,6 +112,17 @@
logrus.Info("API server is disabled")
}

if cfg.ProxyEnabled {
proxy, err := network.NewProxy(masaNode.Host, cfg.ProxyListenAddr, cfg.ProxyListenPort, cfg.ProxyTargetPort)
if err != nil {
logrus.Fatalf("[-] Error creating HTTP CONNECT proxy: %v", err)
}

Check warning on line 119 in cmd/masa-node/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/masa-node/main.go#L115-L119

Added lines #L115 - L119 were not covered by tests

go func(ctx context.Context) {
proxy.Start(ctx)
}(ctx)

Check warning on line 123 in cmd/masa-node/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/masa-node/main.go#L121-L123

Added lines #L121 - L123 were not covered by tests
}

// Get the multiaddress and IP address of the node
multiAddrs, err := masaNode.GetP2PMultiAddrs()
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions node/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
IsTelegramScraper bool
IsWebScraper bool

IsProxy bool

Bootnodes []string
RandomIdentity bool
Services []func(ctx context.Context, node *OracleNode)
Expand Down Expand Up @@ -87,6 +89,10 @@
o.IsWebScraper = true
}

var IsProxy = func(o *NodeOption) {
o.IsProxy = true

Check warning on line 93 in node/options.go

View check run for this annotation

Codecov / codecov/patch

node/options.go#L92-L93

Added lines #L92 - L93 were not covered by tests
}

func (a *NodeOption) Apply(opts ...Option) {
for _, opt := range opts {
opt(a)
Expand Down
6 changes: 5 additions & 1 deletion node/oracle_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,11 @@
go p(node.Context, node)
}

go myNetwork.Discover(node.Context, node.Options.Bootnodes, node.Host, node.DHT, node.Protocol)
protocols := []protocol.ID{node.Protocol}
if node.Options.IsProxy {
protocols = append(protocols, myNetwork.ProxyProtocol)
}
go myNetwork.Discover(node.Context, node.Options.Bootnodes, node.Host, node.DHT, protocols)

Check warning on line 235 in node/oracle_node.go

View check run for this annotation

Codecov / codecov/patch

node/oracle_node.go#L231-L235

Added lines #L231 - L235 were not covered by tests

nodeData := node.NodeTracker.GetNodeData(node.Host.ID().String())
if nodeData == nil {
Expand Down
14 changes: 14 additions & 0 deletions pkg/config/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@
TelegramScraper bool `mapstructure:"telegramScraper"`
WebScraper bool `mapstructure:"webScraper"`
APIEnabled bool `mapstructure:"api_enabled"`
ProxyEnabled bool `mapstructure:"proxy_enabled"`
ProxyListenAddr string `mapstructure:"proxyListenAddr"`
ProxyListenPort uint16 `mapstructure:"proxyListenPort"`
ProxyTargetPort uint16 `mapstructure:"proxyTargetPort"`

KeyManager *masacrypto.KeyManager
TelegramStop bg.StopFunc
Expand Down Expand Up @@ -128,6 +132,12 @@
viper.SetDefault(PrivKeyFile, DefaultPrivKeyFile)

viper.SetDefault(APIEnabled, false)

viper.SetDefault(ProxyEnabled, true)
viper.SetDefault(ProxyListenAddr, "127.0.0.1")
viper.SetDefault(ProxyListenPort, "8888")
// TODO What is the default Cosmos port?
viper.SetDefault(ProxyTargetPort, "8080")

Check warning on line 140 in pkg/config/app.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/app.go#L135-L140

Added lines #L135 - L140 were not covered by tests
}

// setFileConfig loads configuration from a YAML file.
Expand Down Expand Up @@ -190,6 +200,10 @@
pflag.BoolVar(&c.WebScraper, "webScraper", viper.GetBool(WebScraper), "Web Scraper")
pflag.BoolVar(&c.Faucet, "faucet", viper.GetBool(Faucet), "Faucet")
pflag.BoolVar(&c.APIEnabled, "api-enabled", viper.GetBool(APIEnabled), "Enable API server")
pflag.BoolVar(&c.ProxyEnabled, "proxy-enabled", viper.GetBool(ProxyEnabled), "Enable CONNECT proxy")
pflag.StringVar(&c.ProxyListenAddr, "proxyListenAddr", viper.GetString(ProxyListenAddr), "Proxy listen address")
pflag.Uint16Var(&c.ProxyListenPort, "proxyListenPort", viper.GetUint16(ProxyListenPort), "Proxy listen port")
pflag.Uint16Var(&c.ProxyTargetPort, "proxyTargetPort", viper.GetUint16(ProxyTargetPort), "Proxy target port")

Check warning on line 206 in pkg/config/app.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/app.go#L203-L206

Added lines #L203 - L206 were not covered by tests

pflag.Parse()

Expand Down
25 changes: 15 additions & 10 deletions pkg/config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,20 @@ const (
BlockTopic = "blockTopic"
Rendezvous = "masa-mdns"
PageSize = 25
DefaultPrivKeyFile = "masa_oracle_key"

TwitterUsername = "TWITTER_USERNAME"
TwitterPassword = "TWITTER_PASSWORD"
Twitter2FaCode = "TWITTER_2FA_CODE"
DiscordBotToken = "DISCORD_BOT_TOKEN"
TwitterScraper = "TWITTER_SCRAPER"
DiscordScraper = "DISCORD_SCRAPER"
TelegramScraper = "TELEGRAM_SCRAPER"
WebScraper = "WEB_SCRAPER"
APIEnabled = "API_ENABLED"
DefaultPrivKeyFile = "masa_oracle_key"
TwitterUsername = "TWITTER_USERNAME"
TwitterPassword = "TWITTER_PASSWORD"
Twitter2FaCode = "TWITTER_2FA_CODE"
DiscordBotToken = "DISCORD_BOT_TOKEN"
TwitterScraper = "TWITTER_SCRAPER"
DiscordScraper = "DISCORD_SCRAPER"
TelegramScraper = "TELEGRAM_SCRAPER"
WebScraper = "WEB_SCRAPER"
APIEnabled = "API_ENABLED"

ProxyEnabled = "PROXY_ENABLED"
ProxyListenAddr = "PROXY_LISTEN_ADDR"
ProxyListenPort = "PROXY_LISTEN_PORT"
ProxyTargetPort = "PROXY_TARGET_PORT"
)
4 changes: 4 additions & 0 deletions pkg/config/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@
masaNodeOptions = append(masaNodeOptions, node.IsWebScraper)
}

if cfg.ProxyEnabled {
masaNodeOptions = append(masaNodeOptions, node.IsProxy)
}

Check warning on line 69 in pkg/config/options.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/options.go#L68-L69

Added lines #L68 - L69 were not covered by tests

workHandlerManager := workers.NewWorkHandlerManager(workerManagerOptions...)
blockChainEventTracker := node.NewBlockChain()
pubKeySub := &pubsub.PublicKeySubscriptionHandler{}
Expand Down
154 changes: 83 additions & 71 deletions pkg/network/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,34 @@
// It initializes discovery via the DHT and advertises this node.
// It runs discovery in a loop with a ticker, re-advertising and finding new peers.
// For each discovered peer, it checks if already connected, and if not, dials them.
func Discover(ctx context.Context, bootNodes []string, host host.Host, dht *dht.IpfsDHT, protocol protocol.ID) {
func Discover(ctx context.Context, bootNodes []string, host host.Host, dht *dht.IpfsDHT, protocols []protocol.ID) {
var routingDiscovery *routing.RoutingDiscovery
protocolString := string(protocol)
logrus.Infof("[+] Discovering peers for protocol: %s", protocolString)

routingDiscovery = routing.NewRoutingDiscovery(dht)

// Advertise node right away, then it will re-advertise with each ticker interval
logrus.Infof("[+] Attempting to advertise protocol: %s", protocolString)
_, err := routingDiscovery.Advertise(ctx, protocolString)
if err != nil {
logrus.Debugf("[-] Failed to advertise protocol: %v", err)
} else {
logrus.Infof("[+] Successfully advertised protocol %s", protocolString)

protos := []string{}
for _, p := range protocols {
protos = append(protos, string(p))
}

for _, p := range protos {
logrus.Infof("[+] Discovering peers for protocol: %s", p)

routingDiscovery = routing.NewRoutingDiscovery(dht)

// Advertise node right away, then it will re-advertise with each ticker interval
logrus.Infof("[+] Attempting to advertise protocol: %s", p)
_, err := routingDiscovery.Advertise(ctx, p)
if err != nil {
logrus.Debugf("[-] Failed to advertise protocol: %v", err)
} else {
logrus.Infof("[+] Successfully advertised protocol %s", p)
}
}

ticker := time.NewTicker(time.Minute * 1)
defer ticker.Stop()

var peerChan <-chan peer.AddrInfo
var err error

for {
select {
Expand All @@ -56,66 +64,70 @@
logrus.Debug("[-] Searching for other peers...")
routingDiscovery = routing.NewRoutingDiscovery(dht)

// Advertise this node
logrus.Debugf("[-] Attempting to advertise protocol: %s", protocolString)
_, err := routingDiscovery.Advertise(ctx, protocolString)
if err != nil {
logrus.Debugf("[-] Failed to advertise protocol with error %v", err)

// Network retry when connectivity is temporarily lost using NewExponentialBackOff
expBackOff := backoff.NewExponentialBackOff()
expBackOff.MaxElapsedTime = time.Second * 10
err := backoff.Retry(func() error {
peerChan, err = routingDiscovery.FindPeers(ctx, protocolString)
return err
}, expBackOff)
if err != nil {
logrus.Warningf("[-] Retry failed to find peers: %v", err)
}

} else {
logrus.Infof("[+] Successfully advertised protocol: %s", protocolString)
}
for _, protocolString := range protos {
// Advertise this node
logrus.Debugf("[-] Attempting to advertise protocol: %s", protocolString)
if _, err := routingDiscovery.Advertise(ctx, protocolString); err != nil {
logrus.Debugf("[-] Failed to advertise protocol with error %v", err)

// Network retry when connectivity is temporarily lost using NewExponentialBackOff
expBackOff := backoff.NewExponentialBackOff()
expBackOff.MaxElapsedTime = time.Second * 10
err = backoff.Retry(func() error {
peerChan, err = routingDiscovery.FindPeers(ctx, protocolString)
return err
}, expBackOff)
if err != nil {
logrus.Warningf("[-] Retry failed to find peers: %v", err)
}

Check warning on line 82 in pkg/network/discover.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/discover.go#L67-L82

Added lines #L67 - L82 were not covered by tests

// Use the routing discovery to find peers.
peerChan, err = routingDiscovery.FindPeers(ctx, protocolString)
if err != nil {
logrus.Errorf("[-] Failed to find peers: %v", err)
} else {
logrus.Debug("[+] Successfully started finding peers")
}
select {
case availPeer, ok := <-peerChan:
if !ok {
logrus.Info("[+] Peer channel closed, restarting discovery")
break
}
// validating proper peers to connect to
availPeerAddrInfo := peer.AddrInfo{
ID: availPeer.ID,
Addrs: availPeer.Addrs,
} else {
logrus.Infof("[+] Successfully advertised protocol: %s", protocolString)

Check warning on line 85 in pkg/network/discover.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/discover.go#L84-L85

Added lines #L84 - L85 were not covered by tests
}
if availPeerAddrInfo.ID == host.ID() {
logrus.Debugf("Skipping connect to self: %s", availPeerAddrInfo.ID.String())
continue

// Use the routing discovery to find peers.
peerChan, err = routingDiscovery.FindPeers(ctx, protocolString)
if err != nil {
logrus.Errorf("[-] Failed to find peers: %v", err)
} else {
logrus.Debug("[+] Successfully started finding peers")

Check warning on line 93 in pkg/network/discover.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/discover.go#L89-L93

Added lines #L89 - L93 were not covered by tests
}
if len(availPeerAddrInfo.Addrs) == 0 {
for _, bn := range bootNodes {
bootNode := strings.Split(bn, "/")[len(strings.Split(bn, "/"))-1]
if availPeerAddrInfo.ID.String() != bootNode {
logrus.Warningf("Skipping connect to non bootnode peer with no multiaddress: %s", availPeerAddrInfo.ID.String())
continue

select {
case availPeer, ok := <-peerChan:
if !ok {
logrus.Info("[+] Peer channel closed, restarting discovery")
break

Check warning on line 100 in pkg/network/discover.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/discover.go#L96-L100

Added lines #L96 - L100 were not covered by tests
}
// validating proper peers to connect to
availPeerAddrInfo := peer.AddrInfo{
ID: availPeer.ID,
Addrs: availPeer.Addrs,
}
if availPeerAddrInfo.ID == host.ID() {
logrus.Debugf("Skipping connect to self: %s", availPeerAddrInfo.ID.String())
continue

Check warning on line 109 in pkg/network/discover.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/discover.go#L103-L109

Added lines #L103 - L109 were not covered by tests
}
if len(availPeerAddrInfo.Addrs) == 0 {
for _, bn := range bootNodes {
bootNode := strings.Split(bn, "/")[len(strings.Split(bn, "/"))-1]
if availPeerAddrInfo.ID.String() != bootNode {
logrus.Warningf("Skipping connect to non bootnode peer with no multiaddress: %s", availPeerAddrInfo.ID.String())
continue

Check warning on line 116 in pkg/network/discover.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/discover.go#L111-L116

Added lines #L111 - L116 were not covered by tests
}
}
}
}
logrus.Infof("[+] Available Peer: %s", availPeer.String())

if host.Network().Connectedness(availPeer.ID) != network.Connected {
if isConnectedToBootnode(host, bootNodes) {
_, err := host.Network().DialPeer(ctx, availPeer.ID)
if err != nil {
logrus.Warningf("[-] Failed to connect to peer %s, will retry...", availPeer.ID.String())
continue
logrus.Infof("[+] Available Peer: %s", availPeer.String())

if host.Network().Connectedness(availPeer.ID) != network.Connected {
if isConnectedToBootnode(host, bootNodes) {
_, err := host.Network().DialPeer(ctx, availPeer.ID)
if err != nil {
logrus.Warningf("[-] Failed to connect to peer %s, will retry...", availPeer.ID.String())
continue
} else {
logrus.Infof("[+] Connected to peer %s", availPeer.ID.String())
}

Check warning on line 130 in pkg/network/discover.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/discover.go#L120-L130

Added lines #L120 - L130 were not covered by tests
} else {
for _, bn := range bootNodes {
if len(bn) > 0 {
Expand All @@ -125,10 +137,10 @@
}
}
}
case <-ctx.Done():
logrus.Info("[-] Stopping peer discovery")
return

Check warning on line 142 in pkg/network/discover.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/discover.go#L140-L142

Added lines #L140 - L142 were not covered by tests
}
case <-ctx.Done():
logrus.Info("[-] Stopping peer discovery")
return
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/network/mdns.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type discoveryNotifee struct {
func (n *discoveryNotifee) HandlePeerFound(pi peer.AddrInfo) {
pe := PeerEvent{
AddrInfo: pi,
Action: "PeerFound",
Action: PeerFound,
Source: "mdns",
Rendezvous: n.Rendezvous,
}
Expand Down
Loading
Loading