Skip to content

Commit

Permalink
HPCC4J-542 DFSClient: Create JUnit for read retry
Browse files Browse the repository at this point in the history
- Added file part failure retry test
- Fixed retry issue on initial connection

Signed-off-by: James McMullan [email protected]
  • Loading branch information
jpmcmu committed Oct 11, 2023
1 parent acc0ecb commit 3c5c750
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,47 @@ public String getCopyIP(int copyindex)
return copyLocations[copyindex];
}

/**
* Set the copy IP
*
* @param copyIndex
* the copyindex
* @param copyIP The IP of the file part copy
*/
public void setCopyIP(int copyIndex, String copyIP)
{
if (copyIndex < 0 || copyIndex >= copyLocations.length) return;

copyLocations[copyIndex] = copyIP;
}

/**
* Add file part copy
*
* @param copyIP The IP of the new file part copy
* @param copyPath The path of the new file part copy
*/
public void addCopyToFront(String copyIP, String copyPath)
{
String[] newCopyLocations = new String[copyLocations.length+1];
for (int i = 0; i < copyLocations.length; i++)
{
newCopyLocations[i+1] = copyLocations[i];
}

newCopyLocations[0] = copyIP;
copyLocations = newCopyLocations;

String[] newCopyPaths = new String[copyPaths.length+1];
for (int i = 0; i < copyPaths.length; i++)
{
newCopyPaths[i+1] = copyPaths[i];
}

newCopyPaths[0] = copyPath;
copyPaths = newCopyPaths;
}

/**
* Count of copies available for this file part.
* @return copy locations size
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1528,150 +1528,156 @@ private void makeActive() throws HpccFileException
this.active.set(false);
this.handle = 0;

try
boolean needsRetry = false;
do
{
log.debug("Attempting to connect to file part : '" + dataPart.getThisPart() + "' Copy: '" + (getFilePartCopy() + 1) + "' on IP: '"
+ getIP() + "'");

needsRetry = false;
try
{
if (getUseSSL())
log.debug("Attempting to connect to file part : '" + dataPart.getThisPart() + "' Copy: '"
+ (getFilePartCopy() + 1) + "' on IP: '" + getIP() + "'");
try
{
SSLSocketFactory ssf = (SSLSocketFactory) SSLSocketFactory.getDefault();
sock = (SSLSocket) ssf.createSocket();

// Optimize for bandwidth over latency and connection time.
// We are opening up a long standing connection and potentially reading a significant amount of
// data
// So we don't care as much about individual packet latency or connection time overhead
sock.setPerformancePreferences(0, 1, 2);
sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout);

log.debug("Attempting SSL handshake...");
((SSLSocket) sock).startHandshake();
log.debug("SSL handshake successful...");
log.debug(" Remote address = " + sock.getInetAddress().toString() + " Remote port = " + sock.getPort());
if (getUseSSL())
{
SSLSocketFactory ssf = (SSLSocketFactory) SSLSocketFactory.getDefault();
sock = (SSLSocket) ssf.createSocket();

// Optimize for bandwidth over latency and connection time.
// We are opening up a long standing connection and potentially reading a significant amount of
// data
// So we don't care as much about individual packet latency or connection time overhead
sock.setPerformancePreferences(0, 1, 2);
sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout);

log.debug("Attempting SSL handshake...");
((SSLSocket) sock).startHandshake();
log.debug("SSL handshake successful...");
log.debug(" Remote address = " + sock.getInetAddress().toString() + " Remote port = " + sock.getPort());
}
else
{
SocketFactory sf = SocketFactory.getDefault();
sock = sf.createSocket();

// Optimize for bandwidth over latency and connection time.
// We are opening up a long standing connection and potentially reading a significant amount of
// data
// So we don't care as much about individual packet latency or connection time overhead
sock.setPerformancePreferences(0, 1, 2);
sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout);
}

this.sock.setSoTimeout(socketOpTimeoutMs);

log.debug("Connected: Remote address = " + sock.getInetAddress().toString() + " Remote port = " + sock.getPort());
}
else
catch (java.net.UnknownHostException e)
{
SocketFactory sf = SocketFactory.getDefault();
sock = sf.createSocket();

// Optimize for bandwidth over latency and connection time.
// We are opening up a long standing connection and potentially reading a significant amount of
// data
// So we don't care as much about individual packet latency or connection time overhead
sock.setPerformancePreferences(0, 1, 2);
sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout);
throw new HpccFileException("Bad file part addr " + this.getIP(), e);
}
catch (java.io.IOException e)
{
throw new HpccFileException(e);
}

this.sock.setSoTimeout(socketOpTimeoutMs);
try
{
this.dos = new java.io.DataOutputStream(sock.getOutputStream());
this.dis = new java.io.DataInputStream(sock.getInputStream());
}
catch (java.io.IOException e)
{
throw new HpccFileException("Failed to create streams", e);
}

log.debug("Connected: Remote address = " + sock.getInetAddress().toString() + " Remote port = " + sock.getPort());
}
catch (java.net.UnknownHostException e)
{
throw new HpccFileException("Bad file part addr " + this.getIP(), e);
}
catch (java.io.IOException e)
{
throw new HpccFileException(e);
}
//------------------------------------------------------------------------------
// Check protocol version
//------------------------------------------------------------------------------

try
{
this.dos = new java.io.DataOutputStream(sock.getOutputStream());
this.dis = new java.io.DataInputStream(sock.getInputStream());
}
catch (java.io.IOException e)
{
throw new HpccFileException("Failed to create streams", e);
}
try
{
String msg = makeGetVersionRequest();
int msgLen = msg.length();

//------------------------------------------------------------------------------
// Check protocol version
//------------------------------------------------------------------------------
this.dos.writeInt(msgLen);
this.dos.write(msg.getBytes(HPCCCharSet), 0, msgLen);
this.dos.flush();
}
catch (IOException e)
{
throw new HpccFileException("Failed on initial remote read trans", e);
}

try
{
String msg = makeGetVersionRequest();
int msgLen = msg.length();
RowServiceResponse response = readResponse();
if (response.len == 0)
{
useOldProtocol = true;
}
else
{
useOldProtocol = false;

this.dos.writeInt(msgLen);
this.dos.write(msg.getBytes(HPCCCharSet), 0, msgLen);
this.dos.flush();
}
catch (IOException e)
{
throw new HpccFileException("Failed on initial remote read trans", e);
}
byte[] versionBytes = new byte[response.len];
try
{
this.dis.readFully(versionBytes);
}
catch (IOException e)
{
throw new HpccFileException("Error while attempting to read version response.", e);
}

RowServiceResponse response = readResponse();
if (response.len == 0)
{
useOldProtocol = true;
}
else
{
useOldProtocol = false;
rowServiceVersion = new String(versionBytes, HPCCCharSet);
}

//------------------------------------------------------------------------------
// Send initial read request
//------------------------------------------------------------------------------

byte[] versionBytes = new byte[response.len];
try
{
this.dis.readFully(versionBytes);
String readTrans = null;
if (this.tokenBin == null)
{
this.tokenBin = new byte[0];
readTrans = makeInitialRequest();
}
else
{
readTrans = makeTokenRequest();
}

int transLen = readTrans.length();
this.dos.writeInt(transLen);
this.dos.write(readTrans.getBytes(HPCCCharSet), 0, transLen);
this.dos.flush();
}
catch (IOException e)
{
throw new HpccFileException("Error while attempting to read version response.", e);
throw new HpccFileException("Failed on initial remote read read trans", e);
}

rowServiceVersion = new String(versionBytes, HPCCCharSet);
}

//------------------------------------------------------------------------------
// Send initial read request
//------------------------------------------------------------------------------

try
{
String readTrans = null;
if (this.tokenBin == null)
{
this.tokenBin = new byte[0];
readTrans = makeInitialRequest();
}
else
if (CompileTimeConstants.PROFILE_CODE)
{
readTrans = makeTokenRequest();
firstByteTimeNS = System.nanoTime();
}

int transLen = readTrans.length();
this.dos.writeInt(transLen);
this.dos.write(readTrans.getBytes(HPCCCharSet), 0, transLen);
this.dos.flush();
this.active.set(true);
}
catch (IOException e)
catch (Exception e)
{
throw new HpccFileException("Failed on initial remote read read trans", e);
}
log.error("Could not reach file part: '" + dataPart.getThisPart() + "' copy: '" + (getFilePartCopy() + 1) + "' on IP: '" + getIP()
+ "'");
log.error(e.getMessage());

if (CompileTimeConstants.PROFILE_CODE)
{
firstByteTimeNS = System.nanoTime();
needsRetry = true;
if (!setNextFilePartCopy())
{
throw new HpccFileException("Unsuccessfuly attempted to connect to all file part copies", e);
}
}

this.active.set(true);
}
catch (Exception e)
{
log.error("Could not reach file part: '" + dataPart.getThisPart() + "' copy: '" + (getFilePartCopy() + 1) + "' on IP: '" + getIP()
+ "'");
log.error(e.getMessage());

if (!setNextFilePartCopy())
// This should be a multi exception
throw new HpccFileException("Unsuccessfuly attempted to connect to all file part copies", e);
}
} while (needsRetry);
}

/* Notes on protocol:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1039,6 +1039,27 @@ public void emptyCompressedFileTest()
}
}

@Test
public void filePartFailureTest()
{
HPCCFile readFile = null;
try
{
readFile = new HPCCFile(datasets[0], connString, hpccUser, hpccPass);
DataPartition[] fileParts = readFile.getFileParts();
for (int i = 0; i < fileParts.length; i++) {
fileParts[i].addCopyToFront("1.1.1.1", "");
}

List<HPCCRecord> records = readFile(readFile, null, false);
System.out.println("Record count: " + records.size());
}
catch (Exception e)
{
Assert.fail(e.getMessage());
}
}

@Test
public void invalidSignatureTest()
{
Expand Down

0 comments on commit 3c5c750

Please sign in to comment.