diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java index bc648b49c..042b9b5c3 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java @@ -266,6 +266,47 @@ public String getCopyIP(int copyindex) return copyLocations[copyindex]; } + /** + * Set the copy IP + * + * @param copyIndex + * the copyindex + * @param copyIP The IP of the file part copy + */ + public void setCopyIP(int copyIndex, String copyIP) + { + if (copyIndex < 0 || copyIndex >= copyLocations.length) return; + + copyLocations[copyIndex] = copyIP; + } + + /** + * Add file part copy + * + * @param copyIP The IP of the new file part copy + * @param copyPath The path of the new file part copy + */ + public void addCopyToFront(String copyIP, String copyPath) + { + String[] newCopyLocations = new String[copyLocations.length+1]; + for (int i = 0; i < copyLocations.length; i++) + { + newCopyLocations[i+1] = copyLocations[i]; + } + + newCopyLocations[0] = copyIP; + copyLocations = newCopyLocations; + + String[] newCopyPaths = new String[copyPaths.length+1]; + for (int i = 0; i < copyPaths.length; i++) + { + newCopyPaths[i+1] = copyPaths[i]; + } + + newCopyPaths[0] = copyPath; + copyPaths = newCopyPaths; + } + /** * Count of copies available for this file part. * @return copy locations size diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java index 795fd8478..d8218b767 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java @@ -1528,150 +1528,156 @@ private void makeActive() throws HpccFileException this.active.set(false); this.handle = 0; - try + boolean needsRetry = false; + do { - log.debug("Attempting to connect to file part : '" + dataPart.getThisPart() + "' Copy: '" + (getFilePartCopy() + 1) + "' on IP: '" - + getIP() + "'"); - + needsRetry = false; try { - if (getUseSSL()) + log.debug("Attempting to connect to file part : '" + dataPart.getThisPart() + "' Copy: '" + + (getFilePartCopy() + 1) + "' on IP: '" + getIP() + "'"); + try { - SSLSocketFactory ssf = (SSLSocketFactory) SSLSocketFactory.getDefault(); - sock = (SSLSocket) ssf.createSocket(); - - // Optimize for bandwidth over latency and connection time. - // We are opening up a long standing connection and potentially reading a significant amount of - // data - // So we don't care as much about individual packet latency or connection time overhead - sock.setPerformancePreferences(0, 1, 2); - sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout); - - log.debug("Attempting SSL handshake..."); - ((SSLSocket) sock).startHandshake(); - log.debug("SSL handshake successful..."); - log.debug(" Remote address = " + sock.getInetAddress().toString() + " Remote port = " + sock.getPort()); + if (getUseSSL()) + { + SSLSocketFactory ssf = (SSLSocketFactory) SSLSocketFactory.getDefault(); + sock = (SSLSocket) ssf.createSocket(); + + // Optimize for bandwidth over latency and connection time. + // We are opening up a long standing connection and potentially reading a significant amount of + // data + // So we don't care as much about individual packet latency or connection time overhead + sock.setPerformancePreferences(0, 1, 2); + sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout); + + log.debug("Attempting SSL handshake..."); + ((SSLSocket) sock).startHandshake(); + log.debug("SSL handshake successful..."); + log.debug(" Remote address = " + sock.getInetAddress().toString() + " Remote port = " + sock.getPort()); + } + else + { + SocketFactory sf = SocketFactory.getDefault(); + sock = sf.createSocket(); + + // Optimize for bandwidth over latency and connection time. + // We are opening up a long standing connection and potentially reading a significant amount of + // data + // So we don't care as much about individual packet latency or connection time overhead + sock.setPerformancePreferences(0, 1, 2); + sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout); + } + + this.sock.setSoTimeout(socketOpTimeoutMs); + + log.debug("Connected: Remote address = " + sock.getInetAddress().toString() + " Remote port = " + sock.getPort()); } - else + catch (java.net.UnknownHostException e) { - SocketFactory sf = SocketFactory.getDefault(); - sock = sf.createSocket(); - - // Optimize for bandwidth over latency and connection time. - // We are opening up a long standing connection and potentially reading a significant amount of - // data - // So we don't care as much about individual packet latency or connection time overhead - sock.setPerformancePreferences(0, 1, 2); - sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout); + throw new HpccFileException("Bad file part addr " + this.getIP(), e); + } + catch (java.io.IOException e) + { + throw new HpccFileException(e); } - this.sock.setSoTimeout(socketOpTimeoutMs); + try + { + this.dos = new java.io.DataOutputStream(sock.getOutputStream()); + this.dis = new java.io.DataInputStream(sock.getInputStream()); + } + catch (java.io.IOException e) + { + throw new HpccFileException("Failed to create streams", e); + } - log.debug("Connected: Remote address = " + sock.getInetAddress().toString() + " Remote port = " + sock.getPort()); - } - catch (java.net.UnknownHostException e) - { - throw new HpccFileException("Bad file part addr " + this.getIP(), e); - } - catch (java.io.IOException e) - { - throw new HpccFileException(e); - } + //------------------------------------------------------------------------------ + // Check protocol version + //------------------------------------------------------------------------------ - try - { - this.dos = new java.io.DataOutputStream(sock.getOutputStream()); - this.dis = new java.io.DataInputStream(sock.getInputStream()); - } - catch (java.io.IOException e) - { - throw new HpccFileException("Failed to create streams", e); - } + try + { + String msg = makeGetVersionRequest(); + int msgLen = msg.length(); - //------------------------------------------------------------------------------ - // Check protocol version - //------------------------------------------------------------------------------ + this.dos.writeInt(msgLen); + this.dos.write(msg.getBytes(HPCCCharSet), 0, msgLen); + this.dos.flush(); + } + catch (IOException e) + { + throw new HpccFileException("Failed on initial remote read trans", e); + } - try - { - String msg = makeGetVersionRequest(); - int msgLen = msg.length(); + RowServiceResponse response = readResponse(); + if (response.len == 0) + { + useOldProtocol = true; + } + else + { + useOldProtocol = false; - this.dos.writeInt(msgLen); - this.dos.write(msg.getBytes(HPCCCharSet), 0, msgLen); - this.dos.flush(); - } - catch (IOException e) - { - throw new HpccFileException("Failed on initial remote read trans", e); - } + byte[] versionBytes = new byte[response.len]; + try + { + this.dis.readFully(versionBytes); + } + catch (IOException e) + { + throw new HpccFileException("Error while attempting to read version response.", e); + } - RowServiceResponse response = readResponse(); - if (response.len == 0) - { - useOldProtocol = true; - } - else - { - useOldProtocol = false; + rowServiceVersion = new String(versionBytes, HPCCCharSet); + } + + //------------------------------------------------------------------------------ + // Send initial read request + //------------------------------------------------------------------------------ - byte[] versionBytes = new byte[response.len]; try { - this.dis.readFully(versionBytes); + String readTrans = null; + if (this.tokenBin == null) + { + this.tokenBin = new byte[0]; + readTrans = makeInitialRequest(); + } + else + { + readTrans = makeTokenRequest(); + } + + int transLen = readTrans.length(); + this.dos.writeInt(transLen); + this.dos.write(readTrans.getBytes(HPCCCharSet), 0, transLen); + this.dos.flush(); } catch (IOException e) { - throw new HpccFileException("Error while attempting to read version response.", e); + throw new HpccFileException("Failed on initial remote read read trans", e); } - rowServiceVersion = new String(versionBytes, HPCCCharSet); - } - - //------------------------------------------------------------------------------ - // Send initial read request - //------------------------------------------------------------------------------ - - try - { - String readTrans = null; - if (this.tokenBin == null) - { - this.tokenBin = new byte[0]; - readTrans = makeInitialRequest(); - } - else + if (CompileTimeConstants.PROFILE_CODE) { - readTrans = makeTokenRequest(); + firstByteTimeNS = System.nanoTime(); } - int transLen = readTrans.length(); - this.dos.writeInt(transLen); - this.dos.write(readTrans.getBytes(HPCCCharSet), 0, transLen); - this.dos.flush(); + this.active.set(true); } - catch (IOException e) + catch (Exception e) { - throw new HpccFileException("Failed on initial remote read read trans", e); - } + log.error("Could not reach file part: '" + dataPart.getThisPart() + "' copy: '" + (getFilePartCopy() + 1) + "' on IP: '" + getIP() + + "'"); + log.error(e.getMessage()); - if (CompileTimeConstants.PROFILE_CODE) - { - firstByteTimeNS = System.nanoTime(); + needsRetry = true; + if (!setNextFilePartCopy()) + { + throw new HpccFileException("Unsuccessfuly attempted to connect to all file part copies", e); + } } - - this.active.set(true); - } - catch (Exception e) - { - log.error("Could not reach file part: '" + dataPart.getThisPart() + "' copy: '" + (getFilePartCopy() + 1) + "' on IP: '" + getIP() - + "'"); - log.error(e.getMessage()); - - if (!setNextFilePartCopy()) - // This should be a multi exception - throw new HpccFileException("Unsuccessfuly attempted to connect to all file part copies", e); - } + } while (needsRetry); } /* Notes on protocol: diff --git a/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java b/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java index 2fc59c86d..988543202 100644 --- a/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java +++ b/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java @@ -1039,6 +1039,27 @@ public void emptyCompressedFileTest() } } + @Test + public void filePartFailureTest() + { + HPCCFile readFile = null; + try + { + readFile = new HPCCFile(datasets[0], connString, hpccUser, hpccPass); + DataPartition[] fileParts = readFile.getFileParts(); + for (int i = 0; i < fileParts.length; i++) { + fileParts[i].addCopyToFront("1.1.1.1", ""); + } + + List records = readFile(readFile, null, false); + System.out.println("Record count: " + records.size()); + } + catch (Exception e) + { + Assert.fail(e.getMessage()); + } + } + @Test public void invalidSignatureTest() {