Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ip metadata v2 #6

Merged
merged 4 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 14 additions & 19 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,42 +1,37 @@
ARG GO_VERSION=1.22.5
FROM golang:1.22.5-bookworm as builder

# Stage 1: Dependency management and build
FROM golang:${GO_VERSION}-bookworm as builder
# Install CA certificates and build essentials
RUN apt-get update && apt-get install -y ca-certificates build-essential && rm -rf /var/lib/apt/lists/*

# Set the working directory inside the container to /app
WORKDIR /app

# Copy go.mod and go.sum files
# Copy the Go module files
COPY go.mod go.sum ./

# Download dependencies and verify modules
RUN go mod download && go mod verify
# Download dependencies
RUN go mod download

# Copy the rest of the application source code
COPY . .

# Run go mod tidy to ensure the go.mod file is up to date
RUN go mod tidy
# Build the application; output the binary to a known location
RUN go build -v -o /run-app .

# Build the application and capture the output
RUN go build -v -o /run-app .

# Stage 2: Final stage
# Final stage based on Debian Bookworm-slim
FROM debian:bookworm-slim

# Install CA certificates in the final image
# Install CA certificates in the final image to ensure they are present.
RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/*

# Copy the built executable from the builder stage
COPY --from=builder /run-app /usr/local/bin/run-app

# Create necessary directory
# Create a directory for the data
RUN mkdir -p /app/data

# Copy the CSV file to /app/data
COPY /data/ip_metadata.csv /app/data/ip_metadata.csv

# Set the working directory
WORKDIR /app
# Copy the CSV file
COPY data/ip_metadata.csv /app/data/ip_metadata.csv

# Set the command to run the application
CMD ["/usr/local/bin/run-app"]
96 changes: 50 additions & 46 deletions clickhouse/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (c *ClickhouseClient) LoadIPMetadataFromCSV() error {
reader := csv.NewReader(file)
reader.FieldsPerRecord = -1 // Allow variable number of fields

batch, err := c.chConn.PrepareBatch(context.Background(), "INSERT INTO ip_metadata")
batch, err := c.ChConn.PrepareBatch(context.Background(), "INSERT INTO ip_metadata")
if err != nil {
return fmt.Errorf("failed to prepare batch: %w", err)
}
Expand Down Expand Up @@ -176,7 +176,7 @@ func parseFloat(s string) (float64, error) {
func (c *ClickhouseClient) isTableEmpty(tableName string) (bool, error) {
query := fmt.Sprintf("SELECT count(*) FROM %s", tableName)
var count uint64
err := c.chConn.QueryRow(context.Background(), query).Scan(&count)
err := c.ChConn.QueryRow(context.Background(), query).Scan(&count)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -217,7 +217,7 @@ func IPMetadataDDL(db string) string {
asn String,
asn_organization String,
asn_type String
) ENGINE = MergeTree()
) ENGINE = ReplacingMergeTree()
ORDER BY ip;
`, db)
}
Expand Down Expand Up @@ -267,74 +267,78 @@ type ClickhouseConfig struct {
}

type ClickhouseClient struct {
cfg *ClickhouseConfig
log zerolog.Logger

chConn driver.Conn

ValidatorEventChan chan *types.ValidatorEvent
IPMetadataEventChan chan *types.IPMetadataEvent
PeerDiscoveredEventChan chan *types.PeerDiscoveredEvent
MetadataReceivedEventChan chan *types.MetadataReceivedEvent
cfg *ClickhouseConfig
log zerolog.Logger
ChConn clickhouse.Conn

ValidatorEventChan chan *types.ValidatorEvent
IPMetadataEventChan chan *types.IPMetadataEvent
PeerDiscoveredEventChan chan *types.PeerDiscoveredEvent
MetadataReceivedEventChan chan *types.MetadataReceivedEvent
}


func NewClickhouseClient(cfg *ClickhouseConfig) (*ClickhouseClient, error) {
log := log.NewLogger("clickhouse")

conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{cfg.Endpoint},
DialTimeout: time.Second * 60,
Auth: clickhouse.Auth{
Database: cfg.DB,
Username: cfg.Username,
Password: cfg.Password,
},
Debugf: func(format string, v ...interface{}) {
log.Debug().Str("module", "clickhouse").Msgf(format, v)
},
Protocol: clickhouse.Native,
TLS: &tls.Config{},
})
log := log.NewLogger("clickhouse")

conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{cfg.Endpoint},
DialTimeout: time.Second * 60,
Auth: clickhouse.Auth{
Database: cfg.DB,
Username: cfg.Username,
Password: cfg.Password,
},
Debugf: func(format string, v ...interface{}) {
log.Debug().Str("module", "clickhouse").Msgf(format, v)
},
Protocol: clickhouse.Native,
TLS: &tls.Config{},
})

if err != nil {
return nil, err
}
if err != nil {
return nil, err
}

return &ClickhouseClient{
cfg: cfg,
log: log,
ChConn: conn,

ValidatorEventChan: make(chan *types.ValidatorEvent, 16384),
IPMetadataEventChan: make(chan *types.IPMetadataEvent, 16384),
PeerDiscoveredEventChan: make(chan *types.PeerDiscoveredEvent, 16384),
MetadataReceivedEventChan: make(chan *types.MetadataReceivedEvent, 16384),
}, nil
}

return &ClickhouseClient{
cfg: cfg,
log: log,
chConn: conn,

ValidatorEventChan: make(chan *types.ValidatorEvent, 16384),
IPMetadataEventChan: make(chan *types.IPMetadataEvent, 16384),
PeerDiscoveredEventChan: make(chan *types.PeerDiscoveredEvent, 16384),
MetadataReceivedEventChan: make(chan *types.MetadataReceivedEvent, 16384),
}, nil
func (c *ClickhouseClient) Close() error {
return c.ChConn.Close()
}

func (c *ClickhouseClient) initializeTables() error {
// Create validator_metadata table
if err := c.chConn.Exec(context.Background(), ValidatorMetadataDDL(c.cfg.DB)); err != nil {
if err := c.ChConn.Exec(context.Background(), ValidatorMetadataDDL(c.cfg.DB)); err != nil {
c.log.Error().Err(err).Msg("creating validator_metadata table")
return err
}

// Create ip_metadata table
if err := c.chConn.Exec(context.Background(), IPMetadataDDL(c.cfg.DB)); err != nil {
if err := c.ChConn.Exec(context.Background(), IPMetadataDDL(c.cfg.DB)); err != nil {
c.log.Error().Err(err).Msg("creating ip_metadata table")
return err
}


// Create peer_discovered_events table
if err := c.chConn.Exec(context.Background(), PeerDiscoveredEventsDDL(c.cfg.DB)); err != nil {
if err := c.ChConn.Exec(context.Background(), PeerDiscoveredEventsDDL(c.cfg.DB)); err != nil {
c.log.Error().Err(err).Msg("creating peer_discovered_events table")
return err
}

// Create metadata_received_events table
if err := c.chConn.Exec(context.Background(), MetadataReceivedEventsDDL(c.cfg.DB)); err != nil {
if err := c.ChConn.Exec(context.Background(), MetadataReceivedEventsDDL(c.cfg.DB)); err != nil {
c.log.Error().Err(err).Msg("creating metadata_received_events table")
return err
}
Expand Down Expand Up @@ -368,7 +372,7 @@ func (c *ClickhouseClient) Start() error {
// BatchProcessor processes events in batches for a specified table in ClickHouse.
func batchProcessor[T any](client *ClickhouseClient, tableName string, eventChan <-chan T, maxSize uint64) {
// Prepare the initial batch.
batch, err := client.chConn.PrepareBatch(context.Background(), fmt.Sprintf("INSERT INTO %s", tableName))
batch, err := client.ChConn.PrepareBatch(context.Background(), fmt.Sprintf("INSERT INTO %s", tableName))
if err != nil {
client.log.Error().Err(err).Msg("Failed to prepare batch")
return
Expand Down Expand Up @@ -396,7 +400,7 @@ func batchProcessor[T any](client *ClickhouseClient, tableName string, eventChan
}

// Prepare a new batch after sending the current batch.
batch, err = client.chConn.PrepareBatch(context.Background(), fmt.Sprintf("INSERT INTO %s", tableName))
batch, err = client.ChConn.PrepareBatch(context.Background(), fmt.Sprintf("INSERT INTO %s", tableName))
if err != nil {
client.log.Error().Err(err).Msg("Failed to prepare new batch after sending")
return
Expand Down
Loading