diff --git a/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/HDFS2FileSink.java b/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/HDFS2FileSink.java index 560709d..01b9cdf 100644 --- a/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/HDFS2FileSink.java +++ b/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/HDFS2FileSink.java @@ -13,7 +13,9 @@ import java.net.UnknownHostException; import java.text.SimpleDateFormat; import java.util.Calendar; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; import java.util.logging.Level; import java.util.logging.Logger; @@ -586,6 +588,12 @@ public void initialize(OperatorContext context) throws Exception { super.initialize(context); + // register for data governance + // only register if static filename mode + TRACE.log(TraceLevel.INFO, "HDFS2FileSink - Data Governance - file: " + file + " and HdfsUri: " + getHdfsUri()); + if (fileAttrName == null && file != null && getHdfsUri() != null) { + registerForDataGovernance(getHdfsUri(), file); + } /* * Set appropriate variables if the optional output port is @@ -649,6 +657,23 @@ else if (fileIndex == 0) { initState = new InitialState(); } + private void registerForDataGovernance(String serverURL, String file) { + TRACE.log(TraceLevel.INFO, "HDFS2FileSink - Registering for data governance with server URL: " + serverURL + " and file: " + file); + + Map properties = new HashMap(); + properties.put(IGovernanceConstants.TAG_REGISTER_TYPE, IGovernanceConstants.TAG_REGISTER_TYPE_OUTPUT); + properties.put(IGovernanceConstants.PROPERTY_OUTPUT_OPERATOR_TYPE, "HDFS2FileSink"); + properties.put(IGovernanceConstants.PROPERTY_SRC_NAME, file); + properties.put(IGovernanceConstants.PROPERTY_SRC_TYPE, IGovernanceConstants.ASSET_HDFS_FILE_TYPE); + properties.put(IGovernanceConstants.PROPERTY_SRC_PARENT_PREFIX, "p1"); + properties.put("p1" + IGovernanceConstants.PROPERTY_SRC_NAME, serverURL); + properties.put("p1" + IGovernanceConstants.PROPERTY_SRC_TYPE, IGovernanceConstants.ASSET_HDFS_SERVER_TYPE); + properties.put("p1" + IGovernanceConstants.PROPERTY_PARENT_TYPE, IGovernanceConstants.ASSET_HDFS_SERVER_TYPE_SHORT); + TRACE.log(TraceLevel.INFO, "HDFS2FileSink - Data governance: " + properties.toString()); + + setTagData(IGovernanceConstants.TAG_OPERATOR_IGC, properties); + } + private void createFile(String filename) { if (TRACE.isLoggable(TraceLevel.DEBUG)) { diff --git a/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/HDFS2FileSource.java b/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/HDFS2FileSource.java index 1eda61c..0033f4d 100644 --- a/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/HDFS2FileSource.java +++ b/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/HDFS2FileSource.java @@ -12,7 +12,9 @@ import java.io.UnsupportedEncodingException; import java.net.URI; import java.net.URISyntaxException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.logging.Logger; import org.apache.hadoop.fs.FSDataInputStream; @@ -115,6 +117,13 @@ public synchronized void initialize(OperatorContext context) } super.initialize(context); + + // register for data governance + TRACE.log(TraceLevel.INFO, + "HDFS2FileSource - Data Governance - file: " + fFileName + " and HdfsUri: " + getHdfsUri()); + if (fFileName != null && getHdfsUri() != null) { + registerForDataGovernance(getHdfsUri(), fFileName); + } // Associate the aspect Log with messages from the SPL log // logger. @@ -150,6 +159,23 @@ public synchronized void initialize(OperatorContext context) fCrContext = context.getOptionalContext(ConsistentRegionContext.class); } + private void registerForDataGovernance(String serverURL, String file) { + TRACE.log(TraceLevel.INFO, "HDFS2FileSource - Registering for data governance with server URL: " + serverURL + " and file: " + file); + + Map properties = new HashMap(); + properties.put(IGovernanceConstants.TAG_REGISTER_TYPE, IGovernanceConstants.TAG_REGISTER_TYPE_INPUT); + properties.put(IGovernanceConstants.PROPERTY_INPUT_OPERATOR_TYPE, "HDFS2FileSource"); + properties.put(IGovernanceConstants.PROPERTY_SRC_NAME, file); + properties.put(IGovernanceConstants.PROPERTY_SRC_TYPE, IGovernanceConstants.ASSET_HDFS_FILE_TYPE); + properties.put(IGovernanceConstants.PROPERTY_SRC_PARENT_PREFIX, "p1"); + properties.put("p1" + IGovernanceConstants.PROPERTY_SRC_NAME, serverURL); + properties.put("p1" + IGovernanceConstants.PROPERTY_SRC_TYPE, IGovernanceConstants.ASSET_HDFS_SERVER_TYPE); + properties.put("p1" + IGovernanceConstants.PROPERTY_PARENT_TYPE, IGovernanceConstants.ASSET_HDFS_SERVER_TYPE_SHORT); + TRACE.log(TraceLevel.INFO, "HDFS2FileSource - Data governance: " + properties.toString()); + + setTagData(IGovernanceConstants.TAG_OPERATOR_IGC, properties); + } + @ContextCheck(compile = true) public static void validateParameters(OperatorContextChecker checker) throws Exception { diff --git a/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/IGovernanceConstants.java b/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/IGovernanceConstants.java new file mode 100644 index 0000000..b0723ea --- /dev/null +++ b/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/IGovernanceConstants.java @@ -0,0 +1,27 @@ +package com.ibm.streamsx.hdfs; + +public interface IGovernanceConstants { + public static final String TAG_OPERATOR_IGC = "OperatorIGC"; + public static final String TAG_REGISTER_TYPE = "registerType"; + public static final String TAG_REGISTER_TYPE_INPUT = "input"; + public static final String TAG_REGISTER_TYPE_OUTPUT = "output"; + + + public static final String ASSET_HDFS_FILE_TYPE = "$Streams-HDFSFile"; + public static final String ASSET_HDFS_SERVER_TYPE = "$Streams-HDFSServer"; + public static final String ASSET_HDFS_SERVER_TYPE_SHORT = "$HDFSServer"; + + public static final String PROPERTY_SRC_NAME = "srcName"; + public static final String PROPERTY_SRC_TYPE = "srcType"; + +// public static final String PROPERTY_SRC_PARENT_PREFIX = "srcParentPrefix"; + public static final String PROPERTY_SRC_PARENT_PREFIX = "srcParent"; + public static final String PROPERTY_PARENT_TYPE = "parentType"; + + public static final String PROPERTY_PARENT_PREFIX = "p1"; + public static final String PROPERTY_GRANDPARENT_PREFIX = "p2"; + + public static final String PROPERTY_INPUT_OPERATOR_TYPE = "inputOperatorType"; + public static final String PROPERTY_OUTPUT_OPERATOR_TYPE = "outputOperatorType"; + +} diff --git a/com.ibm.streamsx.hdfs/info.xml b/com.ibm.streamsx.hdfs/info.xml index 54d56f4..7f98683 100644 --- a/com.ibm.streamsx.hdfs/info.xml +++ b/com.ibm.streamsx.hdfs/info.xml @@ -156,8 +156,8 @@ Alternatively, you can fully qualify the operators that are provided by toolkit 9. Run the application. You can submit the application as a job by using the **streamtool submitjob** command or by using Streams Studio. - 2.0.0 - 4.0.0.0 + 3.0.0 + 4.1.0.0 \ No newline at end of file diff --git a/demos/HDFSBinary/info.xml b/demos/HDFSBinary/info.xml index c15dfaa..8e18d8f 100644 --- a/demos/HDFSBinary/info.xml +++ b/demos/HDFSBinary/info.xml @@ -9,7 +9,7 @@ com.ibm.streamsx.hdfs - [2.0.0,3.0.0) + [2.0.0,4.0.0) \ No newline at end of file diff --git a/demos/HDFSParallelFileSink/info.xml b/demos/HDFSParallelFileSink/info.xml index 61b8bf9..b9987b6 100644 --- a/demos/HDFSParallelFileSink/info.xml +++ b/demos/HDFSParallelFileSink/info.xml @@ -15,7 +15,7 @@ com.ibm.streamsx.hdfs - [2.0.0,3.0.0) + [2.0.0,4.0.0) diff --git a/samples/CompressedHdfsFiles/info.xml b/samples/CompressedHdfsFiles/info.xml index 766b095..e9f53be 100644 --- a/samples/CompressedHdfsFiles/info.xml +++ b/samples/CompressedHdfsFiles/info.xml @@ -9,7 +9,7 @@ com.ibm.streamsx.hdfs - [2.0.0,3.0.0) + [2.0.0,4.0.0) \ No newline at end of file diff --git a/samples/HDFSFileSinkSamples/info.xml b/samples/HDFSFileSinkSamples/info.xml index f808d1a..2e37448 100644 --- a/samples/HDFSFileSinkSamples/info.xml +++ b/samples/HDFSFileSinkSamples/info.xml @@ -15,7 +15,7 @@ com.ibm.streamsx.hdfs - [2.0.0,3.0.0) + [2.0.0,4.0.0) \ No newline at end of file diff --git a/samples/HDFSFileSourceSamples/info.xml b/samples/HDFSFileSourceSamples/info.xml index 9eca677..b334da2 100644 --- a/samples/HDFSFileSourceSamples/info.xml +++ b/samples/HDFSFileSourceSamples/info.xml @@ -15,7 +15,7 @@ com.ibm.streamsx.hdfs - [2.0.0,3.0.0) + [2.0.0,4.0.0) \ No newline at end of file diff --git a/samples/ScanAndRead/info.xml b/samples/ScanAndRead/info.xml index 3a34638..4927132 100644 --- a/samples/ScanAndRead/info.xml +++ b/samples/ScanAndRead/info.xml @@ -15,7 +15,7 @@ com.ibm.streamsx.hdfs - [2.0.0,3.0.0) + [2.0.0,4.0.0) \ No newline at end of file