From 5f1f5d879be430b7088520473f9005ee896260fa Mon Sep 17 00:00:00 2001 From: Vadim Melnik Date: Tue, 14 May 2024 14:54:31 +0300 Subject: [PATCH] Added --show-progress option, truncated video detection, auto-fix WiP, #82. --- Capture/nosignal/reprostim/nosignal.py | 94 ++++++++++++++++++++------ 1 file changed, 75 insertions(+), 19 deletions(-) diff --git a/Capture/nosignal/reprostim/nosignal.py b/Capture/nosignal/reprostim/nosignal.py index 3acd85c..dd6809a 100644 --- a/Capture/nosignal/reprostim/nosignal.py +++ b/Capture/nosignal/reprostim/nosignal.py @@ -1,5 +1,8 @@ import logging.config +import os +import subprocess import sys +import tempfile import time from datetime import timedelta from pydantic import BaseModel, Field @@ -14,7 +17,11 @@ class VideoInfo(BaseModel): + error: str = Field(None, description="Error message") fps: float = Field(..., description="Frames per second (FPS)") + width: int = Field(..., description="Video frame width") + height: int = Field(..., description="Video frame height") + is_truncated: bool = Field(False, description="Is video truncated") frames_count: int = Field(..., description="Total number of frames") nosignal_count: int = Field(..., description="Total number of nosignal " "frames") @@ -23,7 +30,7 @@ class VideoInfo(BaseModel): scanned_count: int = Field(..., description="Total number of scanned frames") def __str__(self): - return (f"VideoInfo(fps={self.fps}, " + return (f"VideoInfo({self.width}x{self.height}, fps={self.fps}, " f"frames_count={self.frames_count}, " f"scanned_count={self.scanned_count}, " f"nosignal_count={self.nosignal_count}, " @@ -40,6 +47,16 @@ def __str__(self): upper_rainbow = np.array([30, 255, 255]) +def auto_fix_video(video_path: str, temp_path: str): + logger.info(f"Run mediainfo to get video information: mediainfo -i {video_path}") + #subprocess.run("echo Run mediainfo", check=True, + # shell=True, capture_output=True, text=True) + #subprocess.run(f"mediainfo -i {video_path}", check=True, + # shell=True, capture_output=True, text=True) + #subprocess.run(["ffmpeg", "-i", video_path, "-c", "copy", temp_path]) + logger.info("TODO: Implement auto-fix for truncated video.") + + def has_rainbow(frame): # Convert frame to HSV color space hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV) @@ -62,25 +79,29 @@ def main_exit(code: int) -> int: def find_no_signal(video_path: str, step: int = 1, - number_of_checks: int = 0) -> VideoInfo: + number_of_checks: int = 0, + show_progress_sec: float = 0.0) -> VideoInfo: + vi: VideoInfo = VideoInfo(fps=0, width=0, height=0, + is_truncated=False, + frames_count=0, nosignal_count=0, + nosignal_rate=0.0, scanned_count=0) + if step < 1: - logger.error("Step must be greater than 0") - raise ValueError("Step must be greater than 0") + vi.error = "Step must be greater than 0"; + return vi if number_of_checks < 0: - logger.error("Number of checks must be 0 or greater than 0") - raise ValueError("Number of checks must be 0 or greater than 0") + vi.error = "Number of checks must be 0 or greater than 0" + return vi if number_of_checks > 0 and step > 1: logger.warning(f"Number of checks is set, specified step value({step}) will be ignored.") step = 1 - vi: VideoInfo = VideoInfo(fps=0, frames_count=0, nosignal_count=0, - nosignal_rate=0.0, scanned_count=0) cap = cv2.VideoCapture(video_path) if not cap.isOpened(): - logger.error(f"Couldn't open the video file: {video_path}") - return + vi.error = f"Couldn't open the video file: {video_path}" + return vi n1 = cap.get(cv2.CAP_PROP_FRAME_COUNT) logger.debug(f"n1={str(n1)}") @@ -89,6 +110,8 @@ def find_no_signal(video_path: str, step: int = 1, fps = cap.get(cv2.CAP_PROP_FPS) frame_width: int = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) frame_height: int = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + vi.width = frame_width + vi.height = frame_height vi.fps = round(fps, 2) logger.info(f"Video resolution={frame_width}x{frame_height}, fps={str(fps)}, " f"frames count={str(frame_count)}") @@ -110,17 +133,18 @@ def find_no_signal(video_path: str, step: int = 1, pos_last_frame: int = pos_first_frame + frame_count if pos_first_frame > pos_last_frame: - logger.error(f"Invalid frame range: {pos_first_frame} - {pos_last_frame}") - #raise RuntimeError(f"Invalid frame range: {pos_first_frame} - {pos_last_frame}") + vi.error = f"Invalid frame range: {pos_first_frame} - {pos_last_frame}" + vi.is_truncated = True + return vi pos_cur_frame: int = pos_first_frame pos_next_frame: int = pos_cur_frame nosignal_counter: int = 0 scan_counter: int = 0 - ts_progress = time.time() - progress_interval: float = 1 + ts_progress: float = time.time() + progress_interval: float = show_progress_sec while True: - if time.time() > ts_progress: + if show_progress_sec > 0.0 and time.time() > ts_progress: dt: str = str(timedelta(milliseconds=int(cap.get(cv2.CAP_PROP_POS_MSEC)))) logger.info(f"Scanning progress: {pos_cur_frame} / {pos_last_frame}, {dt}") ts_progress = time.time() + progress_interval @@ -136,7 +160,7 @@ def find_no_signal(video_path: str, step: int = 1, # set next frame position if number_of_checks > 0: - pos_next_frame = pos_first_frame + frame_count * scan_counter / number_of_checks + pos_next_frame = pos_first_frame + int(frame_count * scan_counter / number_of_checks) else: pos_next_frame = pos_cur_frame + step @@ -182,19 +206,51 @@ def find_no_signal(video_path: str, step: int = 1, 'of checks across entire video' 'frames. When set to 0, ' 'all frames are checked.') +@click.option('--show-progress', default=1.0, type=float, + help='Specify the interval for showing progress, ' + 'default is 1.0 seconds. When set to 0 or less,' + 'progress is not shown.') +@click.option('--auto-fix', default=False, is_flag=True, + help='Automatically fix truncated video when detected' + 'before nosignal check. Default is False.' + 'To check if video is truncated, use ' + 'mediainfo -i | grep "IsTruncated"') @click.option('--threshold', default=0.01, type=float, help='Specify the threshold for nosignal frames, default is ' '0.01 which means 1% of the totally checked frames.') @click.pass_context def main(ctx, path: str, log_level, step: int, - number_of_checks: int, threshold: float): + number_of_checks: int, + show_progress: float, + auto_fix: bool, + threshold: float): logger.setLevel(log_level) logger.debug("nosignal.py tool") logger.debug(f"path={path}") - res = find_no_signal(path, step, number_of_checks) - logger.info(f"Scan result : {str(res)}") + temp_path: str = None + + res = find_no_signal(path, step, number_of_checks, show_progress) + if res.is_truncated is not None: + logger.error(f"ERROR : Trunctated video detected.") + + if res.is_truncated and auto_fix: + logger.info("TRUNCATED VIDEO: Attempting to fix the truncated video.") + temp_path = tempfile.mktemp(suffix="_nosignal_fixed.mkv") + logger.info(f"Copying video to temporary file: {temp_path}") + auto_fix_video(path, temp_path) + res = find_no_signal(temp_path, step, number_of_checks, show_progress) + + if res.error is not None: + logger.error(f"ERROR : {res.error}") + + # delete temp_path if exists + if temp_path is not None and os.path.exists(temp_path): + logger.info(f"Deleting temporary file: {temp_path}") + os.remove(temp_path) + + logger.info(f"SCAN RESULT : {str(res)}") if res.nosignal_count > 0 and res.nosignal_rate > threshold: click.echo( f"FAILED : {res.nosignal_count} nosignal frames detected "