From b38621da6ac9814f2188253bd82be5539330553d Mon Sep 17 00:00:00 2001
From: mkomor
Date: Mon, 19 Oct 2015 16:47:18 -0400
Subject: [PATCH] Add support for data governance

On initialize, call the new setTagData API to register the operators for
data governance.
---
 .../com/ibm/streamsx/hdfs/HDFS2FileSink.java  | 25 +++++++++++++++++
 .../ibm/streamsx/hdfs/HDFS2FileSource.java    | 26 ++++++++++++++++++
 .../streamsx/hdfs/IGovernanceConstants.java   | 27 +++++++++++++++++++
 3 files changed, 78 insertions(+)
 create mode 100644 com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/IGovernanceConstants.java

diff --git a/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/HDFS2FileSink.java b/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/HDFS2FileSink.java
index 560709d..01b9cdf 100644
--- a/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/HDFS2FileSink.java
+++ b/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/HDFS2FileSink.java
@@ -13,7 +13,9 @@
 import java.net.UnknownHostException;
 import java.text.SimpleDateFormat;
 import java.util.Calendar;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -586,6 +588,12 @@ public void initialize(OperatorContext context) throws Exception {
 
         super.initialize(context);
 
+        // register for data governance
+        // only register if static filename mode
+        TRACE.log(TraceLevel.INFO, "HDFS2FileSink - Data Governance - file: " + file + " and HdfsUri: " + getHdfsUri());
+        if (fileAttrName == null && file != null && getHdfsUri() != null) {
+            registerForDataGovernance(getHdfsUri(), file);
+        }
 
         /*
          * Set appropriate variables if the optional output port is
@@ -649,6 +657,23 @@ else if (fileIndex == 0) {
         initState = new InitialState();
     }
 
+    private void registerForDataGovernance(String serverURL, String file) {
+        TRACE.log(TraceLevel.INFO, "HDFS2FileSink - Registering for data governance with server URL: " + serverURL + " and file: " + file);
+
+        Map<String, String> properties = new HashMap<String, String>();
+        properties.put(IGovernanceConstants.TAG_REGISTER_TYPE, IGovernanceConstants.TAG_REGISTER_TYPE_OUTPUT);
+        properties.put(IGovernanceConstants.PROPERTY_OUTPUT_OPERATOR_TYPE, "HDFS2FileSink");
+        properties.put(IGovernanceConstants.PROPERTY_SRC_NAME, file);
+        properties.put(IGovernanceConstants.PROPERTY_SRC_TYPE, IGovernanceConstants.ASSET_HDFS_FILE_TYPE);
+        properties.put(IGovernanceConstants.PROPERTY_SRC_PARENT_PREFIX, "p1");
+        properties.put("p1" + IGovernanceConstants.PROPERTY_SRC_NAME, serverURL);
+        properties.put("p1" + IGovernanceConstants.PROPERTY_SRC_TYPE, IGovernanceConstants.ASSET_HDFS_SERVER_TYPE);
+        properties.put("p1" + IGovernanceConstants.PROPERTY_PARENT_TYPE, IGovernanceConstants.ASSET_HDFS_SERVER_TYPE_SHORT);
+        TRACE.log(TraceLevel.INFO, "HDFS2FileSink - Data governance: " + properties.toString());
+
+        setTagData(IGovernanceConstants.TAG_OPERATOR_IGC, properties);
+    }
+
     private void createFile(String filename) {
 
         if (TRACE.isLoggable(TraceLevel.DEBUG)) {
diff --git a/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/HDFS2FileSource.java b/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/HDFS2FileSource.java
index 1eda61c..0033f4d 100644
--- a/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/HDFS2FileSource.java
+++ b/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/HDFS2FileSource.java
@@ -12,7 +12,9 @@
 import java.io.UnsupportedEncodingException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.logging.Logger;
 
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -115,6 +117,13 @@ public synchronized void initialize(OperatorContext context)
         }
 
         super.initialize(context);
+
+        // register for data governance
+        TRACE.log(TraceLevel.INFO,
+                "HDFS2FileSource - Data Governance - file: " + fFileName + " and HdfsUri: " + getHdfsUri());
+        if (fFileName != null && getHdfsUri() != null) {
+            registerForDataGovernance(getHdfsUri(), fFileName);
+        }
 
         // Associate the aspect Log with messages from the SPL log
         // logger.
@@ -150,6 +159,23 @@ public synchronized void initialize(OperatorContext context)
         fCrContext = context.getOptionalContext(ConsistentRegionContext.class);
     }
 
+    private void registerForDataGovernance(String serverURL, String file) {
+        TRACE.log(TraceLevel.INFO, "HDFS2FileSource - Registering for data governance with server URL: " + serverURL + " and file: " + file);
+
+        Map<String, String> properties = new HashMap<String, String>();
+        properties.put(IGovernanceConstants.TAG_REGISTER_TYPE, IGovernanceConstants.TAG_REGISTER_TYPE_INPUT);
+        properties.put(IGovernanceConstants.PROPERTY_INPUT_OPERATOR_TYPE, "HDFS2FileSource");
+        properties.put(IGovernanceConstants.PROPERTY_SRC_NAME, file);
+        properties.put(IGovernanceConstants.PROPERTY_SRC_TYPE, IGovernanceConstants.ASSET_HDFS_FILE_TYPE);
+        properties.put(IGovernanceConstants.PROPERTY_SRC_PARENT_PREFIX, "p1");
+        properties.put("p1" + IGovernanceConstants.PROPERTY_SRC_NAME, serverURL);
+        properties.put("p1" + IGovernanceConstants.PROPERTY_SRC_TYPE, IGovernanceConstants.ASSET_HDFS_SERVER_TYPE);
+        properties.put("p1" + IGovernanceConstants.PROPERTY_PARENT_TYPE, IGovernanceConstants.ASSET_HDFS_SERVER_TYPE_SHORT);
+        TRACE.log(TraceLevel.INFO, "HDFS2FileSource - Data governance: " + properties.toString());
+
+        setTagData(IGovernanceConstants.TAG_OPERATOR_IGC, properties);
+    }
+
     @ContextCheck(compile = true)
     public static void validateParameters(OperatorContextChecker checker)
             throws Exception {
diff --git a/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/IGovernanceConstants.java b/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/IGovernanceConstants.java
new file mode 100644
index 0000000..b0723ea
--- /dev/null
+++ b/com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/IGovernanceConstants.java
@@ -0,0 +1,27 @@
+package com.ibm.streamsx.hdfs;
+
+public interface IGovernanceConstants {
+    public static final String TAG_OPERATOR_IGC = "OperatorIGC";
+    public static final String TAG_REGISTER_TYPE = "registerType";
+    public static final String TAG_REGISTER_TYPE_INPUT = "input";
+    public static final String TAG_REGISTER_TYPE_OUTPUT = "output";
+
+
+    public static final String ASSET_HDFS_FILE_TYPE = "$Streams-HDFSFile";
+    public static final String ASSET_HDFS_SERVER_TYPE = "$Streams-HDFSServer";
+    public static final String ASSET_HDFS_SERVER_TYPE_SHORT = "$HDFSServer";
+
+    public static final String PROPERTY_SRC_NAME = "srcName";
+    public static final String PROPERTY_SRC_TYPE = "srcType";
+
+//    public static final String PROPERTY_SRC_PARENT_PREFIX = "srcParentPrefix";
+    public static final String PROPERTY_SRC_PARENT_PREFIX = "srcParent";
+    public static final String PROPERTY_PARENT_TYPE = "parentType";
+
+    public static final String PROPERTY_PARENT_PREFIX = "p1";
+    public static final String PROPERTY_GRANDPARENT_PREFIX = "p2";
+
+    public static final String PROPERTY_INPUT_OPERATOR_TYPE = "inputOperatorType";
+    public static final String PROPERTY_OUTPUT_OPERATOR_TYPE = "outputOperatorType";
+
+}
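
Note (not part of the patch): registerForDataGovernance() builds a flat property map in which the asset itself (the HDFS file) is described by srcName/srcType, and its parent asset (the HDFS server) is described by the same keys prefixed with "p1", the prefix announced under srcParent; the map is then registered under the OperatorIGC tag via setTagData(). The sketch below is illustrative only: it uses example values and the literal key strings in place of the IGovernanceConstants fields so it compiles and runs without the Streams runtime, and it only prints the resulting layout. A real operator would instead call setTagData(IGovernanceConstants.TAG_OPERATOR_IGC, properties) from initialize(), as the patch does.

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Illustrative only: shows the key/value layout produced by
    // registerForDataGovernance() for an example sink. The literal keys and
    // values mirror the IGovernanceConstants fields added in this patch.
    public class GovernanceTagExample {
        public static void main(String[] args) {
            String hdfsUri = "hdfs://namenode:8020";   // example value of getHdfsUri()
            String file = "/user/streams/out.txt";     // example value of the file parameter

            Map<String, String> properties = new LinkedHashMap<String, String>();
            properties.put("registerType", "output");              // TAG_REGISTER_TYPE -> TAG_REGISTER_TYPE_OUTPUT
            properties.put("outputOperatorType", "HDFS2FileSink"); // PROPERTY_OUTPUT_OPERATOR_TYPE
            properties.put("srcName", file);                       // the asset: the HDFS file
            properties.put("srcType", "$Streams-HDFSFile");        // ASSET_HDFS_FILE_TYPE
            properties.put("srcParent", "p1");                     // parent keys use the "p1" prefix
            properties.put("p1srcName", hdfsUri);                  // the parent asset: the HDFS server
            properties.put("p1srcType", "$Streams-HDFSServer");    // ASSET_HDFS_SERVER_TYPE
            properties.put("p1parentType", "$HDFSServer");         // ASSET_HDFS_SERVER_TYPE_SHORT

            // A real operator would now call:
            //   setTagData(IGovernanceConstants.TAG_OPERATOR_IGC, properties);
            // Here we just print the map to show the flattened parent/child layout.
            for (Map.Entry<String, String> e : properties.entrySet()) {
                System.out.println(e.getKey() + " -> " + e.getValue());
            }
        }
    }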