
How to access webHDFS via Knox

The Apache Knox Gateway is a system that provides a single point of authentication and access for Apache Hadoop services in a cluster. https://knox.apache.org/books/knox-1-4-0/user-guide.html

The streamsx.hdfs toolkit uses the REST API of webHDFS to access the HDFS file system.
https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html
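
For reference, here is a minimal sketch of a plain webHDFS REST call made directly against the NameNode, without Knox. The host is a placeholder; Hadoop 3.x uses HTTP port 9870 by default, Hadoop 2.x uses 50070:

# list the contents of /user directly against the NameNode
curl -i "http://<namenode-host>:9870/webhdfs/v1/user/?op=LISTSTATUS"

Knox puts this same API behind a single authenticated gateway endpoint, which is what we configure next.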

Knox Configuration

Before we start with our SPL demo application, we have to configure and start the Knox service.

Add new LDAP user

# change to the Knox gateway home directory, {GATEWAY_HOME}
cd /usr/hdp/current/knox-server/
# make a backup of the LDAP users file
cp ./conf/users.ldif ./conf/users.ldif1

Edit the LDAP users file. For security reasons, remove the predefined demo users (such as guest, sam, and tom) from users.ldif, and add your own user and password.

Here is an LDIF example for user hdfs.

# Please replace with site specific values
dn: dc=hadoop,dc=apache,dc=org
objectclass: organization
objectclass: dcObject
o: Hadoop
dc: hadoop

# Entry for a sample people container
# Please replace with site specific values
dn: ou=people,dc=hadoop,dc=apache,dc=org
objectclass:top
objectclass:organizationalUnit
ou: people

dn: uid=hdfs,ou=people,dc=hadoop,dc=apache,dc=org
objectclass:top
objectclass:person
objectclass:organizationalPerson
objectclass:inetOrgPerson
cn: Hdfs
sn: Hdfs
uid: hdfs
userPassword:hdfs-password

Restart LDAP

cd /usr/hdp/current/knox-server/
bin/ldap.sh stop
Stopping LDAP with PID 3705 succeeded.
bin/ldap.sh start
Starting LDAP succeeded with PID 28870.
bin/ldap.sh status
LDAP is running with PID 28870.
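
To verify that the new user is known to the demo LDAP server, you can bind as that user with ldapsearch. This is a sketch that assumes the demo LDAP listens on its default port 33389; adjust the host and port for your setup:

# bind as the new hdfs user against the Knox demo LDAP
ldapsearch -h localhost -p 33389 \
  -D "uid=hdfs,ou=people,dc=hadoop,dc=apache,dc=org" -w hdfs-password \
  -b "dc=hadoop,dc=apache,dc=org" "uid=hdfs"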

It is also possible to start and stop the Demo LDAP from the Ambari web interface:

Start the Knox service, and then start the Demo LDAP.

Test Knox access via curl from your client machine:

curl -k -u hdfs:hdfs-password https://hdfs1.fyre.ibm.com:8443/gateway/default/webhdfs/v1/user/?op=LISTSTATUS
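
On success, webHDFS answers with a FileStatuses JSON document listing the directory contents. A trimmed sketch of the shape (the values shown are illustrative):

{"FileStatuses":{"FileStatus":[
   {"pathSuffix":"hdfs","type":"DIRECTORY","owner":"hdfs", ...}
]}}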

Log in to your Hadoop server and create a test directory:

su - hdfs
hadoop fs -mkdir  /user/hdfs
hadoop fs -mkdir  /user/hdfs/out
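
You can check that the directories were created:

hadoop fs -ls /user/hdfs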

SPL sample

Starting with version 5.0.0, the streamsx.hdfs toolkit supports a new parameter, credentials.
This optional parameter specifies a JSON string that contains the HDFS credentials as key/value pairs for user, password, and webhdfs.
This parameter can also be specified in an application configuration.
https://github.com/IBMStreams/streamsx.hdfs/releases/tag/v5.0.0
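
As a sketch, such an application configuration could be created with streamtool. The configuration name hdfsconfig and the property name credentials below are only illustrative; check the toolkit documentation for the names your operators expect:

# create an application configuration that holds the credentials JSON
streamtool mkappconfig --property credentials='{"user":"hdfs","password":"hdfs-password","webhdfs":"webhdfs://<your-hadoop-server>:8443"}' hdfsconfig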

The JSON string must have the following format:

{
    "user"     : "user",
    "password" : "hdfs-password",
    "webhdfs"  : "webhdfs://ip-address:8443"
}

It is also possible to access IBM Analytics Engine (IAE) via credentials.

The following SPL sample uses credentials to access webHDFS:

/*******************************************************************************
* Copyright (C) 2019, International Business Machines Corporation
* All Rights Reserved
*******************************************************************************/
namespace application ;

use com.ibm.streamsx.hdfs::* ;

/**
 * The webHDFS sample demonstrates how to access webhdfs via a Knox user and password.
 * A Beacon operator generates some test lines.
 * HdfsFileSink writes every 10 lines into a new file in the /user/hdfs/out directory.
 * HdfsDirScanOut scans the given directory (out) in HDFS, relative to the user's home directory, and returns the file names.
 * HdfsFileSource reads the files and returns their lines. It uses the file names from the directory scan.
 * CopyFromHdfsToLocal copies all incoming files (/user/hdfs/out/output-<FILENUM>.txt) from its input port into the local data directory.
 * The Print operators are Custom operators that print the output of the HDFS operators.
 */
composite webHDFS
{
  param
    expression<rstring> $credentials : getSubmissionTimeValue("credentials", "{
            \"user\": \"hdfs\",
            \"webhdfs\": \"webhdfs://<your-haddop-server>:8443\",
            \"password\": \"hdfs-password\"
        }") ;
  graph


    // generates lines
    stream<rstring line> CreateLines = Beacon()
    {
        param
            initDelay : 1.0 ;
            iterations : 100u ;
        output
            CreateLines : line = (rstring)IterationCount() + ": This line will be written into an HDFS file." ;
    }

    // HdfsFileSink writes every 10 lines from CreateLines in a new file in /user/hdfs/out directory
    stream<rstring fileName, uint64 size> HdfsFileSink = HDFS2FileSink(CreateLines)
    {
      param
        credentials : $credentials ;
        file : "out/output-%FILENUM.txt" ;
        tuplesPerFile : 10l ;
    }

    //print out the file name and the size of each file
    () as PrintHdfsFileSink = Custom(HdfsFileSink)
    {
      logic
        onTuple HdfsFileSink :
        {
          printStringLn("HdfsFileSink fileName , size : " +(rstring) HdfsFileSink) ;
        }

    }

    // HdfsDirScanOut scans the given directory in HDFS; it defaults to ".", the user's home directory
    stream<rstring hdfsFile> HdfsDirScanOut = HDFS2DirectoryScan()
    {
      param
        initDelay : 10.0 ;
        directory : "out" ;
        credentials : $credentials ;
        strictMode : false ;
    }


    //print out the names of each file found in the directory
    () as PrintHdfsDirScanOut = Custom(HdfsDirScanOut)
    {
      logic
        onTuple HdfsDirScanOut :
        {
          printStringLn("HdfsDirScanOut fileName  : " +(rstring) HdfsDirScanOut) ;
        }

    }

    // HdfsFileSource reads files and returns lines into output port
    // It uses the file name from directory scan to read the file
    stream<rstring lines> HdfsFileSource = HDFS2FileSource(HdfsDirScanOut)
    {
      param
        credentials : $credentials ;
    }

    //print out the lines read from each file
    () as PrintHdfsFileSource = Custom(HdfsFileSource)
    {
      logic
        onTuple HdfsFileSource :
        {
          printStringLn("HdfsFileSource   line : " + lines) ;
        }

    }

    // copies all incoming files (/user/hdfs/out/output-<FILENUM>.txt) from the input port into the local data directory
    stream<rstring message, uint64 elapsedTime> CopyFromHdfsToLocal = HDFS2FileCopy(HdfsDirScanOut)
    {
      param
        hdfsFileAttrName : "hdfsFile" ;
        localFile : "./" ;
        deleteSourceFile : false ;
        overwriteDestinationFile : true ;
        direction : copyToLocalFile ;
        credentials : $credentials ;
    }

    //print out the message and the elapsed time  
    () as PrintCopyFromHdfsToLocal = Custom(CopyFromHdfsToLocal)
    {
      logic
        onTuple CopyFromHdfsToLocal :
        {
          printStringLn("CopyFromHdfsToLocal message,  elapsedTime : " +(rstring) CopyFromHdfsToLocal) ;
        }

    }

}
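
As a sketch, the sample could be compiled and submitted as follows. The toolkit path is a placeholder for your local copy of streamsx.hdfs, and the credentials value must match your Knox setup:

# compile the main composite against the streamsx.hdfs toolkit
sc -M application::webHDFS -t <path-to>/com.ibm.streamsx.hdfs

# submit the job, passing the credentials as a submission-time value
streamtool submitjob output/application.webHDFS.sab \
  -P credentials='{"user":"hdfs","password":"hdfs-password","webhdfs":"webhdfs://<your-hadoop-server>:8443"}'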