Skip to content

Commit

Permalink
jira-595: add filename to datapartition
Browse files Browse the repository at this point in the history
  • Loading branch information
drealeed committed May 15, 2024
1 parent 680baa3 commit 33b1823
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class DataPartition implements Serializable
private String fileAccessBlob;
private FileType fileType;
private boolean isTLK;
private String fileName;

public static enum FileType
{
Expand Down Expand Up @@ -197,13 +198,42 @@ private DataPartition(String[] copyLocations, String[] copyPaths, int partNum, i
* the file type
*/
private DataPartition(String[] copylocations, String[] copyPaths, int this_part, int num_parts, int clearport, boolean sslport, FileFilter filter,
String fileAccessBlob, FileType fileType)
String fileAccessBlob, FileType fileType) {
this(copylocations,copyPaths,this_part,num_parts,clearport,sslport,filter,fileAccessBlob,fileType,null);
}
/**
* Construct the data part, used by makeParts.
*
* @param copylocations
* locations of all copies of this file part
* @param copyPaths
* the copy paths
* @param this_part
* part number
* @param num_parts
* number of parts
* @param clearport
* port number of clear communications
* @param sslport
* port number of ssl communications
* @param filter
* the file filter object
* @param fileAccessBlob
* file access token
* @param fileType
* the file type
* @param fileName
* the file name
*/
private DataPartition(String[] copylocations, String[] copyPaths, int this_part, int num_parts, int clearport, boolean sslport, FileFilter filter,
String fileAccessBlob, FileType fileType, String fileName)
{
this.this_part = this_part;
this.num_parts = num_parts;
this.rowservicePort = clearport;
this.useSSL = sslport;
this.fileFilter = filter;
this.fileName=fileName;
if (this.fileFilter == null)
{
this.fileFilter = new FileFilter();
Expand Down Expand Up @@ -348,6 +378,16 @@ public boolean getUseSsl()
return useSSL;
}

/**
 * Gets the name of the file being read.
 *
 * @return the file name stored on this partition when it was constructed;
 *         null when the partition was built without a file name
 */
public String getFileName()
{
    return this.fileName;
}

/**
* Copy Path.
*
Expand Down Expand Up @@ -470,6 +510,31 @@ public static DataPartition[] createPartitions(DFUFilePartWrapper[] dfuparts, Cl
return createPartitions(dfuparts, clusterremapper, max_parts, FileFilter.nullFilter(), fileAccessBlob, FileType.FLAT);
}


/**
 * Creates the partitions without associating a file name with them.
 *
 * Convenience overload: every argument is forwarded unchanged to the overload
 * that additionally accepts a file name, with null supplied for that name.
 *
 * @param dfuparts
 *            the dfuparts
 * @param clusterremapper
 *            the clusterremapper
 * @param max_parts
 *            the max parts
 * @param filter
 *            the filter
 * @param fileAccessBlob
 *            the file access blob
 * @param fileType
 *            the file type
 * @return the data partition[]
 * @throws HpccFileException
 *             the hpcc file exception
 */
public static DataPartition[] createPartitions(DFUFilePartWrapper[] dfuparts, ClusterRemapper clusterremapper, int max_parts, FileFilter filter,
        String fileAccessBlob, FileType fileType) throws HpccFileException
{
    // No file name is available through this entry point.
    final String unspecifiedFileName = null;
    return createPartitions(dfuparts, clusterremapper, max_parts, filter, fileAccessBlob, fileType, unspecifiedFileName);
}

/**
* Creates the partitions.
*
Expand All @@ -485,12 +550,14 @@ public static DataPartition[] createPartitions(DFUFilePartWrapper[] dfuparts, Cl
* the file access blob
* @param fileType
* the file type
* @param fileName
* the file name
* @return the data partition[]
* @throws HpccFileException
* the hpcc file exception
*/
public static DataPartition[] createPartitions(DFUFilePartWrapper[] dfuparts, ClusterRemapper clusterremapper, int max_parts, FileFilter filter,
String fileAccessBlob, FileType fileType) throws HpccFileException
String fileAccessBlob, FileType fileType, String fileName) throws HpccFileException
{
DataPartition[] rslt = new DataPartition[dfuparts.length];

Expand All @@ -507,7 +574,7 @@ public static DataPartition[] createPartitions(DFUFilePartWrapper[] dfuparts, Cl

DataPartition new_dp = new DataPartition(clusterremapper.reviseIPs(dfuparts[i].getCopies()), copyPaths, dfuparts[i].getPartIndex(),
dfuparts.length, clusterremapper.revisePort(null), clusterremapper.getUsesSSLConnection(null), filter, fileAccessBlob,
fileType);
fileType,fileName);
new_dp.isTLK = dfuparts[i].isTopLevelKey();

rslt[i] = new_dp;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ private void createDataParts() throws HpccFileException
{
ClusterRemapper clusterremapper = ClusterRemapper.makeMapper(clusterRemapInfo, fileinfoforread);
this.dataParts = DataPartition.createPartitions(fileinfoforread.getFileParts(), clusterremapper,
/* maxParts currently ignored anyway */0, filter, fileinfoforread.getFileAccessInfoBlob(), fileType);
/* maxParts currently ignored anyway */0, filter, fileinfoforread.getFileAccessInfoBlob(), fileType,this.getFileName());

// Check to see if this file has a TLK. The TLK will always be the last partition.
// If we do have a TLK remove it from the standard list of data partitions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,36 +193,7 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
* @throws Exception
* general exception
*/
public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilder recBuilder, int connectTimeout, int limit, boolean createPrefetchThread, int readSizeKB, FileReadResumeInfo resumeInfo, int socketOpTimeoutMs) throws Exception
{
    // Delegates to the overload that also takes a file name, passing null for the name.
    // The caller-supplied socketOpTimeoutMs is forwarded as given; the default constant
    // was previously passed here, which silently discarded the requested timeout.
    this(dp, originalRD, recBuilder, connectTimeout, limit, createPrefetchThread, readSizeKB, resumeInfo, socketOpTimeoutMs, null);
}
/**
* A remote file reader that reads the part identified by the HpccPart object using the record definition provided.
*
* @param dp
* the part of the file, name and location
* @param originalRD
* the record definition for the dataset
* @param recBuilder
* the IRecordBuilder used to construct records
* @param connectTimeout
* the connection timeout in milliseconds, -1 for default
* @param limit
* the maximum number of records to read from the provided data partition, -1 specifies no limit
* @param createPrefetchThread
* the input stream should create and manage prefetching on its own thread. If false prefetch needs to be called on another thread periodically.
* @param readSizeKB
* read request size in KB, -1 specifies use default value
* @param resumeInfo
* FileReadResumeInfo data required to restart a read from a particular point in a file, null for reading from start
* @param socketOpTimeoutMs
* Socket (read / write) operation timeout in milliseconds
* @param fileName
* filename to read
* @throws Exception
* general exception
*/
public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilder recBuilder, int connectTimeout, int limit, boolean createPrefetchThread, int readSizeKB, FileReadResumeInfo resumeInfo, int socketOpTimeoutMs, String fileName) throws Exception
public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilder recBuilder, int connectTimeout, int limit, boolean createPrefetchThread, int readSizeKB, FileReadResumeInfo resumeInfo, int socketOpTimeoutMs) throws Exception
{
this.handlePrefetch = createPrefetchThread;
this.originalRecordDef = originalRD;
Expand Down Expand Up @@ -252,7 +223,7 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde

if (resumeInfo == null)
{
this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, null, false, socketOpTimeoutMs, fileName);
this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, null, false, socketOpTimeoutMs);
this.binaryRecordReader = new BinaryRecordReader(this.inputStream);
this.binaryRecordReader.initialize(this.recordBuilder);

Expand Down Expand Up @@ -534,7 +505,7 @@ public void close() throws Exception

long closeTimeMs = System.currentTimeMillis();
double readTimeS = (closeTimeMs - openTimeMs) / 1000.0;
log.info("HPCCRemoteFileReader: Closing file part: " + dataPartition.getThisPart()
log.info("HPCCRemoteFileReader: Closing file part: " + dataPartition.getThisPart() + " for " + dataPartition.getFileName()
+ " read time: " + readTimeS + "s "
+ " records read: " + recordsRead);
}
Expand Down Expand Up @@ -579,7 +550,7 @@ public void report()
{
if (getRemoteReadMessageCount() > 0)
{
log.warn("DataPartition '" + this.dataPartition + "' read operation messages:\n");
log.warn("DataPartition '" + this.dataPartition + "' read operation messages for " + dataPartition.getFileName() + ":\n");
log.warn(getRemoteReadMessages());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ public class RowServiceInputStream extends InputStream implements IProfilable
private String projectedJsonRecordDefinition = null;
private java.io.DataInputStream dis = null;
private java.io.DataOutputStream dos = null;
private String fileName = null;
private String rowServiceVersion = "";

private int filePartCopyIndexPointer = 0; //pointer into the prioritizedCopyIndexes struct
Expand Down Expand Up @@ -326,42 +325,9 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co
* @throws Exception
* general exception
*/
public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int connectTimeout, int limit, boolean createPrefetchThread, int maxReadSizeInKB, RestartInformation restartInfo, boolean isFetching, int socketOpTimeoutMS) throws Exception
{
    // Delegates to the overload that also takes a file name, passing null for the name.
    // The caller-supplied socketOpTimeoutMS is forwarded as given; DEFAULT_SOCKET_OP_TIMEOUT_MS
    // was previously passed here, which silently discarded the requested timeout.
    this(dp, rd, pRd, connectTimeout, limit, createPrefetchThread, maxReadSizeInKB, restartInfo, isFetching, socketOpTimeoutMS, null);
}

/**
* A plain socket connect to a THOR node for remote read
*
* @param dp
* the data partition to read
* @param rd
* the JSON definition for the read input and output
* @param pRd
* the projected record definition
* @param connectTimeout
* the connection timeout in milliseconds
* @param limit
* the record limit to use for reading the dataset. -1 implies no limit
* @param createPrefetchThread
* Whether or not this inputstream should handle prefetching itself or if prefetch will be called externally
* @param maxReadSizeInKB
* max readsize in kilobytes
* @param restartInfo
* information used to restart a read from a particular stream position
* @param isFetching
* Will this input stream be used to service batched fetch requests
* @param socketOpTimeoutMS
* Socket (read / write) operation timeout in milliseconds
* @param fileName
* fileName being read
* @throws Exception
* general exception
*/
public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int connectTimeout, int limit, boolean createPrefetchThread, int maxReadSizeInKB, RestartInformation restartInfo, boolean isFetching, int socketOpTimeoutMS, String fileName) throws Exception
public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int connectTimeout, int limit, boolean createPrefetchThread, int maxReadSizeInKB, RestartInformation restartInfo, boolean isFetching, int socketOpTimeoutMS) throws Exception
{
this.recordDefinition = rd;
this.fileName =fileName;
this.projectedRecordDefinition = pRd;
this.inFetchingMode = isFetching;

Expand Down Expand Up @@ -403,8 +369,8 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co
this.tokenBin = restartInfo.tokenBin;
this.streamPos = restartInfo.streamPos;
this.streamPosOfFetchStart = this.streamPos;
}
String prefix = "RowServiceInputStream constructor, file " + fileName + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":";
}
String prefix = "RowServiceInputStream constructor, file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":";

if (inFetchingMode == false)
{
Expand Down Expand Up @@ -768,7 +734,7 @@ private int startFetch()
{
return -1;
}
String prefix = "RowServiceInputStream.startFetch(), file " + fileName + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":";
String prefix = "RowServiceInputStream.startFetch(), file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":";


//------------------------------------------------------------------------------
Expand Down Expand Up @@ -947,7 +913,7 @@ else if (this.handle == 0)

private void readDataInFetch()
{
String prefix = "RowServiceInputStream.readDataInFetch(), file " + fileName + "part " + dataPart.getThisPart() + " on IP " + getIP() + ":";
String prefix = "RowServiceInputStream.readDataInFetch(), file " + dataPart.getFileName() + "part " + dataPart.getThisPart() + " on IP " + getIP() + ":";
if (this.closed.get())
{
return;
Expand Down Expand Up @@ -1027,7 +993,7 @@ private void readDataInFetch()

private void finishFetch()
{
String prefix = "RowServiceInputStream.finishFetch(), file " + fileName + "part " + dataPart.getThisPart() + " on IP " + getIP() + ":";
String prefix = "RowServiceInputStream.finishFetch(), file " + dataPart.getFileName() + "part " + dataPart.getThisPart() + " on IP " + getIP() + ":";
if (this.closed.get())
{
return;
Expand Down Expand Up @@ -1241,7 +1207,7 @@ private void compactBuffer()
@Override
public int available() throws IOException
{
String prefix = "RowServiceInputStream.available(), file " + fileName + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":";
String prefix = "RowServiceInputStream.available(), file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":";

// Do the check for closed first here to avoid data races
if (this.closed.get())
Expand Down Expand Up @@ -1590,7 +1556,7 @@ private void makeActive() throws HpccFileException
{
this.active.set(false);
this.handle = 0;
String prefix = "RowServiceInputStream.makeActive, file " + fileName + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":";
String prefix = "RowServiceInputStream.makeActive, file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":";

boolean needsRetry = false;
do
Expand Down Expand Up @@ -2150,7 +2116,7 @@ private String makeCloseHandleRequest()

private void sendCloseFileRequest() throws IOException
{
String prefix = "RowServiceInputStream.sendCloseFileRequest(), file " + fileName + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":";
String prefix = "RowServiceInputStream.sendCloseFileRequest(), file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":";

if (useOldProtocol)
{
Expand Down Expand Up @@ -2186,7 +2152,7 @@ private void sendCloseFileRequest() throws IOException
private RowServiceResponse readResponse() throws HpccFileException
{
RowServiceResponse response = new RowServiceResponse();
String prefix="RowServiceInputStream.readResponse(): , file " + fileName + " part " + dataPart.getThisPart() + " on IP " + getIP() + ": ";
String prefix="RowServiceInputStream.readResponse(): , file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ": ";
try
{
response.len = dis.readInt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1410,8 +1410,7 @@ public List<HPCCRecord> readFile(HPCCFile file, Integer connectTimeoutMillis, bo
try
{
HPCCRecordBuilder recordBuilder = new HPCCRecordBuilder(file.getProjectedRecordDefinition());
HpccRemoteFileReader<HPCCRecord> fileReader = new HpccRemoteFileReader<HPCCRecord>(fileParts[i], originalRD, recordBuilder, RowServiceInputStream.DEFAULT_CONNECT_TIMEOUT_MILIS, NO_RECORD_LIMIT, true, DEFAULT_READ_SIZE_OPTION,null,RowServiceInputStream.DEFAULT_SOCKET_OP_TIMEOUT_MS,
file.getFileName());
HpccRemoteFileReader<HPCCRecord> fileReader = new HpccRemoteFileReader<HPCCRecord>(fileParts[i], originalRD, recordBuilder, RowServiceInputStream.DEFAULT_CONNECT_TIMEOUT_MILIS, NO_RECORD_LIMIT, true, DEFAULT_READ_SIZE_OPTION,null,RowServiceInputStream.DEFAULT_SOCKET_OP_TIMEOUT_MS);
fileReader.getRecordReader().setUseDecimalForUnsigned8(useDecimalForUnsigned8);
fileReader.getRecordReader().setStringProcessingFlags(stringProcessingFlags);
fileReaders.add(fileReader);
Expand Down

0 comments on commit 33b1823

Please sign in to comment.