Skip to content

Commit

Permalink
Added --show-progress option, truncated video detection, auto-fix WiP,
Browse files Browse the repository at this point in the history
  • Loading branch information
vmdocua committed May 14, 2024
1 parent a2ca847 commit 5f1f5d8
Showing 1 changed file with 75 additions and 19 deletions.
94 changes: 75 additions & 19 deletions Capture/nosignal/reprostim/nosignal.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import logging.config
import os
import subprocess
import sys
import tempfile
import time
from datetime import timedelta
from pydantic import BaseModel, Field
Expand All @@ -14,7 +17,11 @@


class VideoInfo(BaseModel):
error: str = Field(None, description="Error message")
fps: float = Field(..., description="Frames per second (FPS)")
width: int = Field(..., description="Video frame width")
height: int = Field(..., description="Video frame height")
is_truncated: bool = Field(False, description="Is video truncated")
frames_count: int = Field(..., description="Total number of frames")
nosignal_count: int = Field(..., description="Total number of nosignal "
"frames")
Expand All @@ -23,7 +30,7 @@ class VideoInfo(BaseModel):
scanned_count: int = Field(..., description="Total number of scanned frames")

def __str__(self):
return (f"VideoInfo(fps={self.fps}, "
return (f"VideoInfo({self.width}x{self.height}, fps={self.fps}, "
f"frames_count={self.frames_count}, "
f"scanned_count={self.scanned_count}, "
f"nosignal_count={self.nosignal_count}, "
Expand All @@ -40,6 +47,16 @@ def __str__(self):
upper_rainbow = np.array([30, 255, 255])


def auto_fix_video(video_path: str, temp_path: str):
logger.info(f"Run mediainfo to get video information: mediainfo -i {video_path}")
#subprocess.run("echo Run mediainfo", check=True,
# shell=True, capture_output=True, text=True)
#subprocess.run(f"mediainfo -i {video_path}", check=True,
# shell=True, capture_output=True, text=True)
#subprocess.run(["ffmpeg", "-i", video_path, "-c", "copy", temp_path])
logger.info("TODO: Implement auto-fix for truncated video.")


def has_rainbow(frame):
# Convert frame to HSV color space
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
Expand All @@ -62,25 +79,29 @@ def main_exit(code: int) -> int:


def find_no_signal(video_path: str, step: int = 1,
number_of_checks: int = 0) -> VideoInfo:
number_of_checks: int = 0,
show_progress_sec: float = 0.0) -> VideoInfo:
vi: VideoInfo = VideoInfo(fps=0, width=0, height=0,
is_truncated=False,
frames_count=0, nosignal_count=0,
nosignal_rate=0.0, scanned_count=0)

if step < 1:
logger.error("Step must be greater than 0")
raise ValueError("Step must be greater than 0")
vi.error = "Step must be greater than 0";
return vi

if number_of_checks < 0:
logger.error("Number of checks must be 0 or greater than 0")
raise ValueError("Number of checks must be 0 or greater than 0")
vi.error = "Number of checks must be 0 or greater than 0"
return vi

if number_of_checks > 0 and step > 1:
logger.warning(f"Number of checks is set, specified step value({step}) will be ignored.")
step = 1

vi: VideoInfo = VideoInfo(fps=0, frames_count=0, nosignal_count=0,
nosignal_rate=0.0, scanned_count=0)
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
logger.error(f"Couldn't open the video file: {video_path}")
return
vi.error = f"Couldn't open the video file: {video_path}"
return vi

n1 = cap.get(cv2.CAP_PROP_FRAME_COUNT)
logger.debug(f"n1={str(n1)}")
Expand All @@ -89,6 +110,8 @@ def find_no_signal(video_path: str, step: int = 1,
fps = cap.get(cv2.CAP_PROP_FPS)
frame_width: int = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height: int = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
vi.width = frame_width
vi.height = frame_height
vi.fps = round(fps, 2)
logger.info(f"Video resolution={frame_width}x{frame_height}, fps={str(fps)}, "
f"frames count={str(frame_count)}")
Expand All @@ -110,17 +133,18 @@ def find_no_signal(video_path: str, step: int = 1,
pos_last_frame: int = pos_first_frame + frame_count

if pos_first_frame > pos_last_frame:
logger.error(f"Invalid frame range: {pos_first_frame} - {pos_last_frame}")
#raise RuntimeError(f"Invalid frame range: {pos_first_frame} - {pos_last_frame}")
vi.error = f"Invalid frame range: {pos_first_frame} - {pos_last_frame}"
vi.is_truncated = True
return vi

pos_cur_frame: int = pos_first_frame
pos_next_frame: int = pos_cur_frame
nosignal_counter: int = 0
scan_counter: int = 0
ts_progress = time.time()
progress_interval: float = 1
ts_progress: float = time.time()
progress_interval: float = show_progress_sec
while True:
if time.time() > ts_progress:
if show_progress_sec > 0.0 and time.time() > ts_progress:
dt: str = str(timedelta(milliseconds=int(cap.get(cv2.CAP_PROP_POS_MSEC))))
logger.info(f"Scanning progress: {pos_cur_frame} / {pos_last_frame}, {dt}")
ts_progress = time.time() + progress_interval
Expand All @@ -136,7 +160,7 @@ def find_no_signal(video_path: str, step: int = 1,

# set next frame position
if number_of_checks > 0:
pos_next_frame = pos_first_frame + frame_count * scan_counter / number_of_checks
pos_next_frame = pos_first_frame + int(frame_count * scan_counter / number_of_checks)
else:
pos_next_frame = pos_cur_frame + step

Expand Down Expand Up @@ -182,19 +206,51 @@ def find_no_signal(video_path: str, step: int = 1,
'of checks across entire video'
'frames. When set to 0, '
'all frames are checked.')
@click.option('--show-progress', default=1.0, type=float,
help='Specify the interval for showing progress, '
'default is 1.0 seconds. When set to 0 or less,'
'progress is not shown.')
@click.option('--auto-fix', default=False, is_flag=True,
help='Automatically fix truncated video when detected'
'before nosignal check. Default is False.'
'To check if video is truncated, use '
'mediainfo -i <video_file> | grep "IsTruncated"')
@click.option('--threshold', default=0.01, type=float,
help='Specify the threshold for nosignal frames, default is '
'0.01 which means 1% of the totally checked frames.')
@click.pass_context
def main(ctx, path: str, log_level, step: int,
number_of_checks: int, threshold: float):
number_of_checks: int,
show_progress: float,
auto_fix: bool,
threshold: float):
logger.setLevel(log_level)
logger.debug("nosignal.py tool")

logger.debug(f"path={path}")

res = find_no_signal(path, step, number_of_checks)
logger.info(f"Scan result : {str(res)}")
temp_path: str = None

res = find_no_signal(path, step, number_of_checks, show_progress)
if res.is_truncated is not None:
logger.error(f"ERROR : Trunctated video detected.")

if res.is_truncated and auto_fix:
logger.info("TRUNCATED VIDEO: Attempting to fix the truncated video.")
temp_path = tempfile.mktemp(suffix="_nosignal_fixed.mkv")
logger.info(f"Copying video to temporary file: {temp_path}")
auto_fix_video(path, temp_path)
res = find_no_signal(temp_path, step, number_of_checks, show_progress)

if res.error is not None:
logger.error(f"ERROR : {res.error}")

# delete temp_path if exists
if temp_path is not None and os.path.exists(temp_path):
logger.info(f"Deleting temporary file: {temp_path}")
os.remove(temp_path)

logger.info(f"SCAN RESULT : {str(res)}")
if res.nosignal_count > 0 and res.nosignal_rate > threshold:
click.echo(
f"FAILED : {res.nosignal_count} nosignal frames detected "
Expand Down

0 comments on commit 5f1f5d8

Please sign in to comment.