Skip to content

How to read files via FTP and write them into HDFS filesystem.

Ahmad Nouri edited this page Dec 18, 2018 · 3 revisions

How to read files via FTP and write them into HDFS filesystem.

The following SPL application demonstrates how to connect to a FTP server and read files from a directory. And how to insert these files into a HDFS file system via HDFS2FileSink.

/*
 * 'SignalOutput' creates a connection to the FTP server and reads the file names from a directory.
 * 'FileContents' reads the file contents and forwards the lines to the HdfsWriter.
 * Check if the FTP server is running.
 * service vsftpd status
 * 'HdfsWriter' writes all incoming lines into a file in your HDFS server in your user directory.
 * Make sure that directory defined in "HDFSPath" parameter exist and the user has permeation to write in this directory.
 * After a successfully copy of file in HDFS, 'DeleteFtpFile' deletes the file with FTP command
 * 
 * When your hadoop cluster is kerberosed, you can add $authKeytab an $authPrincipal parameter 
 * to the HDFS2FileSink.
 * Copy 'core-site.xml' file and authentication keytab from hadoop server in etc directory of your project.
 * Before you begin with submit your SPL application make sure that your keytab and principal works
 * for example: 
 * kinit -k -t etc/hdfs.headless.keytab [email protected]
 *  
 * The print streams are only to help for a better understanding of the whole streams.
 * They can be removed for production application.
 * 
 * Adapt the first parameters in this SPL file.
 * Replace only the values of these parameters with your FTP and HDFS credentials.
 */

namespace application ;

use com.ibm.streamsx.inet.ftp::* ;
use com.ibm.streamsx.hdfs::HDFS2FileSink ;

composite SFTP2HDFS
{

param
	expression<Protocol> $protocol : (Protocol)getSubmissionTimeValue("protocol", "sftp");
	expression<rstring> $host : getSubmissionTimeValue("host", "myftphost.fyre.ibm.com");
	expression<rstring> $path : getSubmissionTimeValue("path", "/Inbox/");
	expression<rstring> $username : getSubmissionTimeValue("username", "ftpuser");
	expression<rstring> $password : getSubmissionTimeValue("password", "myftppassword");
	expression<rstring> $HDFSPath : getSubmissionTimeValue("HDFSPath", "/user/hdfs/testDirectory/");
	expression<rstring> $authKeytab : getSubmissionTimeValue("authKeytab", "etc/hdfs.headless.keytab") ;
	expression<rstring> $authPrincipal : getSubmissionTimeValue("authPrincipal", "[email protected]") ;
	expression<rstring> $configPath : getSubmissionTimeValue("configPath", "etc") ;

	graph
		// a trigger stream for the ftp directory scan
		stream<int32 count> TriggerStream = Beacon()
		{
			param
				initDelay : 2.0 ;
				iterations : 1 ;
				period : 10.0 ;
			output
				TriggerStream : count =(int32) IterationCount() ;
			config
				placement : partitionColocation("DIR") ;
		}

		stream<rstring line, rstring fileName, uint64 size, rstring date, rstring user, boolean isFile,
			uint32 transferCount, uint32 failureCount, uint64 bytesTransferred, float64 speed> SignalOutput
			as OUT = FTPReader(TriggerStream)
		{
			param
				protocol : $protocol  ;
				isDirReader : true ;
				host : $host ;
				path : $path  ;
				username : $username  ;
				password : $password  ;
			output
				OUT : line = Line(), fileName = FileName(), size = FileSize(), date = FileDate(), user =
					FileUser(), isFile = IsFile(), transferCount = TransferCount(), failureCount =
					TransferFailureCount(), bytesTransferred = BytesTransferred(), speed = TransferSpeed() ;
		}

		stream<rstring line, rstring fileName> FileContents as OUT1 = FTPReader(SignalOutput as IN)
		{
			param
				protocol : $protocol  ;
				isDirReader : false ;
				filename : IN.fileName ;
				host : $host ;
				path : $path  ;
				username : $username  ;
				password : $password  ;
			output
				OUT1 : line = Line(), fileName = $HDFSPath + IN.fileName ;
		}
		
		() as printFTPReader=Custom(FileContents){
			logic
				onTuple FileContents :
				{
					printStringLn("HdfsWriter line : " + line + " fileName : " + fileName) ;
			}
		
		}
  
		stream<rstring out_file_name, uint64 size> HdfsWriter = HDFS2FileSink(FileContents)
		{
			logic
				onTuple FileContents :
				{
					printStringLn("HdfsWriter line : " + line + " fileName : " + fileName) ;
				}

			param
				authKeytab : $authKeytab ;
				authPrincipal : $authPrincipal ;
				configPath : "etc" ;
				fileAttributeName : "fileName" ;
//				vmArg : "-Djava.security.krb5.conf=/path/krb5.conf" ;
		}

		//prepare command stream
		stream<rstring command, rstring file> CommandStream as OUT = Custom(HdfsWriter as IN)
		{
			logic
				state : 
				{
					mutable rstring fileName = "" ;
					mutable int32 position1 = 0 ;
					mutable int32 position2 = 0 ;
					mutable int32 leng = 0 ;
				}
				onTuple IN :
				{
					printStringLn("file name " + IN.out_file_name) ;
					fileName =  IN.out_file_name ;
					// extract only file name without path
					leng = length(fileName) ;
					position1 = findLast(fileName, "/", leng - 1) ;
					fileName = substring(fileName, position1 + 1, leng - 1) ;
					
					submit({ command = "rm", file = fileName }, OUT) ;
				}

			config
				placement : partitionColocation("DELETE") ;
		}

		() as printCommandStream=Custom(CommandStream){
			logic
				onTuple CommandStream :
				{
					printStringLn("printCommandStream line : " + command + " fileName : " + file) ;
			}
		
		}
 	
		stream<boolean success, rstring fileName> DeleteFtpFile as OUT = FTPCommand(CommandStream as IN)
		{
				param
				protocol : $protocol  ;
				filename : IN.file ;
				command : IN.command ;
				host : $host ;
				path : $path  ;
				username : $username  ;
				password : $password  ;
				connectionCloseMode : never ;
				curlVerbose : false;
				
			output
				OUT : fileName = IN.file, success = Success() ;
			config
				placement : partitionColocation("DELETE") ;
		}
		
		() as printDeleteFtpFile=Custom(DeleteFtpFile){
			logic
				onTuple DeleteFtpFile :
				{
					printStringLn("DeleteFtpFile line : " + (rstring)success + " fileName : " + fileName) ;
			}
		
		}

}