Skip to content

Commit

Permalink
refactor datasetIngestor command further
Browse files Browse the repository at this point in the history
  • Loading branch information
consolethinks committed Aug 23, 2024
1 parent 64b2948 commit 0e1db34
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 102 deletions.
219 changes: 119 additions & 100 deletions cmd/commands/datasetIngestor.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ For Windows you need instead to specify -user username:password on the command l
log.Printf("You are about to add a dataset to the === %s === data catalog environment...", env)
color.Unset()

// TODO: change pointer parameter types to values as they shouldn't be modified by the function
user, accessGroups := authenticate(RealAuthenticator{}, client, APIServer, userpass, token)

/* TODO Add info about policy settings and that autoarchive will take place or not */
Expand Down Expand Up @@ -208,12 +207,15 @@ For Windows you need instead to specify -user username:password on the command l
// log.Printf("Selected folders: %v\n", folders)

// test if a sourceFolder already used in the past and give warning
log.Println("Testing for existing source folders...")
foundList, err := datasetIngestor.TestForExistingSourceFolder(datasetPaths, client, APIServer, user["accessToken"])
if err != nil {
log.Fatal(err)
}
color.Set(color.FgYellow)
fmt.Println("Warning! The following datasets have been found with the same sourceFolder: ")
if len(foundList) > 0 {
fmt.Println("Warning! The following datasets have been found with the same sourceFolder: ")
}
for _, element := range foundList {
fmt.Printf(" - PID: \"%s\", sourceFolder: \"%s\"\n", element.Pid, element.SourceFolder)
}
Expand All @@ -235,8 +237,13 @@ For Windows you need instead to specify -user username:password on the command l
// a destination location is defined by the archive system
// for now let the user decide if he needs a copy

// now everything is prepared, start to loop over all folders
var skip = ""
// now everything is prepared, prepare to loop over all folders
if nocopyFlag {
copyFlag = false
}
checkCentralAvailability := !(cmd.Flags().Changed("copy") || cmd.Flags().Changed("nocopy") || beamlineAccount || copyFlag)
skip := ""

// check if skip flag is globally defined via flags:
if cmd.Flags().Changed("linkfiles") {
switch linkfiles {
Expand All @@ -249,8 +256,9 @@ For Windows you need instead to specify -user username:password on the command l
}
}

var datasetList []string
var archivableDatasetList []string
for _, datasetSourceFolder := range datasetPaths {
log.Printf("===== Ingesting: \"%s\" =====\n", datasetSourceFolder)
// ignore empty lines
if datasetSourceFolder == "" {
// NOTE if there are empty source folder(s), shouldn't we raise an error?
Expand All @@ -259,123 +267,133 @@ For Windows you need instead to specify -user username:password on the command l
metaDataMap["sourceFolder"] = datasetSourceFolder
log.Printf("Scanning files in dataset %s", datasetSourceFolder)

// get filelist of dataset
fullFileArray, startTime, endTime, owner, numFiles, totalSize, err :=
datasetIngestor.GetLocalFileList(datasetSourceFolder, datasetFileListTxt, &skip)
if err != nil {
log.Fatalf("Can't gather the filelist of \"%s\"", datasetSourceFolder)
}

//log.Printf("full fileListing: %v\n Start and end time: %s %s\n ", fullFileArray, startTime, endTime)
log.Printf("The dataset contains %v files with a total size of %v bytes.", numFiles, totalSize)

// filecount checks
if totalSize == 0 {
emptyDatasets++
color.Set(color.FgRed)
log.Println("This dataset contains no files and will therefore NOT be stored. ")
log.Printf("\"%s\" dataset cannot be ingested - contains no files\n", datasetSourceFolder)
color.Unset()
} else if numFiles > TOTAL_MAXFILES {
continue
}
if numFiles > TOTAL_MAXFILES {
tooLargeDatasets++
color.Set(color.FgRed)
log.Printf("This dataset exceeds the current filecount limit of the archive system of %v files and will therefore NOT be stored.\n", TOTAL_MAXFILES)
log.Printf("\"%s\" dataset cannot be ingested - too many files: has %d, max. %d\n", datasetSourceFolder, numFiles, TOTAL_MAXFILES)
color.Unset()
} else {
// TODO: change tapecopies param type of UpdateMetaData from pointer to regular int
// (it's not changed within the function)
datasetIngestor.UpdateMetaData(client, APIServer, user, originalMap, metaDataMap, startTime, endTime, owner, tapecopies)
pretty, _ := json.MarshalIndent(metaDataMap, "", " ")

log.Printf("Updated metadata object:\n%s\n", pretty)

// check if data is accessible at archive server, unless beamline account (assumed to be centrally available always)
// and unless copy flag defined via command line
if !copyFlag && !nocopyFlag { // NOTE this whole copyFlag, nocopyFlag ordeal makes no sense whatsoever
if !beamlineAccount {
sshErr, otherErr := datasetIngestor.CheckDataCentrallyAvailableSsh(user["username"], RSYNCServer, datasetSourceFolder, os.Stdout)
if otherErr != nil {
log.Fatalf("CheckDataCentrallyAvailableSsh returned an error: %v\n", otherErr)
}
if sshErr != nil {
color.Set(color.FgYellow)
log.Printf("The source folder %v is not centrally available (decentral use case).\nThe data must first be copied to a rsync cache server.\n ", datasetSourceFolder)
color.Unset()
copyFlag = true
// check if user account
if len(accessGroups) == 0 {
color.Set(color.FgRed)
log.Println("For the decentral case you must use a personal account. Beamline accounts are not supported.")
color.Unset()
os.Exit(1)
}
if !noninteractiveFlag {
log.Printf("Do you want to continue (Y/n)? ")
scanner.Scan()
continueFlag := scanner.Text()
if continueFlag == "n" {
log.Fatalln("Further ingests interrupted because decentral case detected, but no copy wanted.")
}
}
continue
}

// NOTE: only tapecopies=1 or 2 does something if set.
if tapecopies == 2 {
color.Set(color.FgYellow)
log.Printf("Note: this dataset, if archived, will be copied to two tape copies")
color.Unset()
}
datasetIngestor.UpdateMetaData(client, APIServer, user, originalMap, metaDataMap, startTime, endTime, owner, tapecopies)
pretty, _ := json.MarshalIndent(metaDataMap, "", " ")

log.Printf("Updated metadata object:\n%s\n", pretty)

// check if data is accessible at archive server, unless beamline account (assumed to be centrally available always)
// and unless (no)copy flag defined via command line
if checkCentralAvailability {
sshErr, otherErr := datasetIngestor.CheckDataCentrallyAvailableSsh(user["username"], RSYNCServer, datasetSourceFolder, os.Stdout)
if otherErr != nil {
log.Fatalln("Cannot check if data is centrally available:", otherErr)
}
// if the ssh command's error is not nil, the dataset is *likely* to be not centrally available (maybe should check the error returned)
if sshErr != nil {
color.Set(color.FgYellow)
log.Printf("The source folder %v is not centrally available.\nThe data must first be copied.\n ", datasetSourceFolder)
color.Unset()
copyFlag = true
// check if user account
if len(accessGroups) == 0 {
color.Set(color.FgRed)
log.Println("For copying, you must use a personal account. Beamline accounts are not supported.")
color.Unset()
os.Exit(1)
}
if !noninteractiveFlag {
log.Printf("Do you want to continue (Y/n)? ")
scanner.Scan()
continueFlag := scanner.Text()
if continueFlag == "n" {
log.Fatalln("Further ingests interrupted because copying is needed, but no copy wanted.")
}
} else {
copyFlag = false // beamline accounts don't need copying then, but is beamline account checking needed outside PSI?
}
}
}

if ingestFlag {
// create ingest entry. For the decentral case, delay setting status to archivable until data is copied
archivable := false
if _, ok := metaDataMap["datasetlifecycle"]; !ok {
metaDataMap["datasetlifecycle"] = map[string]interface{}{}
}
if copyFlag { // IDEA: maybe add a flag to indicate that we want to copy later?
// do not override existing fields
metaDataMap["datasetlifecycle"].(map[string]interface{})["isOnCentralDisk"] = false
metaDataMap["datasetlifecycle"].(map[string]interface{})["archiveStatusMessage"] = "filesNotYetAvailable"
metaDataMap["datasetlifecycle"].(map[string]interface{})["archivable"] = false
} else {
if !copyFlag {
// NOTE *in this case* copyflag is ALWAYS false, nocopyFlag is ALWAYS true
// why is this not just an assignment to FALSE then?
copyFlag = !nocopyFlag
}
archivable = true
metaDataMap["datasetlifecycle"].(map[string]interface{})["isOnCentralDisk"] = true
metaDataMap["datasetlifecycle"].(map[string]interface{})["archiveStatusMessage"] = "datasetCreated"
metaDataMap["datasetlifecycle"].(map[string]interface{})["archivable"] = true
}
if ingestFlag {
// create ingest entry. For the decentral case, delay setting status to archivable until data is copied
archivable := false
if _, ok := metaDataMap["datasetlifecycle"]; !ok {
metaDataMap["datasetlifecycle"] = map[string]interface{}{}
}
if copyFlag {
// do not override existing fields
metaDataMap["datasetlifecycle"].(map[string]interface{})["isOnCentralDisk"] = false
metaDataMap["datasetlifecycle"].(map[string]interface{})["archiveStatusMessage"] = "filesNotYetAvailable"
metaDataMap["datasetlifecycle"].(map[string]interface{})["archivable"] = false
} else {
archivable = true
metaDataMap["datasetlifecycle"].(map[string]interface{})["isOnCentralDisk"] = true
metaDataMap["datasetlifecycle"].(map[string]interface{})["archiveStatusMessage"] = "datasetCreated"
metaDataMap["datasetlifecycle"].(map[string]interface{})["archivable"] = true
}
datasetId, err := datasetIngestor.IngestDataset(client, APIServer, metaDataMap, fullFileArray, user)
log.Println("Ingesting dataset...")
datasetId, err := datasetIngestor.IngestDataset(client, APIServer, metaDataMap, fullFileArray, user)
if err != nil {
log.Fatal("Couldn't ingest dataset:", err)
}
log.Println("Dataset created:", datasetId)
// add attachment optionally
if addAttachment != "" {
log.Println("Adding attachment...")
err := datasetIngestor.AddAttachment(client, APIServer, datasetId, metaDataMap, user["accessToken"], addAttachment, addCaption)
if err != nil {
log.Fatalf("ingestion returned an error: %v\n", err)
log.Println("Couldn't add attachment:", err)
}
// add attachment optionally
if addAttachment != "" {
err := datasetIngestor.AddAttachment(client, APIServer, datasetId, metaDataMap, user["accessToken"], addAttachment, addCaption)
log.Printf("Attachment file %v added to dataset %v\n", addAttachment, datasetId)
}
if copyFlag {
// TODO rewrite SyncDataToFileserver
log.Println("Syncing files to cache server...")
err := datasetIngestor.SyncLocalDataToFileserver(datasetId, user, RSYNCServer, datasetSourceFolder, absFileListing, os.Stdout)
if err == nil {
// delayed enabling
archivable = true
err := datasetIngestor.MarkFilesReady(client, APIServer, datasetId, user)
if err != nil {
log.Println("Couldn't add attachment:", err)
}
log.Printf("Attachment file %v added to dataset %v\n", addAttachment, datasetId)
}
if copyFlag {
err := datasetIngestor.SyncLocalDataToFileserver(datasetId, user, RSYNCServer, datasetSourceFolder, absFileListing, os.Stdout)
if err == nil {
// delayed enabling
archivable = true
datasetIngestor.MarkFilesReady(client, APIServer, datasetId, user)
} else {
color.Set(color.FgRed)
log.Printf("The command to copy files exited with error %v \n", err)
log.Printf("The dataset %v is not yet in an archivable state\n", datasetId)
// TODO let user decide to delete dataset entry
// datasetIngestor.DeleteDatasetEntry(client, APIServer, datasetId, user["accessToken"])
color.Unset()
log.Fatal("Couldn't mark files ready:", err)
}
} else {
color.Set(color.FgRed)
log.Printf("The command to copy files exited with error %v \n", err)
log.Printf("The dataset %v is not yet in an archivable state\n", datasetId)
// TODO let user decide to delete dataset entry
// datasetIngestor.DeleteDatasetEntry(client, APIServer, datasetId, user["accessToken"])
color.Unset()
}

if archivable {
datasetList = append(datasetList, datasetId)
}
log.Println("Syncing files - DONE")
}
datasetIngestor.ResetUpdatedMetaData(originalMap, metaDataMap)

if archivable {
archivableDatasetList = append(archivableDatasetList, datasetId)
}
}
datasetIngestor.ResetUpdatedMetaData(originalMap, metaDataMap) // I don't really get this...
}

if !ingestFlag {
Expand All @@ -392,7 +410,7 @@ For Windows you need instead to specify -user username:password on the command l
log.Printf("Number of datasets not stored because of too many files:%v\nPlease note that this will cancel any subsequent archive steps from this job !\n", tooLargeDatasets)
}
color.Unset()
datasetIngestor.PrintFileInfos()
datasetIngestor.PrintFileInfos() // TODO: move this into cmd portion

// stop here if empty datasets appeared
if emptyDatasets > 0 || tooLargeDatasets > 0 {
Expand All @@ -403,17 +421,18 @@ For Windows you need instead to specify -user username:password on the command l
log.Printf("Submitting Archive Job for the ingested datasets.\n")
// TODO: change param type from pointer to regular as it is unnecessary
// for it to be passed as pointer
_, err := datasetUtils.CreateArchivalJob(client, APIServer, user, datasetList, &tapecopies)
jobId, err := datasetUtils.CreateArchivalJob(client, APIServer, user, archivableDatasetList, &tapecopies)
if err != nil {
color.Set(color.FgRed)
log.Printf("Could not create the archival job for the ingested datasets: %s", err.Error())
color.Unset()
}
log.Println("Submitted job:", jobId)
}

// print out results to STDOUT, one line per dataset
for i := 0; i < len(datasetList); i++ {
fmt.Println(datasetList[i])
for i := 0; i < len(archivableDatasetList); i++ {
fmt.Println(archivableDatasetList[i])
}

},
Expand All @@ -438,5 +457,5 @@ func init() {
datasetIngestorCmd.Flags().String("addcaption", "", "Optional caption to be stored with attachment (single dataset case only)")

datasetIngestorCmd.MarkFlagsMutuallyExclusive("testenv", "devenv", "localenv", "tunnelenv")
//datasetIngestorCmd.MarkFlagsMutuallyExclusive("nocopy", "copy")
datasetIngestorCmd.MarkFlagsMutuallyExclusive("nocopy", "copy")
}
3 changes: 1 addition & 2 deletions cmd/commands/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func TestMainFlags(t *testing.T) {
"tunnelenv": false,
"noninteractive": true,
"copy": true,
"nocopy": true,
"nocopy": false,
"autoarchive": true,
"allowexistingsource": true,
"version": true,
Expand All @@ -190,7 +190,6 @@ func TestMainFlags(t *testing.T) {
"--token",
"token",
"--copy",
"--nocopy",
"--tapecopies",
"6571579",
"--autoarchive",
Expand Down

0 comments on commit 0e1db34

Please sign in to comment.