Skip to content

Commit

Permalink
added lab 6 and solution for lab 4
Browse files Browse the repository at this point in the history
  • Loading branch information
Den4ikless committed May 16, 2018
1 parent 3731a04 commit ece1bd7
Show file tree
Hide file tree
Showing 4 changed files with 225 additions and 0 deletions.
44 changes: 44 additions & 0 deletions labs/lab06/de_lab_06.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
## Лаба 6. Мониторинг и тестирование пайплайна

<img src="http://data.newprolab.com/public-newprolab-com/de_lab06_kibana.png" width="170px" align="center"> <img src="http://data.newprolab.com/public-newprolab-com/de_lab05_grafana.svg" width="170px" align="center">

### 1. Мониторинг пайплайна

Для нужд мониторинга вашего пайплайна у вас есть два инструмента: kibana — для мониторинга Elastic + Logstash, Grafana — для всего остального (например, скорость чтения с диска, очередь в Kafka и др.).

В качестве напоминания: б*о*льшую часть данных в Grafana сама себя не напишет. Вам нужен отдельный инструмент для сбора этих данных: Graphit или Prometeus.

У вас должно получиться что-то подобное (набор метрик на картинке случаен):

<img src="http://data.newprolab.com/public-newprolab-com/de_lab06_grafana_kafka.png">

<p align="center">*Мониторинг Kafka*</p>

<img src="http://data.newprolab.com/public-newprolab-com/de_lab06_grafana_elastic.png">

<p align="center">*Мониторинг Elasticsearch*</p>

### 2. Стресс-тест пайплайна

Во время защиты мы натравим на ваш сайт нашу пушку, которая будет генерировать клики со скоростью более 2000 кликов в секунду в течение 3-х минут. Натравить пушку можно будет в пятницу с утра. В тот же день заработает и сам чекер.

Проект будет считаться засчитанным, если следующие показатели за время проверки не увеличатся более чем на 50% по сравнению с вашими историческими данными:

* для elasticsearch:
* `indexing latency` ,
* показатели для kafka:
* `TotalTimeMs` в broker metrics,
* `Request latency average` в kafka producer metrics,
* `ConsumerLag/MaxLag` в Kafka consumer metrics,
* показатели для spark streaming:
* `active tasks (stacked per executor)`

* `HDFS reads/executor`

* `completed tasks per executor`


### 3. Ссылки для изучения

* [Мониторинг Kafka](https://www.datadoghq.com/blog/monitoring-kafka-performance-metrics/)
Binary file added solution/img/7.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added solution/img/8.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
181 changes: 181 additions & 0 deletions solution/solution_lab_04.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@


### Лаба 4. Запуск джобы Spark по расписанию и сохранение результата в ClickHouse

Если вы верно прошли GIST по установке новой версии спарка, то в итоге у вас должен отработать следующий скрипт.

```Python
import os
import sys
os.environ["PYSPARK_SUBMIT_ARGS"]='--packages com.databricks:spark-csv_2.10:1.2.0 pyspark-shell --num-executors 5'
os.environ["PYSPARK_PYTHON"]='python3'
#os.environ["SPARK_HOME"]='/usr/hdp/current/spark2-client'
spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
raise ValueError('SPARK_HOME environment variable is not set')
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.6-src.zip'))
os.environ["PYSPARK_PYTHON"] = 'python3'
exec(open(os.path.join(spark_home, 'python/pyspark/shell.py')).read())
```

```
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.3.0
/_/
Using Python version 3.6.4 (default, May 3 2018 18:10:19)
SparkSession available as 'spark'.
```

Если он у вас нормально отрабатывает в jupyter, то можно смело приступать к выполнению лабы. Задача делится на на три части:

1. Взять паркет-файл с hdfs
2. Выполнить вордкаунт
3. Положить в кликхаус

Предварительно необходимо создать бд и таблицу в клихаусе. Для этого можно воспользоваться http интерфейсом. В <a href="https://clickhouse.yandex/docs/ru/interfaces/http_interface/">русскоязычной документации </a>можно легко найти, как это сделать.

```python
from datetime import datetime
from airflow import DAG
from aitflow.operators.python_operator import PythonOperator

import logging
log = logging.getLogger(_name_)

import subprocess
import json
import os
import sys
import py4j

spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
raise ValueError('SPARK_HOME enviroment variable is not set')
from pyspark import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession, SQLContext

#vars
top_cnts =20
default_args = {
'owner' : 'airflow',
'depends_on_past': False,
'start_date': datetime(2018,5,4),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1
}
dag =DAG('DAG_SPARK', description-'SPARK to clickhouse',
default_args=default_args,
schedule_interval='5 5 * * *')
HOST = 'http://localhost:8123/'
import requests

def get_clickhouse_data(query, host = HOST, connection_timeout = 1500):
NUMBER_OF_TRIES = 30
DELAY = 10

for i in range(NUMBER_OF_TRIES):
r = requests.post(host, params = {'timeout_before_checking_execution_speed': 120, 'max_execution_time': 6000}
, timeout = connection_timeout, data = query)
if r.status_code == 200:
return r.text
else:
print('ATTENTION: try #%d failed' % i)
if i != (NUMBER_OF_TRIES-1):
print(query)
print(r.text)
time.sleep(DELAY*(i+1))
else:
raise ValueError(r.text)

def upload(table, content, host=HOST):
'''Uploads data to table in ClickHous'''
content = content.encode('utf-8')
query_dict = {
'query': 'INSERT INTO ' + table + ' FORMAT TabSeparatedWithNames '
}
r = requests.post(host, data=content, params=query_dict)
result = r.text
if r.status_code == 200:
return result
else:
raise ValueError(r.text)

def pre_data_spark4clickhose():
SparkContext._ensure_initialized()
conf = SparkConf()
spark = SparkSession.builder.get0rCreate()
sc = spark.sparkContext
pq_file_name='/results/'+datetime.strftime(datetime.now(), "%Y_%m_%d.")+'.parquet'
log.info('SparkSession: %s', str(spark))
log.info('sparkContext: %s', str(sc))
df_elk_data = spark.read.parquet(pq_file_name)
rdd_elk_data = df_elk_data.select(df_elk_data.location).rdd
count=rdd_elk_data.flatMap(lambda x: str(x[0]).split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a,b: a+b).takeOrdered(top_cnts, lambda x: =x[1])
sc.stop()
upload('login', count, HOST)


spark4clickhose_batch_operator = PythonOperator(task_id='spark4clickhose_batch_task', provide_contex=True, python_callable=pre_data_spark4clickhose)
```

Следущий код выполняет поставленные задачи. Стоит заметить, что вордкаунт выполнен с применением rdd. Однако не стоит зацикливаться именно на нём. Существует множетство подходов. Осталось это зарегистрировать в airflow, как в предыдущей лабе и заняться подключением superset.

Если суперсет не входит в ваш набор hdp, то можно установить его либо через <a href="https://hub.docker.com/r/amancevice/superset/">докер</a>, либо <a href="https://superset.apache.org/installation.html">классическим способом</a>. Здесь будет приведен классический способ.

```
sudo apt-get install build-essential libssl-dev libffi-dev python-dev python-pip libsasl2-dev libldap2-dev
```

```
virtualenv venv
```

```bash
pip install --upgrade setuptools pip
```

```bash
# Install superset
pip install superset

# Create an admin user (you will be prompted to set username, first and last name before setting a password)
fabmanager create-admin --app superset

# Initialize the database
superset db upgrade

# Load some data to play with
superset load_examples

# Create default roles and permissions
superset init

# To start a development web server on port 8088, use -p to bind to another port
# superset runserver -d
```

По умолчанию суперсет запускается на 8088 порту. Ткаже чекер смотрит и через этот же порт.

Чтобы подключить кликхаус, то нужно поставить диалект для SQLAlchemy, через который суперсет и будет показывать данные. Инструкция по установке находится <a href="https://github.com/cloudflare/sqlalchemy-clickhouse">здесь</a>. В докере он установлен по умолчанию. Теперь указываем URI в следующем формате:

```
clickhouse://username:password@hostname:port/database
```

<img src="img/8.png">

После успешного подключения можно смело писать SQL запросы и строить красивые дашборды. Например, вот так.

<img src="img/7.png">



Теперь можно запускать чекер. Лаба решена!

0 comments on commit ece1bd7

Please sign in to comment.