[Question]: Flume's java process is always running #239

Open · woxiangwoshi123 opened this issue Oct 22, 2024 · 2 comments
Labels: question (Further information is requested)

woxiangwoshi123 commented Oct 22, 2024

I used the author's ds2hdfs.py script to collect DolphinScheduler log files and upload them to HDFS, but I found that many Flume Java processes keep running afterwards. Please tell me how to stop them.
For example:
[Screenshot: Snipaste_2024-10-22_14-20-08]
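For anyone who lands here: one quick way to see which agents are still alive is to match on the compass_ agent-name prefix that ds2hdfs.py uses. A minimal sketch, assuming a Linux host with pgrep on the PATH (the prefix match and the commented-out kill line are illustrative, not part of the original script):

import os
import signal
import subprocess

# List the PID and command line of every process whose command line
# contains the "compass_" agent-name prefix used by ds2hdfs.py.
result = subprocess.run(
    ["pgrep", "-af", "compass_"], capture_output=True, text=True
)
for line in result.stdout.splitlines():
    pid, _, cmdline = line.partition(" ")
    print(pid, cmdline[:120])
    # os.kill(int(pid), signal.SIGTERM)  # uncomment to actually stop them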

woxiangwoshi123 (Author) commented:

The content of ds2hdfs.py is as follows:

import os
import time
import threading
import subprocess
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

'''
   Flume plugin to collect log
'''
class FlumeHandler:
    def __init__(self, file_name, relative_path, log_file):
        self.file_name = file_name
        self.relative_path = relative_path
        self.log_file = log_file
        self.flume_process = None
        self.is_updated = True
        self.is_processing = False
        self.semaphore = threading.Semaphore(0)
        self.timer = None
        # Unique agent/config name: path separators become underscores,
        # and the file extension is dropped.
        self.flume_flag = '_'.join([self.relative_path.replace('/', '_')] + self.file_name.split('.')[:-1])

    def init_flume_config(self):
        # Building flume configuration contents
        config_content = f"""
compass_{self.flume_flag}.sinks=ds2hdfsSink_{self.flume_flag}
compass_{self.flume_flag}.channels=ds2hdfsCh_{self.flume_flag}
compass_{self.flume_flag}.sources=ds2hdfsSrc_{self.flume_flag}

compass_{self.flume_flag}.sources.ds2hdfsSrc_{self.flume_flag}.type = TAILDIR
compass_{self.flume_flag}.sources.ds2hdfsSrc_{self.flume_flag}.positionFile=./taildir_position/taildir_position_{self.flume_flag}.json
compass_{self.flume_flag}.sources.ds2hdfsSrc_{self.flume_flag}.filegroups = fg1
compass_{self.flume_flag}.sources.ds2hdfsSrc_{self.flume_flag}.filegroups.fg1 = {self.log_file}
compass_{self.flume_flag}.sources.ds2hdfsSrc_{self.flume_flag}.fileHeader = true
compass_{self.flume_flag}.sources.ds2hdfsSrc_{self.flume_flag}.deserializer.maxLineLength = 1024
compass_{self.flume_flag}.sources.ds2hdfsSrc_{self.flume_flag}.channels=ds2hdfsCh_{self.flume_flag}

compass_{self.flume_flag}.channels.ds2hdfsCh_{self.flume_flag}.type=file
compass_{self.flume_flag}.channels.ds2hdfsCh_{self.flume_flag}.capacity=10000
compass_{self.flume_flag}.channels.ds2hdfsCh_{self.flume_flag}.transactionCapacity=100
compass_{self.flume_flag}.channels.ds2hdfsCh_{self.flume_flag}.checkpointDir=./.flume/file-channel/ds2hdfsCh/{self.flume_flag}/checkpoint
compass_{self.flume_flag}.channels.ds2hdfsCh_{self.flume_flag}.dataDirs=./.flume/file-channel/ds2hdfsCh/{self.flume_flag}/data

compass_{self.flume_flag}.sinks.ds2hdfsSink_{self.flume_flag}.type=hdfs
compass_{self.flume_flag}.sinks.ds2hdfsSink_{self.flume_flag}.channel=ds2hdfsCh_{self.flume_flag}
compass_{self.flume_flag}.sinks.ds2hdfsSink_{self.flume_flag}.hdfs.path=hdfs://127.0.0.1:8020/flume/dolphinscheduler/%Y-%m-%d/{self.relative_path}
compass_{self.flume_flag}.sinks.ds2hdfsSink_{self.flume_flag}.hdfs.rollCount=0
compass_{self.flume_flag}.sinks.ds2hdfsSink_{self.flume_flag}.hdfs.filePrefix={self.file_name}
compass_{self.flume_flag}.sinks.ds2hdfsSink_{self.flume_flag}.hdfs.fileSuffix=
compass_{self.flume_flag}.sinks.ds2hdfsSink_{self.flume_flag}.hdfs.fileType=DataStream
compass_{self.flume_flag}.sinks.ds2hdfsSink_{self.flume_flag}.hdfs.rollInterval=20
compass_{self.flume_flag}.sinks.ds2hdfsSink_{self.flume_flag}.hdfs.rollSize=0
compass_{self.flume_flag}.sinks.ds2hdfsSink_{self.flume_flag}.hdfs.useLocalTimeStamp=true
"""

        # Save configuration contents to a temporary configuration file
        config_file = f'./flume_{self.flume_flag}.conf'
        with open(config_file, 'w') as f:
            f.write(config_content)

        return config_file

    def start_flume(self):
        # Generate flume configuration file
        config_file = self.init_flume_config()

        # Start flume process, passing file name, relative path, and absolute path as arguments
        cmd = f'./flume/bin/flume-ng agent -n compass_{self.flume_flag} -c ./flume/conf/ -f {config_file}'
        self.flume_process = subprocess.Popen(cmd, shell=True)

    def stop_flume(self):
        if self.flume_process:
            print(f"File done: {self.log_file}")
            # Terminate flume process
            self.flume_process.terminate()
            self.flume_process.wait()
            self.flume_process = None

            # Remove the generated configuration file (re-derive the path
            # instead of rewriting the file just to delete it)
            config_file = f'./flume_{self.flume_flag}.conf'
            os.remove(config_file)

    def update_status(self, is_updated):
        self.is_updated = is_updated
        if not self.is_updated:
            # Release semaphore as file is no longer updating
            self.semaphore.release()

    def set_processing_status(self, is_processing):
        print(f"File processing: {self.log_file}")
        self.is_processing = is_processing

    def start_timer(self, interval):
        # Start timer to stop the flume process if the file is not updated within the specified time interval.
        self.timer = threading.Timer(interval, self.stop_flume)
        self.timer.start()

    def stop_timer(self):
        # Stop timer
        if self.timer and self.timer.is_alive():
            self.timer.cancel()


class LogFileEventHandler(FileSystemEventHandler):
    def __init__(self, root_directory):
        self.flume_handlers = {}
        self.root_directory = root_directory

    def on_modified(self, event):
        if event.is_directory:
            return  # Ignore changes in directory
        elif not os.path.basename(event.src_path).startswith(tuple(map(str, range(10)))):
            return  # Ignore logs from the ds application itself
        elif event.event_type == 'modified':
            # Process file content modifications
            print(f"File modified: {event.src_path}")
            log_file = event.src_path
            file_name = os.path.basename(event.src_path)
            relative_path = os.path.dirname(os.path.relpath(log_file, self.root_directory))
            file_path = log_file
            if file_path not in self.flume_handlers:
                # Create FlumeHandler and start flume process
                flume_handler = FlumeHandler(file_name, relative_path, log_file)
                flume_handler.start_flume()
                self.flume_handlers[file_path] = flume_handler

            flume_handler = self.flume_handlers[file_path]
            if not flume_handler.is_processing:
                # If file is not being processed, update file status to updated
                flume_handler.update_status(True)
                flume_handler.stop_timer()
            else:
                # If file is being processed, ignore this modification event
                return

            # Start the timer; if the file receives no updates within 10 seconds, stop the flume process
            flume_handler.start_timer(10)

    def on_created(self, event):
        if event.is_directory:
            return  # Ignore changes in directory
        elif not os.path.basename(event.src_path).startswith(tuple(map(str, range(10)))):
            return  # Ignore logs from the ds application itself
        elif event.event_type == 'created':
            # Process newly created file
            print(f"File created: {event.src_path}")
            log_file = event.src_path
            file_name = os.path.basename(event.src_path)
            relative_path = os.path.dirname(os.path.relpath(log_file, self.root_directory))

            file_path = log_file
            if file_path not in self.flume_handlers:
                # Create FlumeHandler and start flume process
                flume_handler = FlumeHandler(file_name, relative_path, log_file)
                flume_handler.start_flume()
                self.flume_handlers[file_path] = flume_handler
            flume_handler = self.flume_handlers[file_path]
            if not flume_handler.is_processing:
                # If file is not being processed, update file status to updated
                flume_handler.update_status(True)
                flume_handler.stop_timer()
            else:
                # If the file is being processed, ignore this creation event.
                return

            # Start the timer; if the file receives no updates within 10 seconds, stop the flume process
            flume_handler.start_timer(10)


if __name__ == "__main__":
    # Set working directory
    working_directory = "/opt/"
    os.chdir(working_directory)
    # Watch the root path of log
    directory_to_watch = "/opt/dolphinscheduler/worker-server/logs"
    event_handler = LogFileEventHandler(directory_to_watch)
    observer = Observer()
    observer.schedule(event_handler, directory_to_watch, recursive=True)
    observer.start()

    try:
        while True:
            time.sleep(5)
    except KeyboardInterrupt:
        observer.stop()

    observer.join()
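A note on why the processes may linger: start_flume() launches flume-ng with shell=True, so self.flume_process is the wrapper shell, and the terminate() call in stop_flume() only signals that shell; the flume-ng script and the java agent it spawns can outlive it. A hedged sketch of one workaround, starting the command in its own session so the whole process group can be signalled (the cmd string below is a placeholder standing in for the one built in start_flume()):

import os
import signal
import subprocess

# Placeholder command string; start_flume() builds the real one.
cmd = './flume/bin/flume-ng agent -n compass_example -c ./flume/conf/ -f ./flume_example.conf'

# start_new_session=True puts the shell, flume-ng, and the java agent
# into a new process group whose pgid equals the shell's pid.
proc = subprocess.Popen(cmd, shell=True, start_new_session=True)

# ... later, signal the entire group instead of just the shell:
os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
proc.wait()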

woxiangwoshi123 (Author) commented:

My Flume Dockerfile is as follows:

FROM reg.unicom/galaxy/ts-bdt/ts-bdt-eclipse-temurin:8-focal

RUN set -x && \
    apt-get update -q && \
    apt-get install -y tzdata && \
    rm -rf /var/lib/apt/lists/*


ENV PATH=/opt/java/openjdk/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin

# Python
RUN apt-get update -q && \
    apt-get install -yq python3.8 && \
    apt-get install -yq python3-pip

RUN pip3 install watchdog

ARG TZ
ARG HADOOP_VERSION
ARG SPARK_VERSION

COPY hadoop-3.1.1.tar.gz /opt
COPY spark-3.3.4-bin-hadoop3.tgz /opt
COPY apache-flume-1.11.0-bin.tar.gz /opt

# Hadoop, Spark, Flume
RUN set -x && \
    tar -xzf /opt/hadoop-3.1.1.tar.gz -C /opt && \
    ln -s /opt/hadoop-3.1.1 /opt/hadoop && \
    rm /opt/hadoop-3.1.1.tar.gz && \
    tar -xzf /opt/spark-3.3.4-bin-hadoop3.tgz -C /opt && \
    ln -s /opt/spark-3.3.4-bin-hadoop3 /opt/spark && \
    rm /opt/spark-3.3.4-bin-hadoop3.tgz && \
    tar -xzf /opt/apache-flume-1.11.0-bin.tar.gz -C /opt && \
    ln -s /opt/apache-flume-1.11.0-bin /opt/flume && \
    rm /opt/apache-flume-1.11.0-bin.tar.gz

COPY script/ds2hdfs.py /opt

COPY conf/hadoop/core-site.xml /opt/hadoop/etc/hadoop
COPY conf/hadoop/hdfs-site.xml /opt/hadoop/etc/hadoop
COPY conf/hadoop/mapred-site.xml /opt/hadoop/etc/hadoop
COPY conf/hadoop/yarn-site.xml /opt/hadoop/etc/hadoop
COPY conf/hadoop/hadoop-env.sh /opt/hadoop/etc/hadoop

RUN cp /opt/hadoop/share/hadoop/common/*.jar /opt/flume/lib/ && \
    cp /opt/hadoop/share/hadoop/common/lib/*.jar /opt/flume/lib/ && \
    cp /opt/hadoop/share/hadoop/hdfs/*.jar /opt/flume/lib/ && \
    cp /opt/hadoop/share/hadoop/hdfs/lib/*.jar /opt/flume/lib/


ENTRYPOINT ["sh", "-c", "nohup python3 /opt/ds2hdfs.py > /opt/ds2hdfs.log 2>&1; tail -f /opt/ds2hdfs.log"]
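One more thing worth flagging in this setup: the ENTRYPOINT runs the watcher under sh -c, so the shell is PID 1 and does not forward SIGTERM to the Python child on docker stop; the watcher is killed without a chance to clean up, and any Flume agents it spawned are orphaned. A hedged sketch of a cleanup hook the script's __main__ block could register (event_handler and observer are the objects it already creates; the hook itself is not in the original):

import signal
import sys

def shutdown(signum, frame):
    # Illustrative cleanup: stop every tracked Flume agent and the
    # watchdog observer before exiting.
    for handler in event_handler.flume_handlers.values():
        handler.stop_timer()
        handler.stop_flume()
    observer.stop()
    sys.exit(0)

# Handle both docker stop (SIGTERM) and Ctrl-C (SIGINT).
signal.signal(signal.SIGTERM, shutdown)
signal.signal(signal.SIGINT, shutdown)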
