diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java index 534de4b51..75eb5c1e0 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java @@ -43,6 +43,7 @@ public class DataPartition implements Serializable private String fileAccessBlob; private FileType fileType; private boolean isTLK; + private String fileName; public static enum FileType { @@ -197,13 +198,42 @@ private DataPartition(String[] copyLocations, String[] copyPaths, int partNum, i * the file type */ private DataPartition(String[] copylocations, String[] copyPaths, int this_part, int num_parts, int clearport, boolean sslport, FileFilter filter, - String fileAccessBlob, FileType fileType) + String fileAccessBlob, FileType fileType) { + this(copylocations,copyPaths,this_part,num_parts,clearport,sslport,filter,fileAccessBlob,fileType,null); + } + /** + * Construct the data part, used by makeParts. + * + * @param copylocations + * locations of all copies of this file part + * @param copyPaths + * the copy paths + * @param this_part + * part number + * @param num_parts + * number of parts + * @param clearport + * port number of clear communications + * @param sslport + * port number of ssl communications + * @param filter + * the file filter object + * @param fileAccessBlob + * file access token + * @param fileType + * the file type + * @param fileName + * the file name + */ + private DataPartition(String[] copylocations, String[] copyPaths, int this_part, int num_parts, int clearport, boolean sslport, FileFilter filter, + String fileAccessBlob, FileType fileType, String fileName) { this.this_part = this_part; this.num_parts = num_parts; this.rowservicePort = clearport; this.useSSL = sslport; this.fileFilter = filter; + this.fileName=fileName; if (this.fileFilter == null) { this.fileFilter = new FileFilter(); @@ -348,6 +378,16 @@ public boolean getUseSsl() return useSSL; } + /** + * File name being read + * + * @return filename + */ + public String getFileName() + { + return fileName; + } + /** * Copy Path. * @@ -470,6 +510,31 @@ public static DataPartition[] createPartitions(DFUFilePartWrapper[] dfuparts, Cl return createPartitions(dfuparts, clusterremapper, max_parts, FileFilter.nullFilter(), fileAccessBlob, FileType.FLAT); } + + /** + * Creates the partitions. + * + * @param dfuparts + * the dfuparts + * @param clusterremapper + * the clusterremapper + * @param max_parts + * the max parts + * @param filter + * the filter + * @param fileAccessBlob + * the file access blob + * @param fileType + * the file type + * @return the data partition[] + * @throws HpccFileException + * the hpcc file exception + */ + public static DataPartition[] createPartitions(DFUFilePartWrapper[] dfuparts, ClusterRemapper clusterremapper, int max_parts, FileFilter filter, + String fileAccessBlob, FileType fileType) throws HpccFileException { + return createPartitions(dfuparts,clusterremapper,max_parts,filter,fileAccessBlob,fileType,null); + } + /** * Creates the partitions. * @@ -485,12 +550,14 @@ public static DataPartition[] createPartitions(DFUFilePartWrapper[] dfuparts, Cl * the file access blob * @param fileType * the file type + * @param fileName + * the file name * @return the data partition[] * @throws HpccFileException * the hpcc file exception */ public static DataPartition[] createPartitions(DFUFilePartWrapper[] dfuparts, ClusterRemapper clusterremapper, int max_parts, FileFilter filter, - String fileAccessBlob, FileType fileType) throws HpccFileException + String fileAccessBlob, FileType fileType, String fileName) throws HpccFileException { DataPartition[] rslt = new DataPartition[dfuparts.length]; @@ -507,7 +574,7 @@ public static DataPartition[] createPartitions(DFUFilePartWrapper[] dfuparts, Cl DataPartition new_dp = new DataPartition(clusterremapper.reviseIPs(dfuparts[i].getCopies()), copyPaths, dfuparts[i].getPartIndex(), dfuparts.length, clusterremapper.revisePort(null), clusterremapper.getUsesSSLConnection(null), filter, fileAccessBlob, - fileType); + fileType,fileName); new_dp.isTLK = dfuparts[i].isTopLevelKey(); rslt[i] = new_dp; diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCFile.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCFile.java index 8df2ba73e..98bf4d47d 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCFile.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCFile.java @@ -469,7 +469,7 @@ private void createDataParts() throws HpccFileException { ClusterRemapper clusterremapper = ClusterRemapper.makeMapper(clusterRemapInfo, fileinfoforread); this.dataParts = DataPartition.createPartitions(fileinfoforread.getFileParts(), clusterremapper, - /* maxParts currently ignored anyway */0, filter, fileinfoforread.getFileAccessInfoBlob(), fileType); + /* maxParts currently ignored anyway */0, filter, fileinfoforread.getFileAccessInfoBlob(), fileType,this.getFileName()); // Check to see if this file has a TLK. The TLK will always be the last partition. // If we do have a TLK remove it from the standard list of data partitions. diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java index fe7c73268..a1d161fe3 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java @@ -193,36 +193,7 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde * @throws Exception * general exception */ - public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilder recBuilder, int connectTimeout, int limit, boolean createPrefetchThread, int readSizeKB, FileReadResumeInfo resumeInfo, int socketOpTimeoutMs) throws Exception { - this(dp, originalRD, recBuilder, connectTimeout, limit, createPrefetchThread, readSizeKB, resumeInfo, RowServiceInputStream.DEFAULT_SOCKET_OP_TIMEOUT_MS,null); - } - /** - * A remote file reader that reads the part identified by the HpccPart object using the record definition provided. - * - * @param dp - * the part of the file, name and location - * @param originalRD - * the record defintion for the dataset - * @param recBuilder - * the IRecordBuilder used to construct records - * @param connectTimeout - * the connection timeout in milliseconds, -1 for default - * @param limit - * the maximum number of records to read from the provided data partition, -1 specifies no limit - * @param createPrefetchThread - * the input stream should create and manage prefetching on its own thread. If false prefetch needs to be called on another thread periodically. - * @param readSizeKB - * read request size in KB, -1 specifies use default value - * @param resumeInfo - * FileReadeResumeInfo data required to restart a read from a particular point in a file, null for reading from start - * @param socketOpTimeoutMs - * Socket (read / write) operation timeout in milliseconds - * @param fileName - * filename to read - * @throws Exception - * general exception - */ - public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilder recBuilder, int connectTimeout, int limit, boolean createPrefetchThread, int readSizeKB, FileReadResumeInfo resumeInfo, int socketOpTimeoutMs, String fileName) throws Exception + public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilder recBuilder, int connectTimeout, int limit, boolean createPrefetchThread, int readSizeKB, FileReadResumeInfo resumeInfo, int socketOpTimeoutMs) throws Exception { this.handlePrefetch = createPrefetchThread; this.originalRecordDef = originalRD; @@ -252,7 +223,7 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde if (resumeInfo == null) { - this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, null, false, socketOpTimeoutMs, fileName); + this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, null, false, socketOpTimeoutMs); this.binaryRecordReader = new BinaryRecordReader(this.inputStream); this.binaryRecordReader.initialize(this.recordBuilder); @@ -534,7 +505,7 @@ public void close() throws Exception long closeTimeMs = System.currentTimeMillis(); double readTimeS = (closeTimeMs - openTimeMs) / 1000.0; - log.info("HPCCRemoteFileReader: Closing file part: " + dataPartition.getThisPart() + log.info("HPCCRemoteFileReader: Closing file part: " + dataPartition.getThisPart() + " for " + dataPartition.getFileName() + " read time: " + readTimeS + "s " + " records read: " + recordsRead); } @@ -579,7 +550,7 @@ public void report() { if (getRemoteReadMessageCount() > 0) { - log.warn("DataPartition '" + this.dataPartition + "' read operation messages:\n"); + log.warn("DataPartition '" + this.dataPartition + "' read operation messages for " + dataPartition.getFileName() + ":\n"); log.warn(getRemoteReadMessages()); } } diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java index 7e74e97cb..2399ef1e6 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java @@ -61,7 +61,6 @@ public class RowServiceInputStream extends InputStream implements IProfilable private String projectedJsonRecordDefinition = null; private java.io.DataInputStream dis = null; private java.io.DataOutputStream dos = null; - private String fileName = null; private String rowServiceVersion = ""; private int filePartCopyIndexPointer = 0; //pointer into the prioritizedCopyIndexes struct @@ -326,42 +325,9 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co * @throws Exception * general exception */ - public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int connectTimeout, int limit, boolean createPrefetchThread, int maxReadSizeInKB, RestartInformation restartInfo, boolean isFetching, int socketOpTimeoutMS) throws Exception { - this(dp, rd, pRd, connectTimeout, limit, createPrefetchThread, maxReadSizeInKB, restartInfo, isFetching, DEFAULT_SOCKET_OP_TIMEOUT_MS,null); - } - - /** - * A plain socket connect to a THOR node for remote read - * - * @param dp - * the data partition to read - * @param rd - * the JSON definition for the read input and output - * @param pRd - * the projected record definition - * @param connectTimeout - * the connection timeout in milliseconds - * @param limit - * the record limit to use for reading the dataset. -1 implies no limit - * @param createPrefetchThread - * Wether or not this inputstream should handle prefetching itself or if prefetch will be called externally - * @param maxReadSizeInKB - * max readsize in kilobytes - * @param restartInfo - * information used to restart a read from a particular stream position - * @param isFetching - * Will this input stream be used to serviced batched fetch requests - * @param socketOpTimeoutMS - * Socket (read / write) operation timeout in milliseconds - * @param fileName - * fileName being read - * @throws Exception - * general exception - */ - public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int connectTimeout, int limit, boolean createPrefetchThread, int maxReadSizeInKB, RestartInformation restartInfo, boolean isFetching, int socketOpTimeoutMS, String fileName) throws Exception + public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int connectTimeout, int limit, boolean createPrefetchThread, int maxReadSizeInKB, RestartInformation restartInfo, boolean isFetching, int socketOpTimeoutMS) throws Exception { this.recordDefinition = rd; - this.fileName =fileName; this.projectedRecordDefinition = pRd; this.inFetchingMode = isFetching; @@ -403,8 +369,8 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co this.tokenBin = restartInfo.tokenBin; this.streamPos = restartInfo.streamPos; this.streamPosOfFetchStart = this.streamPos; - } - String prefix = "RowServiceInputStream constructor, file " + fileName + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; + } + String prefix = "RowServiceInputStream constructor, file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; if (inFetchingMode == false) { @@ -768,7 +734,7 @@ private int startFetch() { return -1; } - String prefix = "RowServiceInputStream.startFetch(), file " + fileName + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; + String prefix = "RowServiceInputStream.startFetch(), file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; //------------------------------------------------------------------------------ @@ -947,7 +913,7 @@ else if (this.handle == 0) private void readDataInFetch() { - String prefix = "RowServiceInputStream.readDataInFetch(), file " + fileName + "part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; + String prefix = "RowServiceInputStream.readDataInFetch(), file " + dataPart.getFileName() + "part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; if (this.closed.get()) { return; @@ -1027,7 +993,7 @@ private void readDataInFetch() private void finishFetch() { - String prefix = "RowServiceInputStream.finishFetch(), file " + fileName + "part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; + String prefix = "RowServiceInputStream.finishFetch(), file " + dataPart.getFileName() + "part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; if (this.closed.get()) { return; @@ -1241,7 +1207,7 @@ private void compactBuffer() @Override public int available() throws IOException { - String prefix = "RowServiceInputStream.available(), file " + fileName + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; + String prefix = "RowServiceInputStream.available(), file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; // Do the check for closed first here to avoid data races if (this.closed.get()) @@ -1590,7 +1556,7 @@ private void makeActive() throws HpccFileException { this.active.set(false); this.handle = 0; - String prefix = "RowServiceInputStream.makeActive, file " + fileName + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; + String prefix = "RowServiceInputStream.makeActive, file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; boolean needsRetry = false; do @@ -2150,7 +2116,7 @@ private String makeCloseHandleRequest() private void sendCloseFileRequest() throws IOException { - String prefix = "RowServiceInputStream.sendCloseFileRequest(), file " + fileName + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; + String prefix = "RowServiceInputStream.sendCloseFileRequest(), file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; if (useOldProtocol) { @@ -2186,7 +2152,7 @@ private void sendCloseFileRequest() throws IOException private RowServiceResponse readResponse() throws HpccFileException { RowServiceResponse response = new RowServiceResponse(); - String prefix="RowServiceInputStream.readResponse(): , file " + fileName + " part " + dataPart.getThisPart() + " on IP " + getIP() + ": "; + String prefix="RowServiceInputStream.readResponse(): , file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ": "; try { response.len = dis.readInt(); diff --git a/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java b/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java index e9db9da6d..7b0634156 100644 --- a/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java +++ b/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java @@ -1410,8 +1410,7 @@ public List readFile(HPCCFile file, Integer connectTimeoutMillis, bo try { HPCCRecordBuilder recordBuilder = new HPCCRecordBuilder(file.getProjectedRecordDefinition()); - HpccRemoteFileReader fileReader = new HpccRemoteFileReader(fileParts[i], originalRD, recordBuilder, RowServiceInputStream.DEFAULT_CONNECT_TIMEOUT_MILIS, NO_RECORD_LIMIT, true, DEFAULT_READ_SIZE_OPTION,null,RowServiceInputStream.DEFAULT_SOCKET_OP_TIMEOUT_MS, - file.getFileName()); + HpccRemoteFileReader fileReader = new HpccRemoteFileReader(fileParts[i], originalRD, recordBuilder, RowServiceInputStream.DEFAULT_CONNECT_TIMEOUT_MILIS, NO_RECORD_LIMIT, true, DEFAULT_READ_SIZE_OPTION,null,RowServiceInputStream.DEFAULT_SOCKET_OP_TIMEOUT_MS); fileReader.getRecordReader().setUseDecimalForUnsigned8(useDecimalForUnsigned8); fileReader.getRecordReader().setStringProcessingFlags(stringProcessingFlags); fileReaders.add(fileReader);