-
Notifications
You must be signed in to change notification settings - Fork 6
Spark Streaming
Chaerim edited this page Aug 25, 2024
·
3 revisions
💡 kafka topic에 있는 video binary를 배치로 가져와 stt 진행하여 influxdb에 text를 적재
- Kafka topic에 적재된 video binary를 spark에 저장
-
spark window를 사용하여 30초 분량의 binary를 적재하고 이를 배치 단위로 전처리
- 30초의 binary를 합쳐
video.ts
로 저장 →ffmpeg
로video.ts
를video.mp4
로 변환 -
whisper
를 사용하여mp4
의 음성을 text로 전사 - spark dataframe에 전사된 text를 새로운 column에 추가
- 30초의 binary를 합쳐
- spark query를 사용하여 전처리된 배치에 대해
foreachBatch()
를 사용하여 influxdb에 적재
💡 Amazon Linux2, m4.xlarge 기준으로 작성되었습니다.
- 최소 RAM 16GB (필수) + DISK 30GB 추천
sudo yum install java-1.8.0-amazon-corretto.x86_64
sudo yum install java-1.8.0-amazon-corretto-devel.x86_64
- 자바가 잘 설치되었는지 확인
-
java -version
으로 버전이 잘 뜨면 성공입니다.
-
-
vim ~/.bashrc
를 치고 아래의 코드를 추가
export JAVA_HOME="/usr/lib/jvm/java-1.8.0-amazon-corretto.x86_64/jre"
export PATH=$JAVA_HOME/bin:$PATH
-
source ~/.bashrc
를 치고 설정을 적용
yum install pip
yum install pyspark
$ cd ~
$ wget https://dlcdn.apache.org/spark/spark-3.5.2/spark-3.5.2-bin-hadoop3.tgz
$ tar -xvzf spark-3.5.2-bin-hadoop3.tgz
$ ln -s spark-3.5.2-bin-hadoop3.tgz spark
$ rm -rf spark-3.5.2-bin-hadoop3.tgz
-
vim ~/.bashrc
를 치고 아래와 같이 수정
export JAVA_HOME="/usr/lib/jvm/java-1.8.0-amazon-corretto.x86_64/jre"
export SPARK_HOME="$HOME/spark"
export PATH=$JAVA_HOME/bin:$SPARK_HOME/bin:$PATH
-
source ~/.bashrc
로 반영 -
pyspark
확인해 아래와 같이 실행되면 성공입니다.Python 3.9.16 (main, Jul 5 2024, 00:00:00) [GCC 11.4.1 20230605 (Red Hat 11.4.1-2)] on linux Type "help", "copyright", "credits" or "license" for more information. Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 24/08/20 07:08:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 3.5.2 /_/ Using Python version 3.9.16 (main, Jul 5 2024 00:00:00) Spark context Web UI available at http://ip-172-31-13-184.ap-northeast-2.compute.internal:4040 Spark context available as 'sc' (master = local[*], app id = local-1724137707749). SparkSession available as 'spark'.
-
Possible Errors
- 설치 가능한 java version이 다른 경우 (sudo yum install java 어쩌고가 안될 때)
-
yum list | grep java
로 설치 가능한 java 버전 확인 후 설치합니다.
-
- java home 위치 확인하는 법
java -XshowSettings:properties -version 2>&1 > /dev/null | grep 'java.home'
- 설치 가능한 java version이 다른 경우 (sudo yum install java 어쩌고가 안될 때)
- ffmpeg 파일을 다운받아 환경변수에 path 추가
$ sudo su -
$ cd /usr/local/bin
$ mkdir ffmpeg & cd ffmpeg
# https://johnvansickle.com/ffmpeg/ 에서 최신 버전 링크 가져오기
$ wget https://johnvansickle.com/ffmpeg/releases/ffmpeg-release-amd64-static.tar.xz
$ tar -xf ffmpeg-release-amd64-static.tar.xz -> ./ffmpeg -version cp -a /usr/local/bin/ffmpeg/ffmpeg-7.0.1-amd64-static/ . /usr/local/bin/ffmpeg/
$ ln -s /usr/local/bin/ffmpeg/ffmpeg /usr/bin/ffmpeg
# 원래 위치로 복귀
$ cd ~
-
ffmpeg -version
쳐서 버전이 출력되면 성공입니다.
Reference
[AWS Linux에서 FFMPEG 설치하기 (ElasticBeanStalk)](https://sundries-in-myidea.tistory.com/87#google_vignette)
- STT를 위한 Whisper Model 설치
- whisper의 경우 library가 이미 제공되어 있습니다.
- Reference: https://github.com/openai/whisper
- 필요한 라이브러리 설치
$ pip install numpy==1.26.4
$ pip install openai-whisper
- 참고: 24.08.20 기준 whisper가 numpy version 2에서 잘 작동하지 않으므로 1.xx 버전으로 downgrade 필요합니다.
- 만약 whisper 라이브러리 설치할 때 공간이 없다고 뜨는 경우:
- /TMPDIR 설정 검색해 해결 가능합니다.
- 전사된 text data를 db에 적재하기 위해서 influxdb client 선언을 위한 라이브러리 설치
$ pip install pandas
$ pip install influxdb_client
-
mkdir test video
로 폴더 두개 생성 -
vi .env
-
아래의 코드를 복붙합니다.
INFLUXDB_TOKEN=Q-i24-5XGdQQm2sgN6QYmElVW8yZzj2fiPY4HT-2kre5SQ71ja2Fy7ldrUIfWmAg2QdB2GILvgoHHfDSjqR3Qw== INFLUXDB_URL=http://13.125.176.29:8086 INFLUXDB_ORG=HighLighter INFLUXDB_BUCKET=HighLighter
-
-
github의
vi spark-batch.py
를 작성합니다.-
해당 코드는 아래의 실행환경이 갖춰져야 정상동작합니다.
KAFKA ****kafka producer + broker가 잘 실행되고 & kafka ip 설정이 정확해야합니다.
InfluxDB influxdb가 실행되고 & .env 설정이 정확해야합니다.
-
Reference
[Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher) - Spark 3.5.1 Documentation](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html)
- 아래의 명령어로
spark-batch.py
실행- python3로 실행하지 않고 spark-submit으로 실행
- spark & kafka intergration을 위해서는 —packages를 반드시 추가해야합니다.
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2 spark-batch.py
- Possible Errors
- multiple queries 관련 에러
-
rm -rf checkpoint
로 폴더 삭제 - 발생하는 이유: query가 checkpoint안의 메타데이터를 사용하여 작동하는데, 이전에 실행한 메타데이터를 읽을 경우 에러
-
- kafka + spark Intergration 를 참고하라는 에러
- —packages option을 넣어서 실행합니다.
- Java Heap Size : Out Of Memory 에러
- ec2의 RAM사이즈를 올려야 합니다
-
free -h
를 통해 확인한 바로는 최초 실행 시 7.6G 정도 사용합니다.
- multiple queries 관련 에러