Code to demonstrate how to write a basic Airflow pipeline that performs an incremental import from MySQL to Hive. # Problem statement: MySQL has a table called 'employee_profile' holding employee information (first name, last name, and SSN). The script should check for new and modified records in the table and update the corresponding Hive table with the modified records, and also maintain an additional Hive table with a masked Social Security Number (SSN). Use Sqoop for the incremental import and Airflow to automate the process.
-- Source table in MySQL: one row per employee.
-- modified_date drives the Sqoop incremental (lastmodified) import further below.
CREATE TABLE `employee_profile` (
    `profile_id`    VARCHAR(255) NOT NULL,  -- key; kept as a string, also used as the Sqoop merge key
    `first_name`    VARCHAR(45)  NULL,
    `last_name`     VARCHAR(45)  NULL,
    `modified_date` DATETIME     NULL,      -- incremental check column; set on every insert/update
    `ssn`           VARCHAR(45)  NULL,      -- raw SSN (e.g. '111-11-1111'); masked copy lives in Hive
    PRIMARY KEY (`profile_id`)
);
### ii) Insert base data into the MySQL employee_profile table
-- Seed data: five base rows with modified_date spread across 2015-12-01 .. 2015-12-05.
-- (The --last-value used by the incremental import below matches the max date here.)
INSERT INTO `employee_profile`
    (`profile_id`, `first_name`, `last_name`, `modified_date`, `ssn`)
VALUES
    ('1', 'FristName1', 'LastName1', '2015-12-01 11:24:35', '111-11-1111'),
    ('2', 'FristName2', 'LastName2', '2015-12-02 11:24:35', '222-22-2222'),
    ('3', 'FristName3', 'LastName3', '2015-12-03 11:24:35', '333-33-3333'),
    ('4', 'FristName4', 'LastName4', '2015-12-04 11:24:35', '444-44-4444'),
    ('5', 'FristName5', 'LastName5', '2015-12-05 11:24:35', '555-55-5555');
# Initial full import: copy every row of MySQL employee.employee_profile into HDFS
# at /user/cloudera/base/ as comma-delimited text (--m 1 => single mapper, one output file).
# NOTE(review): a plain-text --password on the CLI ends up in shell history; prefer -P or --password-file.
sqoop import --connect jdbc:mysql://localhost/employee --username root --password cloudera --table employee_profile --m 1 --target-dir /user/cloudera/base/
-- Hive side. (The a)/b)/c) prose labels were embedded in the statements, which
-- made them unexecutable; they are comments now, and each statement is terminated.)

-- a) External table over the imported data; its files live under /user/cloudera/tabledata/.
CREATE EXTERNAL TABLE employee_profile (
    profile_id    string,
    first_name    string,
    last_name     string,
    modified_date string,
    ssn           string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/user/cloudera/tabledata/';

-- b) Move the Sqoop base-import files into the table's location.
LOAD DATA INPATH '/user/cloudera/base/' INTO TABLE employee_profile;

-- c) External table for the masked copy (populated by the masking UDF at the end).
CREATE EXTERNAL TABLE employee_profile_masked (
    profile_id    string,
    first_name    string,
    last_name     string,
    modified_date string,
    ssn           string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/user/cloudera/tabledatamask/';
-- Incremental test data: two new rows plus two modified rows, all with a
-- modified_date later than the base maximum ('2015-12-05 11:24:35') so the
-- lastmodified import below picks them up.
INSERT INTO `employee_profile` (`profile_id`, `first_name`, `last_name`, `modified_date`,`ssn`) VALUES ('6', 'FristName6', 'LastName6', '2015-12-11 11:25:35','666-66-6666');
INSERT INTO `employee_profile` (`profile_id`, `first_name`, `last_name`, `modified_date`,`ssn`) VALUES ('7', 'FristName7', 'LastName7', '2016-12-12 11:24:35','111-22-1111');

-- profile_id is VARCHAR: compare against string literals so MySQL does not
-- implicitly coerce the column to a number (which would suppress index use).
UPDATE employee_profile SET last_name = 'LastName2_new', modified_date = '2015-12-13 11:24:35' WHERE profile_id = '2';
UPDATE employee_profile SET first_name = 'FristName3_new', modified_date = '2015-12-14 11:24:35' WHERE profile_id = '3';
Airflow runs on Python 2.7 and is installed with pip2.7.
Installation on CentOS:
SSH onto the target machine, then install the build prerequisites.
# Build toolchain plus the headers needed to compile Python 2.7 from source.
sudo yum groupinstall "Development tools"
sudo yum install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel python-devel
Note: if the installed Python is older than 2.7.x, build Python 2.7 from source by following the steps below.
# Build Python 2.7.6 from source and install it alongside the system Python.
cd /opt
sudo wget --no-check-certificate https://www.python.org/ftp/python/2.7.6/Python-2.7.6.tar.xz
tar xf Python-2.7.6.tar.xz
cd Python-2.7.6
# Install under /usr/local so the system python in /usr/bin is untouched.
./configure --prefix=/usr/local
# altinstall keeps the system `python` symlink intact; the new binary is python2.7.
make && make altinstall
# Verify: /usr/local/bin/python2.7 should now exist.
ls -ltr /usr/local/bin/python*
Open ~/.bashrc in an editor (vi ~/.bashrc) and add this line:
alias python='/usr/local/bin/python2.7'
# Bootstrap setuptools for the freshly built Python 2.7, then install pip.
sudo wget https://bootstrap.pypa.io/ez_setup.py
# ez_setup.py downloads and installs setuptools for the given interpreter.
sudo /usr/local/bin/python2.7 ez_setup.py
# NOTE(review): the next two steps unpack setuptools-20.4.zip manually — presumably
# the fallback when ez_setup.py cannot install it directly; confirm whether both
# paths are needed (the zip is left behind by ez_setup.py's download step).
sudo unzip setuptools-20.4.zip
cd setuptools-20.4
# Install pip via the easy_install that setuptools just provided.
sudo /usr/local/bin/easy_install-2.7 pip
# Scientific-stack packages from the distro repos (system Python packages).
sudo yum install numpy scipy python-matplotlib ipython python-pandas sympy python-nose
Airflow needs a home directory; ~/airflow is the default, but you can place it somewhere else if you prefer (optional).
# Airflow home (defaults to ~/airflow when unset).
export AIRFLOW_HOME=~/airflow

# Core Airflow plus the extras this pipeline uses (hive, celery, mysql).
sudo /usr/local/bin/pip2.7 install pysqlite
sudo /usr/local/bin/pip2.7 install airflow
sudo pip2.7 install airflow[hive]
sudo pip2.7 install airflow[celery]
sudo pip2.7 install airflow[mysql]

# Message broker for the Celery executor (sudo added: yum needs root like every other line here).
sudo yum install rabbitmq-server

# MySQL client headers plus the Python driver for the metadata DB / result backend.
sudo yum install mysql-devel
# Use pip2.7 (not the system pip) so mysql-python lands in the same Python that runs Airflow.
sudo pip2.7 install mysql-python

# Create Airflow's metadata database tables.
airflow initdb
# airflow.cfg settings for running with the Celery executor.
# (The sql_alchemy_conn value was broken across two lines; rejoined here.)
# NOTE(review): the metadata DB is 'airflowdb' here but the Celery result
# backend points at 'airflow' — confirm which database name is intended.
sql_alchemy_conn = mysql://root:cloudera@localhost:3306/airflowdb
executor = CeleryExecutor
broker_url = amqp://guest:guest@localhost:5672/
celery_result_backend = db+mysql://root:cloudera@localhost:3306/airflow
# Start the Airflow web UI on port 8080.
sudo airflow webserver -p 8080
# Run the DAG for every schedule interval since 2015-06-01.
airflow backfill incremental_load -s 2015-06-01
# Run a single task (hive_insert_masked) for one execution date, without recording state.
airflow test incremental_load hive_insert_masked 2015-06-01;
# Incremental import: only rows whose modified_date is newer than --last-value.
# NOTE(review): --last-value is hard-coded here; in the real DAG it should come
# from the previous run's saved state (a Sqoop saved job tracks it automatically).
sqoop import --connect jdbc:mysql://localhost:3306/employee --table employee_profile --username root --password cloudera --check-column modified_date --incremental lastmodified --last-value "2015-12-05 11:24:35" --target-dir /user/cloudera/incremental/

# Merge incremental files onto the current table data, deduplicating on profile_id.
# The Hive table is EXTERNAL with LOCATION /user/cloudera/tabledata/ (not the
# default warehouse path), so that directory is the --onto side of the merge.
sqoop merge --new-data /user/cloudera/incremental --onto /user/cloudera/tabledata --jar-file /home/jars/employee_profile.jar --class-name employee_profile --target-dir /user/cloudera/employeeprofilemerge --merge-key profile_id

# Replace the Hive table contents with the merged result.
LOAD DATA INPATH '/user/cloudera/employeeprofilemerge' OVERWRITE INTO TABLE employee_profile;
-- Register the SSN-masking UDF from the packaged jar.
ADD JAR /home/cloudera/Masking.jar;
CREATE FUNCTION masking AS 'Masking';

-- Rebuild the masked copy from the merged profile data, masking the SSN column.
-- (The original wrote into an undefined table `temp`; the masked external table
-- defined earlier in this walkthrough is employee_profile_masked.)
INSERT OVERWRITE TABLE employee_profile_masked
SELECT profile_id, first_name, last_name, modified_date, masking(ssn)
FROM employee_profile;