Skip to content

Commit

Permalink
Add support for data governance. On initialize, call the new setTagData
Browse files Browse the repository at this point in the history
API to register the operators for data governance.
  • Loading branch information
mkomor committed Oct 19, 2015
1 parent 4bf1872 commit b38621d
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -586,6 +588,12 @@ public void initialize(OperatorContext context) throws Exception {

super.initialize(context);

// register for data governance
// only register if static filename mode
TRACE.log(TraceLevel.INFO, "HDFS2FileSink - Data Governance - file: " + file + " and HdfsUri: " + getHdfsUri());
if (fileAttrName == null && file != null && getHdfsUri() != null) {
registerForDataGovernance(getHdfsUri(), file);
}

/*
* Set appropriate variables if the optional output port is
Expand Down Expand Up @@ -649,6 +657,23 @@ else if (fileIndex == 0) {
initState = new InitialState();
}

private void registerForDataGovernance(String serverURL, String file) {
TRACE.log(TraceLevel.INFO, "HDFS2FileSink - Registering for data governance with server URL: " + serverURL + " and file: " + file);

Map<String, String> properties = new HashMap<String, String>();
properties.put(IGovernanceConstants.TAG_REGISTER_TYPE, IGovernanceConstants.TAG_REGISTER_TYPE_OUTPUT);
properties.put(IGovernanceConstants.PROPERTY_OUTPUT_OPERATOR_TYPE, "HDFS2FileSink");
properties.put(IGovernanceConstants.PROPERTY_SRC_NAME, file);
properties.put(IGovernanceConstants.PROPERTY_SRC_TYPE, IGovernanceConstants.ASSET_HDFS_FILE_TYPE);
properties.put(IGovernanceConstants.PROPERTY_SRC_PARENT_PREFIX, "p1");
properties.put("p1" + IGovernanceConstants.PROPERTY_SRC_NAME, serverURL);
properties.put("p1" + IGovernanceConstants.PROPERTY_SRC_TYPE, IGovernanceConstants.ASSET_HDFS_SERVER_TYPE);
properties.put("p1" + IGovernanceConstants.PROPERTY_PARENT_TYPE, IGovernanceConstants.ASSET_HDFS_SERVER_TYPE_SHORT);
TRACE.log(TraceLevel.INFO, "HDFS2FileSink - Data governance: " + properties.toString());

setTagData(IGovernanceConstants.TAG_OPERATOR_IGC, properties);
}

private void createFile(String filename) {

if (TRACE.isLoggable(TraceLevel.DEBUG)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

import org.apache.hadoop.fs.FSDataInputStream;
Expand Down Expand Up @@ -115,6 +117,13 @@ public synchronized void initialize(OperatorContext context)
}

super.initialize(context);

// register for data governance
TRACE.log(TraceLevel.INFO,
"HDFS2FileSource - Data Governance - file: " + fFileName + " and HdfsUri: " + getHdfsUri());
if (fFileName != null && getHdfsUri() != null) {
registerForDataGovernance(getHdfsUri(), fFileName);
}

// Associate the aspect Log with messages from the SPL log
// logger.
Expand Down Expand Up @@ -150,6 +159,23 @@ public synchronized void initialize(OperatorContext context)
fCrContext = context.getOptionalContext(ConsistentRegionContext.class);
}

private void registerForDataGovernance(String serverURL, String file) {
TRACE.log(TraceLevel.INFO, "HDFS2FileSource - Registering for data governance with server URL: " + serverURL + " and file: " + file);

Map<String, String> properties = new HashMap<String, String>();
properties.put(IGovernanceConstants.TAG_REGISTER_TYPE, IGovernanceConstants.TAG_REGISTER_TYPE_INPUT);
properties.put(IGovernanceConstants.PROPERTY_INPUT_OPERATOR_TYPE, "HDFS2FileSource");
properties.put(IGovernanceConstants.PROPERTY_SRC_NAME, file);
properties.put(IGovernanceConstants.PROPERTY_SRC_TYPE, IGovernanceConstants.ASSET_HDFS_FILE_TYPE);
properties.put(IGovernanceConstants.PROPERTY_SRC_PARENT_PREFIX, "p1");
properties.put("p1" + IGovernanceConstants.PROPERTY_SRC_NAME, serverURL);
properties.put("p1" + IGovernanceConstants.PROPERTY_SRC_TYPE, IGovernanceConstants.ASSET_HDFS_SERVER_TYPE);
properties.put("p1" + IGovernanceConstants.PROPERTY_PARENT_TYPE, IGovernanceConstants.ASSET_HDFS_SERVER_TYPE_SHORT);
TRACE.log(TraceLevel.INFO, "HDFS2FileSource - Data governance: " + properties.toString());

setTagData(IGovernanceConstants.TAG_OPERATOR_IGC, properties);
}

@ContextCheck(compile = true)
public static void validateParameters(OperatorContextChecker checker)
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.ibm.streamsx.hdfs;

public interface IGovernanceConstants {
public static final String TAG_OPERATOR_IGC = "OperatorIGC";
public static final String TAG_REGISTER_TYPE = "registerType";
public static final String TAG_REGISTER_TYPE_INPUT = "input";
public static final String TAG_REGISTER_TYPE_OUTPUT = "output";


public static final String ASSET_HDFS_FILE_TYPE = "$Streams-HDFSFile";
public static final String ASSET_HDFS_SERVER_TYPE = "$Streams-HDFSServer";
public static final String ASSET_HDFS_SERVER_TYPE_SHORT = "$HDFSServer";

public static final String PROPERTY_SRC_NAME = "srcName";
public static final String PROPERTY_SRC_TYPE = "srcType";

// public static final String PROPERTY_SRC_PARENT_PREFIX = "srcParentPrefix";
public static final String PROPERTY_SRC_PARENT_PREFIX = "srcParent";
public static final String PROPERTY_PARENT_TYPE = "parentType";

public static final String PROPERTY_PARENT_PREFIX = "p1";
public static final String PROPERTY_GRANDPARENT_PREFIX = "p2";

public static final String PROPERTY_INPUT_OPERATOR_TYPE = "inputOperatorType";
public static final String PROPERTY_OUTPUT_OPERATOR_TYPE = "outputOperatorType";

}

0 comments on commit b38621d

Please sign in to comment.