Skip to content

Spark Streaming

Chaerim edited this page Aug 25, 2024 · 3 revisions

요약

  1. Kafka topic에 적재된 video binary를 spark에 저장
  2. spark window를 사용하여 30초 분량의 binary를 적재하고 이를 배치 단위로 전처리
    • 30초의 binary를 합쳐 video.ts로 저장 → ffmpegvideo.tsvideo.mp4로 변환
    • whisper를 사용하여 mp4의 음성을 text로 전사
    • spark dataframe에 전사된 text를 새로운 column에 추가
  3. spark query를 사용하여 전처리된 배치에 대해 foreachBatch()를 사용하여 influxdb에 적재

코드 실행을 위한 환경 세팅

💡 Amazon Linux2, m4.xlarge 기준으로 작성되었습니다.

  • 최소 RAM 16GB (필수) + DISK 30GB 추천

Spark 설치

1. java 설치

sudo yum install java-1.8.0-amazon-corretto.x86_64 
sudo yum install java-1.8.0-amazon-corretto-devel.x86_64 
  • 자바가 잘 설치되었는지 확인
    • java -version으로 버전이 잘 뜨면 성공입니다.

2. JAVA_HOME 설정

  • vim ~/.bashrc 를 치고 아래의 코드를 추가
export JAVA_HOME="/usr/lib/jvm/java-1.8.0-amazon-corretto.x86_64/jre"
export PATH=$JAVA_HOME/bin:$PATH
  • source ~/.bashrc 를 치고 설정을 적용

3. pip & pyspark 설치

  • yum install pip
  • yum install pyspark

4. spark 설치

$ cd ~

$ wget https://dlcdn.apache.org/spark/spark-3.5.2/spark-3.5.2-bin-hadoop3.tgz

$ tar -xvzf spark-3.5.2-bin-hadoop3.tgz

$ ln -s spark-3.5.2-bin-hadoop3.tgz spark

$ rm -rf spark-3.5.2-bin-hadoop3.tgz

5. spark 실행을 위한 /.bashrc 설정

  • vim ~/.bashrc를 치고 아래와 같이 수정
export JAVA_HOME="/usr/lib/jvm/java-1.8.0-amazon-corretto.x86_64/jre"
export SPARK_HOME="$HOME/spark"
export PATH=$JAVA_HOME/bin:$SPARK_HOME/bin:$PATH
  • source ~/.bashrc 로 반영

  • pyspark 확인해 아래와 같이 실행되면 성공입니다.

    Python 3.9.16 (main, Jul  5 2024, 00:00:00)
    [GCC 11.4.1 20230605 (Red Hat 11.4.1-2)] on linux
    Type "help", "copyright", "credits" or "license" for more information.
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    24/08/20 07:08:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /__ / .__/\_,_/_/ /_/\_\   version 3.5.2
          /_/
    
    Using Python version 3.9.16 (main, Jul  5 2024 00:00:00)
    Spark context Web UI available at http://ip-172-31-13-184.ap-northeast-2.compute.internal:4040
    Spark context available as 'sc' (master = local[*], app id = local-1724137707749).
    SparkSession available as 'spark'.
  • Possible Errors

    • 설치 가능한 java version이 다른 경우 (sudo yum install java 어쩌고가 안될 때)
      • yum list | grep java 로 설치 가능한 java 버전 확인 후 설치합니다.
    • java home 위치 확인하는 법
      • java -XshowSettings:properties -version 2>&1 > /dev/null | grep 'java.home'

ffmpeg 설치

  • ffmpeg 파일을 다운받아 환경변수에 path 추가
$ sudo su - 

$ cd /usr/local/bin

$ mkdir ffmpeg & cd ffmpeg

# https://johnvansickle.com/ffmpeg/ 에서 최신 버전 링크 가져오기
$ wget https://johnvansickle.com/ffmpeg/releases/ffmpeg-release-amd64-static.tar.xz

$ tar -xf ffmpeg-release-amd64-static.tar.xz -> ./ffmpeg -version cp -a /usr/local/bin/ffmpeg/ffmpeg-7.0.1-amd64-static/ . /usr/local/bin/ffmpeg/

$ ln -s /usr/local/bin/ffmpeg/ffmpeg /usr/bin/ffmpeg

# 원래 위치로 복귀
$ cd ~
  • ffmpeg -version 쳐서 버전이 출력되면 성공입니다.

Reference

[AWS Linux에서 FFMPEG 설치하기 (ElasticBeanStalk)](https://sundries-in-myidea.tistory.com/87#google_vignette)

Whisper 설치

  1. 필요한 라이브러리 설치
$ pip install numpy==1.26.4
$ pip install openai-whisper
  • 참고: 24.08.20 기준 whisper가 numpy version 2에서 잘 작동하지 않으므로 1.xx 버전으로 downgrade 필요합니다.
  • 만약 whisper 라이브러리 설치할 때 공간이 없다고 뜨는 경우:
    • /TMPDIR 설정 검색해 해결 가능합니다.

InfluxDB Dependency 설치

  • 전사된 text data를 db에 적재하기 위해서 influxdb client 선언을 위한 라이브러리 설치
$ pip install pandas

$ pip install influxdb_client

디렉토리 설정

  • mkdir test video 로 폴더 두개 생성

  • vi .env

    • 아래의 코드를 복붙합니다.

      INFLUXDB_TOKEN=Q-i24-5XGdQQm2sgN6QYmElVW8yZzj2fiPY4HT-2kre5SQ71ja2Fy7ldrUIfWmAg2QdB2GILvgoHHfDSjqR3Qw==
      INFLUXDB_URL=http://13.125.176.29:8086
      INFLUXDB_ORG=HighLighter
      INFLUXDB_BUCKET=HighLighter
  • github의 vi spark-batch.py를 작성합니다.

    • 해당 코드는 아래의 실행환경이 갖춰져야 정상동작합니다.

       KAFKA   ****kafka producer + broker가 잘 실행되고 & kafka ip 설정이 정확해야합니다.
      

      InfluxDB influxdb가 실행되고 & .env 설정이 정확해야합니다.


코드 실행

Reference

[Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher) - Spark 3.5.1 Documentation](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html)

  • 아래의 명령어로 spark-batch.py실행
    • python3로 실행하지 않고 spark-submit으로 실행
    • spark & kafka intergration을 위해서는 —packages를 반드시 추가해야합니다.
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2 spark-batch.py
  • Possible Errors
    • multiple queries 관련 에러
      • rm -rf checkpoint 로 폴더 삭제
      • 발생하는 이유: query가 checkpoint안의 메타데이터를 사용하여 작동하는데, 이전에 실행한 메타데이터를 읽을 경우 에러
    • kafka + spark Intergration 를 참고하라는 에러
      • —packages option을 넣어서 실행합니다.
    • Java Heap Size : Out Of Memory 에러
      • ec2의 RAM사이즈를 올려야 합니다
      • free -h 를 통해 확인한 바로는 최초 실행 시 7.6G 정도 사용합니다.