Skip to content

Commit

Permalink
Merge pull request #63 from cancilla/master
Browse files Browse the repository at this point in the history
Merge Streams 4.1 HDFS changes
  • Loading branch information
James Cancilla committed Nov 20, 2015
2 parents 4bf1872 + 145a025 commit 5b19f6c
Show file tree
Hide file tree
Showing 10 changed files with 86 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -586,6 +588,12 @@ public void initialize(OperatorContext context) throws Exception {

super.initialize(context);

// register for data governance
// only register if static filename mode
TRACE.log(TraceLevel.INFO, "HDFS2FileSink - Data Governance - file: " + file + " and HdfsUri: " + getHdfsUri());
if (fileAttrName == null && file != null && getHdfsUri() != null) {
registerForDataGovernance(getHdfsUri(), file);
}

/*
* Set appropriate variables if the optional output port is
Expand Down Expand Up @@ -649,6 +657,23 @@ else if (fileIndex == 0) {
initState = new InitialState();
}

private void registerForDataGovernance(String serverURL, String file) {
TRACE.log(TraceLevel.INFO, "HDFS2FileSink - Registering for data governance with server URL: " + serverURL + " and file: " + file);

Map<String, String> properties = new HashMap<String, String>();
properties.put(IGovernanceConstants.TAG_REGISTER_TYPE, IGovernanceConstants.TAG_REGISTER_TYPE_OUTPUT);
properties.put(IGovernanceConstants.PROPERTY_OUTPUT_OPERATOR_TYPE, "HDFS2FileSink");
properties.put(IGovernanceConstants.PROPERTY_SRC_NAME, file);
properties.put(IGovernanceConstants.PROPERTY_SRC_TYPE, IGovernanceConstants.ASSET_HDFS_FILE_TYPE);
properties.put(IGovernanceConstants.PROPERTY_SRC_PARENT_PREFIX, "p1");
properties.put("p1" + IGovernanceConstants.PROPERTY_SRC_NAME, serverURL);
properties.put("p1" + IGovernanceConstants.PROPERTY_SRC_TYPE, IGovernanceConstants.ASSET_HDFS_SERVER_TYPE);
properties.put("p1" + IGovernanceConstants.PROPERTY_PARENT_TYPE, IGovernanceConstants.ASSET_HDFS_SERVER_TYPE_SHORT);
TRACE.log(TraceLevel.INFO, "HDFS2FileSink - Data governance: " + properties.toString());

setTagData(IGovernanceConstants.TAG_OPERATOR_IGC, properties);
}

private void createFile(String filename) {

if (TRACE.isLoggable(TraceLevel.DEBUG)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

import org.apache.hadoop.fs.FSDataInputStream;
Expand Down Expand Up @@ -115,6 +117,13 @@ public synchronized void initialize(OperatorContext context)
}

super.initialize(context);

// register for data governance
TRACE.log(TraceLevel.INFO,
"HDFS2FileSource - Data Governance - file: " + fFileName + " and HdfsUri: " + getHdfsUri());
if (fFileName != null && getHdfsUri() != null) {
registerForDataGovernance(getHdfsUri(), fFileName);
}

// Associate the aspect Log with messages from the SPL log
// logger.
Expand Down Expand Up @@ -150,6 +159,23 @@ public synchronized void initialize(OperatorContext context)
fCrContext = context.getOptionalContext(ConsistentRegionContext.class);
}

private void registerForDataGovernance(String serverURL, String file) {
TRACE.log(TraceLevel.INFO, "HDFS2FileSource - Registering for data governance with server URL: " + serverURL + " and file: " + file);

Map<String, String> properties = new HashMap<String, String>();
properties.put(IGovernanceConstants.TAG_REGISTER_TYPE, IGovernanceConstants.TAG_REGISTER_TYPE_INPUT);
properties.put(IGovernanceConstants.PROPERTY_INPUT_OPERATOR_TYPE, "HDFS2FileSource");
properties.put(IGovernanceConstants.PROPERTY_SRC_NAME, file);
properties.put(IGovernanceConstants.PROPERTY_SRC_TYPE, IGovernanceConstants.ASSET_HDFS_FILE_TYPE);
properties.put(IGovernanceConstants.PROPERTY_SRC_PARENT_PREFIX, "p1");
properties.put("p1" + IGovernanceConstants.PROPERTY_SRC_NAME, serverURL);
properties.put("p1" + IGovernanceConstants.PROPERTY_SRC_TYPE, IGovernanceConstants.ASSET_HDFS_SERVER_TYPE);
properties.put("p1" + IGovernanceConstants.PROPERTY_PARENT_TYPE, IGovernanceConstants.ASSET_HDFS_SERVER_TYPE_SHORT);
TRACE.log(TraceLevel.INFO, "HDFS2FileSource - Data governance: " + properties.toString());

setTagData(IGovernanceConstants.TAG_OPERATOR_IGC, properties);
}

@ContextCheck(compile = true)
public static void validateParameters(OperatorContextChecker checker)
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.ibm.streamsx.hdfs;

public interface IGovernanceConstants {
public static final String TAG_OPERATOR_IGC = "OperatorIGC";
public static final String TAG_REGISTER_TYPE = "registerType";
public static final String TAG_REGISTER_TYPE_INPUT = "input";
public static final String TAG_REGISTER_TYPE_OUTPUT = "output";


public static final String ASSET_HDFS_FILE_TYPE = "$Streams-HDFSFile";
public static final String ASSET_HDFS_SERVER_TYPE = "$Streams-HDFSServer";
public static final String ASSET_HDFS_SERVER_TYPE_SHORT = "$HDFSServer";

public static final String PROPERTY_SRC_NAME = "srcName";
public static final String PROPERTY_SRC_TYPE = "srcType";

// public static final String PROPERTY_SRC_PARENT_PREFIX = "srcParentPrefix";
public static final String PROPERTY_SRC_PARENT_PREFIX = "srcParent";
public static final String PROPERTY_PARENT_TYPE = "parentType";

public static final String PROPERTY_PARENT_PREFIX = "p1";
public static final String PROPERTY_GRANDPARENT_PREFIX = "p2";

public static final String PROPERTY_INPUT_OPERATOR_TYPE = "inputOperatorType";
public static final String PROPERTY_OUTPUT_OPERATOR_TYPE = "outputOperatorType";

}
4 changes: 2 additions & 2 deletions com.ibm.streamsx.hdfs/info.xml
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ Alternatively, you can fully qualify the operators that are provided by toolkit
9. Run the application. You can submit the application as a job by using the **streamtool submitjob** command or by using Streams Studio.

</description>
<version>2.0.0</version>
<requiredProductVersion>4.0.0.0</requiredProductVersion>
<version>3.0.0</version>
<requiredProductVersion>4.1.0.0</requiredProductVersion>
</identity>
<dependencies/>
</toolkitInfoModel>
2 changes: 1 addition & 1 deletion demos/HDFSBinary/info.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<info:dependencies>
<info:toolkit>
<common:name>com.ibm.streamsx.hdfs</common:name>
<common:version>[2.0.0,3.0.0)</common:version>
<common:version>[2.0.0,4.0.0)</common:version>
</info:toolkit>
</info:dependencies>
</info:toolkitInfoModel>
2 changes: 1 addition & 1 deletion demos/HDFSParallelFileSink/info.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<info:dependencies>
<info:toolkit>
<common:name>com.ibm.streamsx.hdfs</common:name>
<common:version>[2.0.0,3.0.0)</common:version>
<common:version>[2.0.0,4.0.0)</common:version>
</info:toolkit>
</info:dependencies>
</info:toolkitInfoModel>
2 changes: 1 addition & 1 deletion samples/CompressedHdfsFiles/info.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<info:dependencies>
<info:toolkit>
<common:name>com.ibm.streamsx.hdfs</common:name>
<common:version>[2.0.0,3.0.0)</common:version>
<common:version>[2.0.0,4.0.0)</common:version>
</info:toolkit>
</info:dependencies>
</info:toolkitInfoModel>
2 changes: 1 addition & 1 deletion samples/HDFSFileSinkSamples/info.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<info:dependencies>
<info:toolkit>
<common:name>com.ibm.streamsx.hdfs</common:name>
<common:version>[2.0.0,3.0.0)</common:version>
<common:version>[2.0.0,4.0.0)</common:version>
</info:toolkit>
</info:dependencies>
</info:toolkitInfoModel>
2 changes: 1 addition & 1 deletion samples/HDFSFileSourceSamples/info.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<info:dependencies>
<info:toolkit>
<common:name>com.ibm.streamsx.hdfs</common:name>
<common:version>[2.0.0,3.0.0)</common:version>
<common:version>[2.0.0,4.0.0)</common:version>
</info:toolkit>
</info:dependencies>
</info:toolkitInfoModel>
2 changes: 1 addition & 1 deletion samples/ScanAndRead/info.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<info:dependencies>
<info:toolkit>
<common:name>com.ibm.streamsx.hdfs</common:name>
<common:version>[2.0.0,3.0.0)</common:version>
<common:version>[2.0.0,4.0.0)</common:version>
</info:toolkit>
</info:dependencies>
</info:toolkitInfoModel>

0 comments on commit 5b19f6c

Please sign in to comment.