Skip to content

Spark Streaming

Chaerim edited this page Aug 25, 2024 · 3 revisions

Spark Streaming

💡 kafka topic에 있는 video binary를 배치로 가져와 stt 진행하여 influxdb에 text를 적재

요약

  1. Kafka topic에 적재된 video binary를 spark에 저장
  2. spark window를 사용하여 30초 분량의 binary를 적재하고 이를 배치 단위로 전처리
    • 30초의 binary를 합쳐 video.ts로 저장 → ffmpegvideo.tsvideo.mp4로 변환
    • whisper를 사용하여 mp4의 음성을 text로 전사
    • spark dataframe에 전사된 text를 새로운 column에 추가
  3. spark query를 사용하여 전처리된 배치에 대해 foreachBatch()를 사용하여 influxdb에 적재

코드 실행을 위한 환경 세팅

💡 Amazon Linux2, m4.xlarge 기준으로 작성되었습니다.

  • 최소 RAM 16GB (필수) + DISK 30GB 추천

Spark 설치

1. java 설치

sudo yum install java-1.8.0-amazon-corretto.x86_64 
sudo yum install java-1.8.0-amazon-corretto-devel.x86_64 
  • 자바가 잘 설치되었는지 확인
    • java -version으로 버전이 잘 뜨면 성공입니다.

2. JAVA_HOME 설정

  • vim ~/.bashrc 를 치고 아래의 코드를 추가
export JAVA_HOME="/usr/lib/jvm/java-1.8.0-amazon-corretto.x86_64/jre"
export PATH=$JAVA_HOME/bin:$PATH
  • source ~/.bashrc 를 치고 설정을 적용

3. pip & pyspark 설치

  • yum install pip
  • yum install pyspark

4. spark 설치

$ cd ~

$ wget https://dlcdn.apache.org/spark/spark-3.5.2/spark-3.5.2-bin-hadoop3.tgz

$ tar -xvzf spark-3.5.2-bin-hadoop3.tgz

$ ln -s spark-3.5.2-bin-hadoop3.tgz spark

$ rm -rf spark-3.5.2-bin-hadoop3.tgz

5. spark 실행을 위한 /.bashrc 설정

  • vim ~/.bashrc를 치고 아래와 같이 수정
export JAVA_HOME="/usr/lib/jvm/java-1.8.0-amazon-corretto.x86_64/jre"
export SPARK_HOME="$HOME/spark"
export PATH=$JAVA_HOME/bin:$SPARK_HOME/bin:$PATH
  • source ~/.bashrc 로 반영

  • pyspark 확인해 아래와 같이 실행되면 성공입니다.

    Python 3.9.16 (main, Jul  5 2024, 00:00:00)
    [GCC 11.4.1 20230605 (Red Hat 11.4.1-2)] on linux
    Type "help", "copyright", "credits" or "license" for more information.
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    24/08/20 07:08:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /__ / .__/\_,_/_/ /_/\_\   version 3.5.2
          /_/
    
    Using Python version 3.9.16 (main, Jul  5 2024 00:00:00)
    Spark context Web UI available at http://ip-172-31-13-184.ap-northeast-2.compute.internal:4040
    Spark context available as 'sc' (master = local[*], app id = local-1724137707749).
    SparkSession available as 'spark'.
  • Possible Errors

    • 설치 가능한 java version이 다른 경우 (sudo yum install java 어쩌고가 안될 때)
      • yum list | grep java 로 설치 가능한 java 버전 확인 후 설치합니다.
    • java home 위치 확인하는 법
      • java -XshowSettings:properties -version 2>&1 > /dev/null | grep 'java.home'

ffmpeg 설치

  • ffmpeg 파일을 다운받아 환경변수에 path 추가
$ sudo su - 

$ cd /usr/local/bin

$ mkdir ffmpeg & cd ffmpeg

# https://johnvansickle.com/ffmpeg/ 에서 최신 버전 링크 가져오기
$ wget https://johnvansickle.com/ffmpeg/releases/ffmpeg-release-amd64-static.tar.xz

$ tar -xf ffmpeg-release-amd64-static.tar.xz -> ./ffmpeg -version cp -a /usr/local/bin/ffmpeg/ffmpeg-7.0.1-amd64-static/ . /usr/local/bin/ffmpeg/

$ ln -s /usr/local/bin/ffmpeg/ffmpeg /usr/bin/ffmpeg

# 원래 위치로 복귀
$ cd ~
  • ffmpeg -version 쳐서 버전이 출력되면 성공입니다.

Reference

[AWS Linux에서 FFMPEG 설치하기 (ElasticBeanStalk)](https://sundries-in-myidea.tistory.com/87#google_vignette)

Whisper 설치

  1. 필요한 라이브러리 설치
$ pip install numpy==1.26.4
$ pip install openai-whisper
  • 참고: 24.08.20 기준 whisper가 numpy version 2에서 잘 작동하지 않으므로 1.xx 버전으로 downgrade 필요합니다.
  • 만약 whisper 라이브러리 설치할 때 공간이 없다고 뜨는 경우:
    • /TMPDIR 설정 검색해 해결 가능합니다.

InfluxDB Dependency 설치

  • 전사된 text data를 db에 적재하기 위해서 influxdb client 선언을 위한 라이브러리 설치
$ pip install pandas

$ pip install influxdb_client

디렉토리 설정

  • mkdir test video 로 폴더 두개 생성

  • vi .env

    • 아래의 코드를 복붙합니다.

      INFLUXDB_TOKEN=Q-i24-5XGdQQm2sgN6QYmElVW8yZzj2fiPY4HT-2kre5SQ71ja2Fy7ldrUIfWmAg2QdB2GILvgoHHfDSjqR3Qw==
      INFLUXDB_URL=http://13.125.176.29:8086
      INFLUXDB_ORG=HighLighter
      INFLUXDB_BUCKET=HighLighter
  • github의 vi spark-batch.py를 작성합니다.

    • 해당 코드는 아래의 실행환경이 갖춰져야 정상동작합니다.

       KAFKA   ****kafka producer + broker가 잘 실행되고 & kafka ip 설정이 정확해야합니다.
      

      InfluxDB influxdb가 실행되고 & .env 설정이 정확해야합니다.


코드 실행

Reference

[Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher) - Spark 3.5.1 Documentation](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html)

  • 아래의 명령어로 spark-batch.py실행
    • python3로 실행하지 않고 spark-submit으로 실행
    • spark & kafka intergration을 위해서는 —packages를 반드시 추가해야합니다.
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2 spark-batch.py
  • Possible Errors
    • multiple queries 관련 에러
      • rm -rf checkpoint 로 폴더 삭제
      • 발생하는 이유: query가 checkpoint안의 메타데이터를 사용하여 작동하는데, 이전에 실행한 메타데이터를 읽을 경우 에러
    • kafka + spark Intergration 를 참고하라는 에러
      • —packages option을 넣어서 실행합니다.
    • Java Heap Size : Out Of Memory 에러
      • ec2의 RAM사이즈를 올려야 합니다
      • free -h 를 통해 확인한 바로는 최초 실행 시 7.6G 정도 사용합니다.
Clone this wiki locally