Skip to content

Commit

Permalink
prep clickhouse integration
Browse files Browse the repository at this point in the history
  • Loading branch information
kasteph committed Nov 28, 2024
1 parent 081586c commit 815ca31
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 91 deletions.
175 changes: 112 additions & 63 deletions cmd/honeypot/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,21 @@ package main

import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"strconv"
"syscall"
"time"

logging "github.com/ipfs/go-log/v2"
"github.com/probe-lab/ants-watch"
"github.com/probe-lab/ants-watch/metrics"
"github.com/probe-lab/ants-watch/db"
"github.com/urfave/cli/v2"
)

var logger = logging.Logger("ants-queen")

func runQueen(ctx context.Context, nebulaPostgresStr string, nPorts, firstPort int, upnp bool) error {
func runQueen(ctx context.Context, nebulaPostgresStr string, nPorts, firstPort int, upnp bool, clickhouseClient *db.Client) error {
var queen *ants.Queen
var err error

Expand All @@ -27,9 +26,9 @@ func runQueen(ctx context.Context, nebulaPostgresStr string, nPorts, firstPort i
}

if upnp {
queen, err = ants.NewQueen(ctx, nebulaPostgresStr, keyDBPath, 0, 0)
queen, err = ants.NewQueen(ctx, nebulaPostgresStr, keyDBPath, 0, 0, clickhouseClient)
} else {
queen, err = ants.NewQueen(ctx, nebulaPostgresStr, keyDBPath, uint16(nPorts), uint16(firstPort))
queen, err = ants.NewQueen(ctx, nebulaPostgresStr, keyDBPath, uint16(nPorts), uint16(firstPort), clickhouseClient)
}
if err != nil {
return fmt.Errorf("failed to create queen: %w", err)
Expand Down Expand Up @@ -65,77 +64,127 @@ func main() {
logging.SetLogLevel("dht", "error")
logging.SetLogLevel("basichost", "info")

queenCmd := flag.NewFlagSet("queen", flag.ExitOnError)
nebulaPostgresStr := *queenCmd.String("postgres", "", "Postgres connection string, postgres://user:password@host:port/dbname")
if len(nebulaPostgresStr) == 0 {
nebulaPostgresStr = os.Getenv("NEBULA_POSTGRES_CONNURL")
app := &cli.App{
Name: "ants-watch",
Usage: "Get DHT clients in your p2p network using a honeypot",
Commands: []*cli.Command{
{
Name: "queen",
Usage: "Starts the queen service",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "clickhouseAddress",
Usage: "ClickHouse address containing the host and port, 127.0.0.1:9000",
EnvVars: []string{"CLICKHOUSE_ADDRESS"},
},
&cli.StringFlag{
Name: "clickhouseDatabase",
Usage: "The ClickHouse database where ants requests will be recorded",
EnvVars: []string{"CLICKHOUSE_DATABASE"},
},
&cli.StringFlag{
Name: "clickhouseUsername",
Usage: "The ClickHouse user that has the prerequisite privileges to record the requests",
EnvVars: []string{"CLICKHOUSE_USERNAME"},
},
&cli.StringFlag{
Name: "clickhousePassword",
Usage: "The password for the ClickHouse user",
EnvVars: []string{"CLICKHOUSE_PASSWORD"},
},
&cli.StringFlag{
Name: "nebulaDatabaseConnString",
Usage: "The connection string for the Postgres Nebula database",
EnvVars: []string{"NEBULA_DB_CONNSTRING"},
},
&cli.IntFlag{
Name: "nPorts",
Value: 128,
Usage: "Number of ports ants can listen on",
},
&cli.IntFlag{
Name: "firstPort",
Value: 6000,
Usage: "First port ants can listen on",
},
&cli.BoolFlag{
Name: "upnp",
Value: false,
Usage: "Enable UPnP",
},
},
Action: func(c *cli.Context) error {
return runQueenCommand(c)
},
},
{
Name: "health",
Usage: "Checks the health of the service",
Action: func(c *cli.Context) error {
return healthCheckCommand()
},
},
},
}

nPorts := queenCmd.Int("nPorts", 128, "Number of ports ants can listen on")
firstPort := queenCmd.Int("firstPort", 6000, "First port ants can listen on")
upnp := queenCmd.Bool("upnp", false, "Enable UPnP")


healthCmd := flag.NewFlagSet("health", flag.ExitOnError)

if len(os.Args) < 2 {
fmt.Println("Expected 'queen' or 'health' subcommands")
if err := app.Run(os.Args); err != nil {
logger.Warnf("Error running app: %v\n", err)
os.Exit(1)
}

if os.Args[1] != "health" {
metricsHost := os.Getenv("METRICS_HOST")
metricsPort := os.Getenv("METRICS_PORT")
logger.Debugln("Work is done")
}

p, err := strconv.Atoi(metricsPort)
if err != nil {
logger.Errorf("Port should be an int %v\n", metricsPort)
}
logger.Infoln("Serving metrics endpoint")
go metrics.ListenAndServe(metricsHost, p)
}
func runQueenCommand(c *cli.Context) error {
nebulaPostgresStr := c.String("nebulaDatabaseConnString")
nPorts := c.Int("nPorts")
firstPort := c.Int("firstPort")
upnp := c.Bool("upnp")

switch os.Args[1] {
case "queen":
queenCmd.Parse(os.Args[2:])
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
address := c.String("clickhouseAddress")
database := c.String("clickhouseDatabase")
username := c.String("clickhouseUsername")
password := c.String("clickhousePassword")

sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
client, err := db.NewDatabaseClient(
ctx, address, database, username, password,
)
if err != nil {
logger.Errorln(err)
}

errChan := make(chan error, 1)
go func() {
errChan <- runQueen(ctx, nebulaPostgresStr, *nPorts, *firstPort, *upnp)
}()
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

select {
case err := <-errChan:
if err != nil {
logger.Error(err)
os.Exit(1)
}
case sig := <-sigChan:
logger.Infof("Received signal: %v, initiating shutdown...", sig)
cancel()
<-errChan
}
errChan := make(chan error, 1)

case "health":
healthCmd.Parse(os.Args[2:])
go func() {
errChan <- runQueen(ctx, nebulaPostgresStr, nPorts, firstPort, upnp, client)
}()

ctx := context.Background()
if err := HealthCheck(&ctx); err != nil {
fmt.Printf("Health check failed: %v\n", err)
os.Exit(1)
select {
case err := <-errChan:
if err != nil {
logger.Error(err)
return err
}
fmt.Println("Health check passed")

default:
fmt.Printf("Unknown command: %s\n", os.Args[1])
os.Exit(1)
case sig := <-sigChan:
logger.Infof("Received signal: %v, initiating shutdown...", sig)
cancel()
<-errChan
}
return nil
}

logger.Debugln("Work is done")
func healthCheckCommand() error {
ctx := context.Background()
if err := HealthCheck(&ctx); err != nil {
logger.Infof("Health check failed: %v\n", err)
return err
}
logger.Infoln("Health check passed")
return nil
}
46 changes: 46 additions & 0 deletions db/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package db

import (
"context"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
// "github.com/dennis-tra/nebula-crawler/config"
lru "github.com/hashicorp/golang-lru"
mt "github.com/probe-lab/ants-watch/metrics"
// log "github.com/ipfs/go-log/v2"
)

type Client struct {
ctx context.Context
conn driver.Conn

agentVersion *lru.Cache
protocols *lru.Cache
protocolsSets *lru.Cache

telemetry *mt.Telemetry
}

func NewDatabaseClient(ctx context.Context, address, database, username, password string) (*Client, error) {
logger.Infoln("Creating new database client...")

conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{address},
Auth: clickhouse.Auth{
Database: database,
Username: username,
Password: password,
},
Debug: true,
})

if err != nil {
return nil, err
}

return &Client{
ctx: ctx,
conn: conn,
}, nil
}
27 changes: 18 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,30 +33,39 @@ require (
)

require (
github.com/dennis-tra/nebula-crawler v0.0.0-20241010113859-38e4489a8fa7
github.com/ClickHouse/clickhouse-go/v2 v2.30.0
github.com/dennis-tra/nebula-crawler v0.0.0-20241105123054-bbd84dcd5b43
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/urfave/cli/v2 v2.27.5
)

require (
github.com/ClickHouse/ch-go v0.63.1 // indirect
github.com/andybalholm/brotli v1.1.1 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.3.4 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.5 // indirect
github.com/ericlagergren/decimal v0.0.0-20240411145413-00de7ca16731 // indirect
github.com/ethereum/go-ethereum v1.14.11 // indirect
github.com/go-faster/city v1.0.1 // indirect
github.com/go-faster/errors v0.7.1 // indirect
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
github.com/gofrs/uuid v4.4.0+incompatible // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect
github.com/holiman/uint256 v1.3.1 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/oschwald/maxminddb-golang v1.13.1 // indirect
github.com/paulmach/orb v0.11.1 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/shopspring/decimal v1.4.0 // indirect
github.com/spf13/cast v1.7.0 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 // indirect
github.com/urfave/cli/v2 v2.27.4 // indirect
github.com/volatiletech/inflect v0.0.1 // indirect
github.com/volatiletech/randomize v0.0.1 // indirect
github.com/wlynxg/anet v0.0.5 // indirect
Expand Down Expand Up @@ -102,7 +111,7 @@ require (
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jbenet/goprocess v0.1.4 // indirect
github.com/klauspost/compress v1.17.10 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
Expand Down Expand Up @@ -168,20 +177,20 @@ require (
github.com/stretchr/testify v1.9.0
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel v1.30.0
go.opentelemetry.io/otel/metric v1.30.0
go.opentelemetry.io/otel/trace v1.30.0
go.opentelemetry.io/otel v1.32.0
go.opentelemetry.io/otel/metric v1.32.0
go.opentelemetry.io/otel/trace v1.32.0
go.uber.org/dig v1.18.0 // indirect
go.uber.org/fx v1.22.2 // indirect
go.uber.org/mock v0.4.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0
go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c // indirect
golang.org/x/mod v0.21.0 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/sys v0.27.0 // indirect
golang.org/x/text v0.19.0 // indirect
golang.org/x/tools v0.26.0 // indirect
gonum.org/v1/gonum v0.15.1 // indirect
Expand Down
Loading

0 comments on commit 815ca31

Please sign in to comment.