Skip to content

Commit

Permalink
feat(torch): fix goroutines issue
Browse files Browse the repository at this point in the history
Signed-off-by: Jose Ramon Mañes <[email protected]>
  • Loading branch information
tty47 committed Oct 28, 2023
1 parent d87a637 commit d4b7a25
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 50 deletions.
14 changes: 8 additions & 6 deletions pkg/nodes/nodes_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ import (
)

const (
prefetchLimit = 10 // prefetchLimit
pollDuration = 10 * time.Second // pollDuration how often is Torch going to pull data from the queue.
consumerName = "torch-consumer" // consumerName name used in the tag to identify the consumer.
consumerName = "torch-consumer" // consumerName name used in the tag to identify the consumer.
prefetchLimit = 10 // prefetchLimit
pollDuration = 10 * time.Second // pollDuration how often is Torch going to pull data from the queue.
timeoutDurationConsumer = 60 * time.Second // timeoutDurationConsumer timeout for the consumer.
)

// ConsumerInit initialize the process to check the queues in Redis.
Expand All @@ -27,7 +28,7 @@ func ConsumerInit(queueName string) {

red := redis.InitRedisConfig()
// Create a new context with a timeout
ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration)
ctx, cancel := context.WithTimeout(context.Background(), timeoutDurationConsumer)

// Make sure to call the cancel function to release resources when you're done
defer cancel()
Expand Down Expand Up @@ -55,8 +56,9 @@ func ConsumerInit(queueName string) {
_, err = queue.AddConsumerFunc(consumerName, func(delivery rmq.Delivery) {
log.Info("Performing task: ", delivery.Payload())
peer := config.Peer{
NodeName: delivery.Payload(),
NodeType: "da",
NodeName: delivery.Payload(),
NodeType: "da",
ContainerName: "da",
}

// here we wil send the node to generate the id
Expand Down
73 changes: 29 additions & 44 deletions pkg/nodes/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,91 +11,76 @@ import (
"github.com/celestiaorg/torch/pkg/metrics"
)

const (
MaxRetryCount = 5 // MaxRetryCount number of retries per node.
TickerTime = 5 * time.Second // TickerTime time specified to make a signal.
)

var (
taskQueue = make(chan config.Peer) // taskQueue channel for pending tasks (peers to process later).
taskQueue = make(chan config.Peer) // taskQueue channel for pending tasks (peers to process later).
MaxRetryCount = 5 // MaxRetryCount number of retries per node.
TickerTime = 5 * time.Second // TickerTime time specified to make a signal.
timeoutDurationProcessQueue = 10 * time.Second // timeoutDurationProcessQueue time specified to make a signal.
)

// ProcessTaskQueue processes the pending tasks in the queue the time specified in the const TickerTime.
func ProcessTaskQueue(ctx context.Context) {
func ProcessTaskQueue() {
ticker := time.NewTicker(TickerTime)

for {
select {
case <-ctx.Done():
// The context has been canceled, exit the loop.
return
case <-ticker.C:
processQueue(ctx)
processQueue()
}
}
}

// processQueue process the nodes in the queue and tries to generate the Multi Address
func processQueue(ctx context.Context) {
func processQueue() {
red := redis.InitRedisConfig()
// Create a new context with a timeout
ctx, cancel := context.WithTimeout(context.Background(), timeoutDurationProcessQueue)

// Make sure to call the cancel function to release resources when you're done
defer cancel()

for {
select {
case <-ctx.Done():
// The context has been canceled, exit the loop.
log.Error("processQueue - The context has been canceled, exit the loop.")
return
case peer := <-taskQueue:
// Perform the operation with the node
// TODO:
// errors should be returned back and go routines needs to be in errGroup instead of pure go
err := CheckNodesInDBOrCreateThem(peer, red, ctx)
if err != nil {
log.Error("Error checking the nodes: CheckNodesInDBOrCreateThem - ", err)
}

default:
return
}
}
}

// CheckNodesInDBOrCreateThem attempts to find the node in the DB; if the node is not in the DB, it attempts to create it.
// CheckNodesInDBOrCreateThem try to find the node in the DB, if the node is not in the DB, it tries to create it.
func CheckNodesInDBOrCreateThem(peer config.Peer, red *redis.RedisClient, ctx context.Context) error {
log.Info("Processing Node in the queue: ", "[", peer.NodeName, "]")
// Check if the node is in the DB
// check if the node is in the DB
ma, err := redis.CheckIfNodeExistsInDB(red, ctx, peer.NodeName)
if err != nil {
log.Error("Error CheckIfNodeExistsInDB for node: [", peer.NodeName, "]", err)
log.Error("Error CheckIfNodeExistsInDB for node: [", peer.NodeName, "]: ", err)
return err
}
// If the node doesn't exist in the DB, attempt to generate it in a goroutine

// if the node doesn't exist in the DB, let's try to create it
if ma == "" {
log.Info("Node ", "["+peer.NodeName+"]"+" NOT found in DB, let's try to generate it")

// Create a channel for errors
errCh := make(chan error)

// Start a goroutine for GenerateNodeIdAndSaveIt
go func() {
defer close(errCh)
var generateErr error
ma, generateErr = GenerateNodeIdAndSaveIt(peer, peer.NodeName, red, ctx)
if generateErr != nil {
errCh <- generateErr
}
}()

// Wait for the goroutine to finish and check for errors
select {
case generateErr := <-errCh:
if generateErr != nil {
log.Error("Error GenerateNodeIdAndSaveIt for full-node: [", peer.NodeName, "]", generateErr)
return generateErr
}
case <-ctx.Done():
// Context canceled, return an error or handle it as needed
return ctx.Err()
ma, err = GenerateNodeIdAndSaveIt(peer, peer.NodeName, red, ctx)
if err != nil {
log.Error("Error GenerateNodeIdAndSaveIt for full-node: [", peer.NodeName, "]", err)
}
return err
}

// Check if the multi-address is empty after attempting to generate it
// check if the multi address is empty after trying to generate it
if ma == "" {
// Check if the node is still within the maximum number of retries
// check if the node is still under the maximum number of retries
if peer.RetryCount < MaxRetryCount {
log.Info("Node ", "["+peer.NodeName+"]"+" NOT found in DB, adding it to the queue, attempt: ", "[", peer.RetryCount, "]")
peer.RetryCount++ // increment the counter
Expand Down

0 comments on commit d4b7a25

Please sign in to comment.