Skip to content

Commit

Permalink
🔄 synced local 'pkg/' with remote 'pkg/'
Browse files Browse the repository at this point in the history
  • Loading branch information
constient-altrova committed Jun 7, 2024
1 parent dd5a43b commit 585cb8e
Showing 1 changed file with 58 additions and 8 deletions.
66 changes: 58 additions & 8 deletions pkg/cmdutil/grpcutil/onboarding_wait_for_log.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
package grpcutil

import (
"bytes"
"context"
"fmt"
"log"
"os/exec"
"time"

"github.com/logfire-sh/cli/internal/config"
"github.com/logfire-sh/cli/pkg/cmd/sources/models"
"github.com/logfire-sh/cli/pkg/cmdutil/APICalls"
pb "github.com/logfire-sh/cli/services/flink-service"
)

func GetLog(token string, endpoint string, teamId string, accountId string, sourceId string, stop chan error) {
type Options struct {
Ctx context.Context
}

func GetLog(config config.Config, token, endpoint, teamId, accountId, sourceId, sourceToken string, stop chan error) {
request := &pb.FilterRequest{
DateTimeFilter: &pb.DateTimeFilter{},
Sources: []*pb.Source{},
Expand All @@ -19,16 +27,14 @@ func GetLog(token string, endpoint string, teamId string, accountId string, sour
BatchSize: 1,
}

var sources []models.Source

source, err := APICalls.GetSource(token, endpoint, teamId, sourceId)
if err != nil {
stop <- err
return
}
sources = append(sources, source)
pbSources := CreateGrpcSource(sources)

sources := []models.Source{source}
pbSources := CreateGrpcSource(sources)
request.Sources = pbSources

filterService := NewFilterService()
Expand All @@ -37,16 +43,60 @@ func GetLog(token string, endpoint string, teamId string, accountId string, sour
for {
select {
case <-stop:
stop <- err
stop <- nil
return
default:
response, err := filterService.Client.GetFilteredData(context.Background(), request)
istLocation, err := time.LoadLocation("UTC")
if err != nil {
log.Println("Error loading IST location:", err)
stop <- err
return
}

currentTime := time.Now().In(istLocation)
formattedTime := currentTime.Format("2006-01-02 15:04:05")

ctxCmd, cancelCmd := context.WithTimeout(context.Background(), 5*time.Second)

cmd := exec.CommandContext(ctxCmd, "curl",
"--location",
config.Get().GrpcIngestion,
"--header", "Content-Type: application/json",
"--header", fmt.Sprintf("Authorization: Bearer %s", sourceToken),
"--data", fmt.Sprintf("[{\"dt\":\"%s\",\"message\":\"%s\"}]", formattedTime, "Hello from Logfire!"),
)

var out bytes.Buffer
cmd.Stdout = &out
cmd.Stderr = &out

// Start the curl command
if err := cmd.Start(); err != nil {
log.Println("Error starting curl command:", err)
cancelCmd()
stop <- err
return
}

// Allow the command to run for a short period before cancelling it
time.Sleep(1100 * time.Millisecond)
cancelCmd()

// Wait for the curl command to complete
if err := cmd.Wait(); err != nil && err.Error() != "signal: killed" {
log.Println("Error waiting for curl command:", err)
log.Println("Curl output:", out.String())
stop <- err
return
}

log.Printf("Response: %s\n", response.Records)
// Check response from filter service
response, err := filterService.Client.GetFilteredData(context.Background(), request)
if err != nil {
log.Println("Error getting filtered data:", err)
stop <- err
return
}

if len(response.Records) > 0 {
stop <- nil
Expand Down

0 comments on commit 585cb8e

Please sign in to comment.