Skip to content

Commit

Permalink
sendIngestCommand: better error handling, avoid logging
Browse files Browse the repository at this point in the history
  • Loading branch information
consolethinks committed Aug 13, 2024
1 parent ab040c8 commit 0afb74f
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 40 deletions.
77 changes: 40 additions & 37 deletions datasetIngestor/sendIngestCommand.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"
)

Expand All @@ -14,15 +13,6 @@ type FileBlock struct {
DatasetId string `json:"datasetId"`
}

type payloadStruct struct {
Text string `json:"text"`
}
type message struct {
ShortMessage string `json:"shortMessage"`
Sender string `json:"sender"`
Payload payloadStruct `json:"payload"`
}

const TOTAL_MAXFILES = 400000
const BLOCK_MAXBYTES = 200000000000 // 700000 for testing the logic
const BLOCK_MAXFILES = 20000 // 20 for testing the logic
Expand Down Expand Up @@ -79,39 +69,48 @@ Returns:
The ID of the created dataset.
*/
func IngestDataset(client *http.Client, APIServer string, metaDataMap map[string]interface{},
fullFileArray []Datafile, user map[string]string) (datasetId string) {
fullFileArray []Datafile, user map[string]string) (datasetId string, err error) {

datasetId = createDataset(client, APIServer, metaDataMap, user)
createOrigDatablocks(client, APIServer, fullFileArray, datasetId, user)
datasetId, err = createDataset(client, APIServer, metaDataMap, user)
if err != nil {
return datasetId, err
}
err = createOrigDatablocks(client, APIServer, fullFileArray, datasetId, user)

return datasetId
return datasetId, err
}

func createDataset(client *http.Client, APIServer string, metaDataMap map[string]interface{}, user map[string]string) string {
func createDataset(client *http.Client, APIServer string, metaDataMap map[string]interface{}, user map[string]string) (string, error) {
cmm, _ := json.Marshal(metaDataMap)
datasetId := ""

if val, ok := metaDataMap["type"]; ok {
dstype := val.(string)
endpoint, err := getEndpoint(dstype)
if err != nil {
log.Fatal(err)
return "", err
}
myurl := APIServer + endpoint + "/?access_token=" + user["accessToken"]
resp := sendRequest(client, "POST", myurl, cmm)
resp, err := sendRequest(client, "POST", myurl, cmm)
if err != nil {
return "", err
}
defer resp.Body.Close()

if resp.StatusCode == 200 {
datasetId = decodePid(resp)
log.Printf("Created dataset with id %v", datasetId)
datasetId, err = decodePid(resp)
if err != nil {
return "", err
}
//log.Printf("Created dataset with id %v", datasetId)
} else {
log.Fatalf("SendIngestCommand: Failed to create new dataset: status code %v\n", resp.StatusCode)
return "", fmt.Errorf("SendIngestCommand: Failed to create new dataset: status code %v", resp.StatusCode)
}
} else {
log.Fatalf("No dataset type defined for dataset %v\n", metaDataMap)
return "", fmt.Errorf("no dataset type defined for dataset %v", metaDataMap)
}

return datasetId
return datasetId, nil
}

func getEndpoint(dstype string) (string, error) {
Expand All @@ -123,37 +122,37 @@ func getEndpoint(dstype string) (string, error) {
case "base":
return "/Datasets", nil
default:
return "", fmt.Errorf("Unknown dataset type encountered: %s", dstype)
return "", fmt.Errorf("unknown dataset type encountered: %s", dstype)
}
}

func sendRequest(client *http.Client, method, url string, body []byte) *http.Response {
func sendRequest(client *http.Client, method, url string, body []byte) (*http.Response, error) {
req, err := http.NewRequest(method, url, bytes.NewBuffer(body))
if err != nil {
log.Fatal(err)
return nil, err
}
req.Header.Set("Content-Type", "application/json")

resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
return nil, err
}

return resp
return resp, nil
}

func decodePid(resp *http.Response) string {
func decodePid(resp *http.Response) (string, error) {
type PidType struct {
Pid string `json:"pid"`
}
decoder := json.NewDecoder(resp.Body)
var d PidType
err := decoder.Decode(&d)
if err != nil {
log.Fatal("Could not decode pid from dataset entry:", err)
return "", fmt.Errorf("could not decode pid from dataset entry: %v", err)
}

return d.Pid
return d.Pid, nil
}

/*
Expand All @@ -174,16 +173,16 @@ If a request receives a response with a status code other than 200, the function
The function logs a message for each created data block, including the start and end file, the total size, and the number of files in the block.
*/
func createOrigDatablocks(client *http.Client, APIServer string, fullFileArray []Datafile, datasetId string, user map[string]string) {
func createOrigDatablocks(client *http.Client, APIServer string, fullFileArray []Datafile, datasetId string, user map[string]string) error {
totalFiles := len(fullFileArray)

if totalFiles > TOTAL_MAXFILES {
log.Fatalf(
"This datasets exceeds (%v) the maximum number of files per dataset , which can currently be handled by the archiving system (%v)\n",
return fmt.Errorf(
"dataset exceeds (%v) the maximum number of files per dataset , which can currently be handled by the archiving system (%v)",
totalFiles, TOTAL_MAXFILES)
}

log.Printf("The dataset contains %v files. \n", totalFiles)
//log.Printf("The dataset contains %v files. \n", totalFiles)

end := 0
var blockBytes int64
Expand All @@ -198,14 +197,18 @@ func createOrigDatablocks(client *http.Client, APIServer string, fullFileArray [

payloadString, _ := json.Marshal(origBlock)
myurl := APIServer + "/OrigDatablocks" + "?access_token=" + user["accessToken"]
resp := sendRequest(client, "POST", myurl, payloadString)
resp, err := sendRequest(client, "POST", myurl, payloadString)
if err != nil {
return err
}
defer resp.Body.Close()

if resp.StatusCode != 200 {
log.Fatalf("Unexpected response code %v when adding origDatablock for dataset id:%v", resp.Status, datasetId)
return fmt.Errorf("unexpected response code \"%v\" when adding origDatablock for dataset id: \"%v\"", resp.Status, datasetId)
}

log.Printf("Created file block from file %v to %v with total size of %v bytes and %v files \n", start, end-1, blockBytes, end-start)
//log.Printf("Created file block from file %v to %v with total size of %v bytes and %v files \n", start, end-1, blockBytes, end-start)
start = end
}
return nil
}
15 changes: 12 additions & 3 deletions datasetIngestor/sendIngestCommand_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,10 @@ func TestSendIngestCommand(t *testing.T) {
defer server.Close()

// Call SendIngestCommand function with the mock server's URL and check the returned dataset ID
datasetId := IngestDataset(client, server.URL, metaDataMap, datafiles, user)
datasetId, err := IngestDataset(client, server.URL, metaDataMap, datafiles, user)
if err != nil {
t.Errorf("received unexpected error: %v", err)
}
if datasetId != "test-dataset-id" {
t.Errorf("Expected dataset id 'test-dataset-id', but got '%s'", datasetId)
}
Expand Down Expand Up @@ -137,7 +140,10 @@ func TestSendRequest(t *testing.T) {
client := &http.Client{}

// Call the sendRequest function
resp := sendRequest(client, "GET", ts.URL, nil)
resp, err := sendRequest(client, "GET", ts.URL, nil)
if err != nil {
t.Errorf("received unexpected error: %v", err)
}

// Check the response
if resp.StatusCode != http.StatusOK {
Expand All @@ -152,7 +158,10 @@ func TestDecodePid(t *testing.T) {
}

// Call the decodePid function
pid := decodePid(resp)
pid, err := decodePid(resp)
if err != nil {
t.Errorf("received unexpected error: %v", err)
}

// Check the returned pid
if pid != "12345" {
Expand Down

0 comments on commit 0afb74f

Please sign in to comment.