
Commit

Fixed surplus Group export; added initial "Hive" naming support for Parquet
jrawbits committed Sep 25, 2023
1 parent 8a6610a commit 294b4fe
Showing 7 changed files with 48 additions and 73 deletions.
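For context, the "Hive" naming this commit introduces means partitioned output folders are written as Field=Value (the layout Parquet/Arrow readers detect automatically) rather than as bare values. A minimal R illustration, using hypothetical Azone/Marea field names rather than anything from this commit:

partitionFields <- c(Azone = "Metro", Marea = "Urban")                             # hypothetical partition fields
paste(partitionFields, collapse = "/")                                             # plain naming -> "Metro/Urban"
paste(paste(names(partitionFields), partitionFields, sep = "="), collapse = "/")   # Hive naming  -> "Azone=Metro/Marea=Urban"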
5 changes: 2 additions & 3 deletions sources/framework/VEModel/R/environment.R
@@ -154,16 +154,14 @@ loadRuntimeConfig <- function() {
# ParamDir defaults to ve.runtime
ve.env <- runtimeEnvironment()
if ( is.null(ve.env$ve.runtime) ) setRuntimeDirectory() # VE_RUNTIME or getwd()
# TODO: load/build VE package manifest (for installModel, plus list
# of modules).
# TODO: load/build VE package manifest (for installModel, plus list of modules).
# buildPackageManifest() # file called .VE-packages.lst (hidden attribute like .REnviron).
return( visioneval::loadConfiguration(ParamDir=ve.env$ve.runtime) )
}

#GET RUNTIME SETUP
#=================

# TODO: Add VEResults as a supported object type (and return the RunParam_ls from its model state).
#' Return runtime base RunParam_ls (loading it if not present)
#'
#' \code{getSetup} gets a subset of the current runParameters by name. It does NOT
@@ -276,6 +274,7 @@ viewSetup <- function(object=NULL,Param_ls=NULL,fromFile=FALSE,
updateSetup <- function(object=NULL,inFile=TRUE,Source="interactive",Param_ls=list(),drop=character(0),...) {
# TODO: for purposes of model Run Status, change status of "object" to "now" so when we next run
# the model, stages with a "RunComplete" time stamp earlier than now will all be marked for reset.
# Probably not a big deal since manual reset is better in any case.

# merge ... into Param_ls (expecting a named list of arbitrary parameters)
Param_ls <- visioneval::mergeParameters(
42 changes: 25 additions & 17 deletions sources/framework/VEModel/R/export.R
@@ -138,7 +138,7 @@ ve.partition.partition <- function(theData,Table) {
nValue <- locFields[[n]]
locSelection <- locSelection & (theData[[n]] == nValue) # knock off rows not matching this value
}
tableLoc <- VETableLocator$new(Paths=Paths,Names=Names,Table=Table,Range=which(locSelection))
tableLoc <- VETableLocator$new(Paths=Paths,Names=Names,Table=Table,Range=which(locSelection),hive=self$Hive)
TableLocs[[tableLoc$tableString()]] <- tableLoc
}
return(TableLocs)
@@ -147,7 +147,7 @@ ve.partition.partition <- function(theData,Table) {
ve.partition.locate <- function(theData,Table) {
# Creates a TableLoc for an unpartitioned Table
# Used in VEExporter$write for arbitrary data, including query results and metadata
tableLoc <- VETableLocator$new(Table=Table,Range=1:nrow(theData))
tableLoc <- VETableLocator$new(Table=Table,Range=1:nrow(theData)) # hive always false in this case
locList <- list()
locList[[ tableLoc$tableString() ]] <- tableLoc
return( locList)
@@ -167,7 +167,8 @@ VEPartition <- R6::R6Class(
"VEPartition",
public = list(
# public data
Partition = character(0), # Describes fields to be partitioned into different output tables
Partition = character(0), # Describes fields to be partitioned into different output tables
Hive = FALSE, # Set to TRUE to make folders be Field=Value when creating partitioned locations

# methods
initialize=ve.partition.init, # initialize internal partition
@@ -209,14 +210,15 @@
# @param pathSep a character string used to separate path elements when building the table name
# @param nameSep a character string used to separate name elements when building the table name
# @param tableSep a character string used to separate path elements from table plus name elements;
#

ve.locator.string <- function(
pathSep="/", nameSep="_", tableSep=":"
) {
tableString <- if ( length(self$Names) > 0 ) {
paste(self$Table,paste(self$Names,collapse=nameSep),sep=nameSep)
} else self$Table
if ( length(self$Paths) > 0 ) {
Paths <- if ( self$Hive ) paste(names(self$Paths),self$Paths,sep="=") else self$Paths
tableString <- paste(paste(Paths,collapse=pathSep),tableString,sep=tableSep) # use the Hive-aware Paths built above
}
return(tableString)
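# Worked illustration (not part of this commit; values are hypothetical): what tableString()
# produces with the default separators. Note that Paths must be a named vector for the Hive form.
Paths <- c(Azone = "Metro"); Names <- "2038"; Table <- "Household"
paste(paste(Paths, collapse = "/"),
      paste(Table, Names, sep = "_"), sep = ":")        # Hive = FALSE -> "Metro:Household_2038"
paste(paste(paste(names(Paths), Paths, sep = "="), collapse = "/"),
      paste(Table, Names, sep = "_"), sep = ":")        # Hive = TRUE  -> "Azone=Metro:Household_2038"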
@@ -230,11 +232,12 @@ ve.locator.print <- function(...) {
}

# Initialize a VETableLocator
ve.locator.init <- function(Table,Range=integer(0),Paths=character(0),Names=character(0)) {
self$Paths = Paths
ve.locator.init <- function(Table,Range=integer(0),Paths=character(0),Names=character(0),hive=FALSE) {
self$Paths = Paths # For Hive to work, Paths must be a named character vector
self$Names = Names
self$Table = Table
self$Range = Range
self$Hive = hive
}

ve.locator.append <- function(Name) self$Names <- append(self$Names,Name)
@@ -249,6 +252,7 @@ VETableLocator <- R6::R6Class(
Paths = character(0),
Names = character(0),
Table = NULL,
Hive = FALSE,

# methods
initialize=ve.locator.init, # Create the table location
@@ -405,6 +409,7 @@ ve.exporter.init <- function(Model,load=NULL,tag="default",connection=NULL,parti
# either of the following may stop if the Configuration is inadequate
self$Connection <- makeVEConnection(Model,self$Configuration$Connection) # Returns a VEConnection subclass
self$Partition <- VEPartition$new(self$Configuration$Partition)
self$Partition$Hive <- self$Connection$Hive # True for Parquet, generally false otherwise
}

# subclasses will do more with Connection and Partition prior to saving them
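# Sketch only (plain lists stand in for the R6 objects; not from this commit): how the Hive
# flag is meant to flow -- the connection declares it, the exporter copies it onto its
# partition just above, and the partition stamps it onto each table locator it creates.
connection <- list(Hive = TRUE)             # e.g. a Parquet connection
partition  <- list(Hive = connection$Hive)  # what ve.exporter.init does above
locator    <- list(Hive = partition$Hive)   # what ve.partition.partition does when building locators
locator$Hive                                # TRUE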
@@ -561,7 +566,6 @@ ve.exporter.metadata <- function() {
# the Units actually written plus the N description. DBTableName is the locator encoded
# form (makeTableString with default parameters). Metadata is just intended to understand
# what is in TableName.
# TODO: actually build the full metadata (S/G/T/N/...) while writing tables
metadatalist <- lapply( names(self$TableList), function(t) {
metadata <- self$TableList[[t]]
} )
@@ -687,7 +691,7 @@ ve.connection.missing <- function(dataFields,Table) {
}

# ve.connection.init <- function(config) {} # initialize the connection from parameters
ve.connection.init <- function(Model,config,reopen=FALSE) {
ve.connection.init <- function(Model,config,reopen=FALSE,hive=FALSE) {
# Add Timestamp if it is going to be part of the database name (or the CSV/Parquet folder)
if ( ! reopen ) {
if ( "Timestamp" %in% names(config) && isTRUE(config[["Timestamp"]]=="database") ) {
@@ -700,6 +704,7 @@ ve.connection.init <- function(Model,config,reopen=FALSE) {
}
# derived classes may then use Timestamp and TimeSeparator to create the Database/Folder name
}
self$Hive <- hive # make sure hive gets saved and loaded...
}

# Generic implementation uses derived class functions to do the work
@@ -747,6 +752,8 @@ VEConnection <- R6::R6Class(
# public data
Timestamp = NULL, # pull from config
TimeSeparator = NULL, # defaults to "_" if Timestamp exists
Hive = FALSE, # connection wants "hive" partitioning (Parquet sets this in its constructor;
# can be set manually for other connection types like CSV)

# methods (each connection type will implement its own version of these)
initialize = ve.connection.init, # call from subclasses to establish internal connection and partition
@@ -783,7 +790,7 @@ VEConnection <- R6::R6Class(
createTable = function(Data,Table) NULL, # Create or re-create a Table from scratch (includes append)
appendTable = function(Data,Table) NULL, # perform low-level append data operation
readTable = function(Table) NULL, # Read named table into a data.frame
save = function() return(list()), # Return private data for saving/reopening connection
save = function() return(list(Hive=self$Hive)), # Return private data for saving/reopening connection
open = function() NULL, # Reopen the connection (optional for DBI)
close = function() NULL # Close connection (needed for DBI)
# 'Table' should be a TableLocator string built with nameTable for the Connection
@@ -851,9 +858,9 @@ VEConnection.Dataframe <- R6::R6Class(
#################################

#' @import data.table
ve.connection.csv.init <- function(Model,config,reopen=FALSE) {
ve.connection.csv.init <- function(Model,config,reopen=FALSE,hive=FALSE) {
# CSV provides a default name for Directory
super$initialize(Model,config)
super$initialize(Model,config,hive=hive) # pass by name; the third positional argument is 'reopen'
if ( ! reopen ) {
if ( ! "Directory" %in% names(config) ) {
if ( "Database" %in% names(config) ) {
Expand Down Expand Up @@ -943,7 +950,7 @@ VEConnection.CSV <- R6::R6Class(
readTable = ve.connection.csv.readTable,

# methods
save = function() return(private$Directory)
save = function() return(c(super$save(),list(Directory=private$Directory)))
),
private = list(
Directory = NULL # Default to "OutputDir/Export_CSV<TimeSeparator><Timestamp>" in initializer
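# Sketch only (toy classes, not from this commit) of the save()/reopen contract the change
# above follows: a connection subclass merges the base class's saved settings (now including
# Hive) with its own private data, so both survive a reopen.
BaseConn <- R6::R6Class("BaseConn",
  public = list(
    Hive = FALSE,
    save = function() list(Hive = self$Hive)   # base class saves the Hive flag
  )
)
FileConn <- R6::R6Class("FileConn", inherit = BaseConn,
  public = list(
    save = function() c(super$save(), list(Directory = private$Directory))
  ),
  private = list(Directory = "Export_CSV")
)
str(FileConn$new()$save())   # List of 2: $Hive FALSE, $Directory "Export_CSV"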
@@ -959,8 +966,8 @@
#' @import DBI
#' @import RSQLite
#' @importFrom methods new
ve.connection.dbi.init <- function(Model,config,reopen=FALSE) {
super$initialize(Model,config)
ve.connection.dbi.init <- function(Model,config,reopen=FALSE,hive=FALSE) {
super$initialize(Model,config,hive=hive) # pass by name; the third positional argument is 'reopen'
# Two avenues here:
# If we're missing a full DBI configuration, presume SQLite
# If there is a full DBI configuration, fill in blanks like dbname from outside Database
@@ -1209,10 +1216,11 @@ connectionList <- list(
#' is the name of the connection specification block (either a built-in default or defined in the model
#' or global visioneval.cnf).
#' @param reopen if TRUE will not create a new database name (using saved configuration), otherwise
#'' build a new name (not all connection types will care - mostly to avoid Timestamp problems)
#' build a new name (not all connection types will care - mostly to avoid Timestamp problems)
#' @param hive if TRUE will produce path elements as Field=Value, otherwise just Value
#' @return A VEConnection (or derived) object giving access to the VisionEval results in `path`
#' @export
makeVEConnection <- function(Model,config=list(driver="csv"),reopen=FALSE) {
makeVEConnection <- function(Model,config=list(driver="csv"),reopen=FALSE,hive=FALSE) {
# Usually called from within VEExporter initialization, which will provide
# useful connection defaults
# Find driver class from config (default is "csv")
@@ -1228,5 +1236,5 @@ makeVEConnection <- function(Model,config=list(driver="csv"),reopen=FALSE) {
writeLog(paste("Creating Driver for ",driverClass$classname),Level="info")
}
# Create new driver object using "config"
return ( driverClass$new(Model,config,reopen) )
driverObject <- driverClass$new(Model,config,reopen,hive)
}
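# Sketch only (toy stand-ins, not from this commit) of the dispatch pattern makeVEConnection
# uses: the "driver" string in the config picks a connection class out of a named list
# (connectionList above), and reopen/hive are passed through to its constructor.
toyConnectionList <- list(
  csv     = function(...) list(type = "csv",     ...),
  parquet = function(...) list(type = "parquet", ...)
)
config <- list(driver = "parquet")
driver <- if (is.null(config$driver)) "csv" else config$driver
conn   <- toyConnectionList[[driver]](hive = TRUE)
conn$type   # "parquet"
conn$hive   # TRUE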
7 changes: 0 additions & 7 deletions sources/framework/VEModel/R/models.R
@@ -925,7 +925,6 @@ ve.model.clear <- function(force=FALSE,outputOnly=NULL,archives=FALSE,stage=NULL

if ( ! isTRUE(outputOnly) && ! isTRUE(archives) && is.character(stage) ) {
# keep only files in subdirectories matching stage$Dir in results
# TODO: make this work so stages passed by name will have their results selected (only)
stageDirs <- sapply(self$modelStages[stage],function(s) s$Dir)
stage <- stage[ stage!="." & stage %in% stageDirs ]
if ( any(stages) ) {
@@ -1117,7 +1116,6 @@ ve.stage.init <- function(modelParam_ls,Name=NULL,Model=NULL,ScenarioDir=NULL,st
writeLog(paste("Stage",self$Name,"stageConfig_ls contains:"),Level="debug")
writeLog(paste(names(stageConfig_ls),collapse=", "),Level="debug")
writeLog(paste("Stage Input Path from stageConfig_ls:",stageConfig_ls$InputPath),Level="debug")
# TODO: stageConfig_ls$InputPath is correct here
} else {
writeLog("stageConfig_ls has no additional parameters",Level="debug")
}
@@ -2019,11 +2017,6 @@ ve.model.fixRunStatus <- function(stage,outOfDate=integer(0)) {
# Running Models #
################################################################################

# TODO: Consult SaveDatastore in self$ModelState_ls
# TODO: Make sure a classic model (where SaveDatastore may be buried rather deep)
# can still be interpreted properly. We still need to pull out the "initializeModel"
# parameters...

# Proxy some visioneval functions (things that might get called from
# classic run_model.R without namespace resolution).
getYears <- visioneval::getYears
34 changes: 2 additions & 32 deletions sources/framework/VEModel/R/query.R
@@ -581,34 +581,6 @@ ve.query.getlist <- function() {
# "Name" is always included to support cbind
defaultMetadata <- c("Units","Description")

# TODO: In "Wide" format, keep the "By" fields in their own columns: that will be a bit more work if
# different queries have different combinations of By fields, but the logic is already implemented
# for Long format, and we'll want it for any extraction that is intended to be analyzed in a
# database system.

# TODO: Perhaps implement the same "Extractor" logic for queries as for regular data:
# Generate metric vectors for each Scenario/Year and add those to a list of metric columns
# that are then later formatted by the Extractor into "Long" and "Wide". That would simplify
# injecting the By columns, because we could just review all the vectors at the end to figure
# out how to make them conform for purposes of being in a data.frame (or some other output).

# OutputReceiver:
# Connect to "table location" (identify a directory, open a DBI connection)
# Create table (given a set of data that will go into it)
# Append rows to table - Create/Recreate could just be an option on a single "writeToTable"
# function that the Extractor will process

# Ahead of that, we'll assemble a list of unreconciled vectors, accumulate them in a list. Review all
# their column names and make sure all rows are present (with NAs if needed) then just "cbind" them
# together. That's a lot easier if we don't have to preserve the data.frame structure along the way
# (just make a list of conforming vectors, then magically wave the wand over it and *poof* it's a
# data.frame). For long format, we make each row into vectors (and then conform the names at the
# end), and then just "rbind" instead of "cbind".

# "long" and "wide" will build different vectors and assemble them differently.

# The logic will be to generate columns where each one is a set of metrics

# make a data.frame of all (and only) the valid query results
# the results is a single data.frame with attributes
ve.query.extract <- function(
@@ -2296,9 +2268,7 @@ makeLongMeasureDataframe <- function(Values,Scenario="",Year=NULL,Metadata=chara

# doQuery processes a list of VEResults, and generates QueryFile in their Path
doQuery <- function (
# TODO: Results should be a VEResultsList
# TODO: iterating over Results should use Results$results()
Results, # a list of VEResult object(s) corresponding to Reportable scenarios
Results, # a list of VEResults objects corresponding to Reportable scenarios
Specifications, # validated query specification to process
QueryFile, # Name of query file in which to save results
Timestamp=Sys.time() # Pass as parameter since model calling doQuery will need it too
@@ -2333,7 +2303,7 @@
# Set up model state and datastore for framework query processing
# TODO: Qprep_ls is overkill - should revise SummarizeDatasets to use
# just the ModelState (multiple Datastores are handled internally through
# the virtual Datastore path).
# the virtual DatastorePath).
QPrep_ls <- results$queryprep()

# Set up result structure for this scenario
25 changes: 14 additions & 11 deletions sources/framework/VEModel/R/results.R
@@ -156,7 +156,6 @@ ve.resultslist.export <- function(
# Apply a selection if provided, otherwise use entire list of outputs
# Reduces the resultsIndex to a subset then figures out S/G/T/N from whatever is left
# Default is everything available in the VEResultsList
# TODO: make sure list produces the full list...
if ( ! is.null(selection) ) {
self$select(selection) # just apply the selection, copying it
}
@@ -263,9 +262,7 @@ ve.resultslist.list <- function(pattern="", selected=TRUE, details=FALSE, ...) {
return(unique(ret.value))
}

# TODO: is this function ever used, or is it still relevant?
# TODO: written to work on an individual VEResults - now should list all selected
# TODO: could we just add the input files and directories to standard metadata?
# TODO: this function may not be used. Do we still need it?
# We won't necessarily know the input file until after the model is run (otherwise, this function should have been a
# member of VEModel)
ve.resultslist.inputs <- function( fields=FALSE, module="", filename="" ) {
@@ -371,7 +368,7 @@ VEResultsList <- R6::R6Class(
list=ve.resultslist.list, # show the consolidated resultsIndex (used by export to develop metadata table)
select=ve.resultslist.select, # return the list's selection definition
find=ve.resultslist.find, # constructs but does not embed the selection definition
units=ve.resultslist.units, # Set units on field list (modifies self$modelIndex) TODO: Move/wrap in VEResultsList
units=ve.resultslist.units, # Set units on field list (modifies self$modelIndex)
valid=function() self$isValid # report state of validity
)
)
@@ -647,6 +644,7 @@ ve.results.index <- function() {
self$RunParam_ls <- ms$RunParam_ls
}

# Get information from all the StartFrom stages
msList <- rev(visioneval::getModelStatePaths(dropFirst=FALSE,envir=private$modelStateEnv))
Index <- data.frame()
Inputs <- data.frame()
@@ -699,26 +697,31 @@
Name = fieldGTN$Name, # Should be identical to ds$name
Description = Description,
Units = Units,
# TODO: May need some other specification fields in order to identify variable type for SQL or other export
Module = Module,
Scenario = scenario,
File = File, # "" if not an Input
InputDir = InputDir # "" if not an Input
)

# GroupTableName is now a data.frame with nine columns
# Index is now a data.frame with nine columns
# complete.cases blows away the rows that have any NA values
# (each row is a "case" in stat lingo, and the "complete" ones have a non-NA value for each column)
# Reduces the raw Datastore index to just the Fields ("Name"s) in the Datastore
ccases <- stats::complete.cases(Index[,c("Group","Table","Name")])
Index <- Index[ccases,]

# Clean up index so Global and BaseYear groups only appear in the Base scenario(s)
RunYears = as.character(self$RunParam_ls$Years)
BaseYear = as.character(self$RunParam_ls$BaseYear)
if ( ! BaseYear %in% RunYears ) {
Index <- Index[ ! Index$Group %in% c("Global",BaseYear), ] # Remove Global and BaseYear
}
self$modelIndex <- Index

invisible(self$modelIndex)
}
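# Sketch only (toy data, not from this commit's files): the "surplus Group" cleanup a few lines
# above. When the base year is not among a scenario's run years, the Global and base-year groups
# (which belong to the base scenario) are dropped from that scenario's index so they are not
# exported again.
Index <- data.frame(
  Group = c("Global", "2019", "2038", "2038"),
  Table = c("Model",  "Household", "Household", "Worker"),
  Name  = c("Seed",   "Income",    "Income",    "Wages")
)
RunYears <- "2038"; BaseYear <- "2019"
if ( ! BaseYear %in% RunYears ) {
  Index <- Index[ ! Index$Group %in% c("Global", BaseYear), ]   # keep only this scenario's own groups
}
Index$Group   # "2038" "2038"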

# Helper function to attach DisplayUnits to a list of Group/Table/Name rows in a data.frame
# Need to do this in VEResults since we need access to the model state...
# TODO: Move this to VEResultsList (using Param_ls from Model or first VEResults)
addDisplayUnits <- function(GTN_df,Param_ls) {
# GTN_df is a data.frame with "Group","Table","Name" rows for each Name/field for which display
# units are sought. Always re-open the DisplayUnits file, as it may have changed since the last
@@ -803,8 +806,8 @@ ve.results.elements <- function() {
}

# Wrapper for visioneval::copyDatastore
# TODO: add a wrapper in VEResultsList that will copy all the model results to another
# ToDir - VEResultsList will need to manage the directories...
# TODO: We won't ever want to do this one Result at a time - should
# manage in VEResultsList
ve.results.copy <- function(ToDir, Flatten=TRUE, DatastoreType=NULL, overwrite=FALSE) {
if ( missing(ToDir) ) {
stop(writeLog("Must provide target directory path.",Level="error"))