Skip to content
This repository has been archived by the owner on Jul 6, 2024. It is now read-only.

Commit

Permalink
fix: parallel, sequenceのフローの整理
Browse files Browse the repository at this point in the history
  • Loading branch information
PigeonsHouse committed Dec 29, 2023
1 parent ef24ccd commit 78b4ae1
Show file tree
Hide file tree
Showing 10 changed files with 378 additions and 461 deletions.
47 changes: 21 additions & 26 deletions src/converter/content.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
audio_volume_filter,
get_background_color_code,
object_length_filter,
time_padding_end_filter,
time_padding_start_filter,
set_background_filter,
time_space_end_filter,
time_space_start_filter,
width_height_filter,
)
from .utils import get_background_process, get_graphical_process
Expand All @@ -35,19 +36,20 @@ def get_process_by_source(
case SourceType.AUDIO:
audio_process = ffmpeg.input(src_path).audio
case SourceType.TEXT:
padding_left_px = style.padding_left.get_pixel()
padding_top_px = style.padding_top.get_pixel()
(
width_with_padding,
height_with_padding,
) = style.get_size_with_padding()
transparent_process = get_background_process(
"{}x{}".format(width_with_padding, height_with_padding),
"{}x{}".format(
width_with_padding.get_pixel(),
height_with_padding.get_pixel(),
),
style.background_color,
)
option: dict = {
"x": padding_left_px,
"y": padding_top_px,
"x": style.padding_left.get_pixel(),
"y": style.padding_top.get_pixel(),
}
if style.font_family is not None:
option |= {
Expand Down Expand Up @@ -100,29 +102,22 @@ def create_source_process(
style.width, style.height, video_process
)
# padding and background-color
padding_top_px = style.padding_top.get_pixel()
padding_left_px = style.padding_left.get_pixel()
padding_right_px = style.padding_right.get_pixel()
padding_bottom_px = style.padding_bottom.get_pixel()
if (
padding_top_px != 0
or padding_left_px != 0
or padding_right_px != 0
or padding_bottom_px != 0
style.padding_top.is_zero_over()
or style.padding_left.is_zero_over()
):
(
width_with_padding,
height_with_padding,
) = style.get_size_with_padding()
transparent_process = get_background_process(
"{}x{}".format(width_with_padding, height_with_padding),
style.background_color,
)
video_process = ffmpeg.overlay(
transparent_process,
video_process,
x=padding_left_px,
y=padding_top_px,
video_process = set_background_filter(
width_with_padding,
height_with_padding,
background_color=style.background_color,
video_process=video_process,
position_x=style.padding_left,
position_y=style.padding_top,
fit_video_process=True,
)

if audio_process is not None:
Expand All @@ -137,14 +132,14 @@ def create_source_process(
video_process, audio_process = object_length_filter(
style.object_length, video_process, audio_process
)
video_process, audio_process = time_padding_start_filter(
video_process, audio_process = time_space_start_filter(
style.time_padding_start,
background_color_code,
video_process,
audio_process,
)
if style.object_length.unit in [TimeUnit.FRAME, TimeUnit.SECOND]:
video_process, audio_process = time_padding_end_filter(
video_process, audio_process = time_space_end_filter(
style.time_padding_end,
background_color_code,
video_process,
Expand Down
10 changes: 4 additions & 6 deletions src/converter/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ def convert_video(
out_filename = "video.mp4" if out_filename is None else out_filename

process = create_process(vsml_data.content, debug_mode)
fps = VSMLManager.get_root_fps()
style = vsml_data.content.style
if process.video:
bg_process = get_background_process(
Expand Down Expand Up @@ -82,7 +81,7 @@ def convert_video(
process.audio,
"adelay",
all=1,
delays=int(style.time_margin_start.get_second(fps) * 1000),
delays=int(style.time_margin_start.get_second() * 1000),
)
if style.object_length.unit in [
TimeUnit.FRAME,
Expand All @@ -95,14 +94,14 @@ def convert_video(
"stop": style.time_margin_end.value,
}
audio_option = {
"pad_dur": style.time_margin_end.get_second(fps),
"pad_dur": style.time_margin_end.get_second(),
}
elif style.time_margin_end.unit == TimeUnit.SECOND:
video_option = {
"stop_duration": style.time_margin_end.get_second(fps),
"stop_duration": style.time_margin_end.get_second(),
}
audio_option = {
"pad_dur": style.time_margin_end.get_second(fps),
"pad_dur": style.time_margin_end.get_second(),
}
if process.video is not None:
process.video = ffmpeg.filter(
Expand Down Expand Up @@ -143,7 +142,6 @@ def convert_video(
.replace("True", "true")
.replace("False", "false")
)
print(content_str)
content_str = json.dumps(
(json.loads(content_str)), indent=2, ensure_ascii=False
)
Expand Down
146 changes: 125 additions & 21 deletions src/converter/style_to_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
TimeUnit,
TimeValue,
)
from utils import VSMLManager

from .utils import get_background_process


def get_background_color_code(
Expand Down Expand Up @@ -85,7 +86,7 @@ def object_length_filter(
option = {"end": object_length.value}
video_process = ffmpeg.trim(video_process, **option)
if audio_process is not None:
audio_end = object_length.get_second(VSMLManager.get_root_fps())
audio_end = object_length.get_second()
audio_process = ffmpeg.filter(
audio_process,
"atrim",
Expand All @@ -103,20 +104,20 @@ def object_length_filter(
return video_process, audio_process


def time_padding_start_filter(
time_padding_start: TimeValue,
def time_space_start_filter(
time_space_start: TimeValue,
background_color_code: Optional[str] = None,
video_process: Optional[Any] = None,
audio_process: Optional[Any] = None,
) -> tuple[Any, Any]:
if time_padding_start.unit in [TimeUnit.FRAME, TimeUnit.SECOND]:
if time_space_start.unit in [TimeUnit.FRAME, TimeUnit.SECOND]:
if video_process is not None:
option = {}
if time_padding_start.unit == TimeUnit.FRAME:
option = {"start": time_padding_start.value}
elif time_padding_start.unit == TimeUnit.SECOND:
if time_space_start.unit == TimeUnit.FRAME:
option = {"start": time_space_start.value}
elif time_space_start.unit == TimeUnit.SECOND:
option = {
"start_duration": time_padding_start.value,
"start_duration": time_space_start.value,
}
video_process = ffmpeg.filter(
video_process,
Expand All @@ -125,10 +126,7 @@ def time_padding_start_filter(
**option,
)
if audio_process is not None:
delays = int(
time_padding_start.get_second(VSMLManager.get_root_fps())
* 1000
)
delays = int(time_space_start.get_second() * 1000)
audio_process = ffmpeg.filter(
audio_process,
"adelay",
Expand All @@ -138,29 +136,135 @@ def time_padding_start_filter(
return video_process, audio_process


def time_space_end_filter(
    time_space_end: TimeValue,
    background_color_code: Optional[str] = None,
    video_process: Optional[Any] = None,
    audio_process: Optional[Any] = None,
) -> tuple[Any, Any]:
    """Append trailing time padding to the given video/audio streams.

    Only explicit FRAME or SECOND paddings are applied; any other unit
    leaves both streams untouched.  Video is padded with ffmpeg's
    ``tpad`` filter (filled with ``background_color_code``), audio with
    ``apad`` silence of the equivalent duration in seconds.
    ``None`` streams pass through unchanged.
    """
    if time_space_end.unit in [TimeUnit.FRAME, TimeUnit.SECOND]:
        if video_process is not None:
            option = {}
            if time_space_end.unit == TimeUnit.FRAME:
                # tpad counts frame-based trailing padding via "stop".
                option = {
                    "stop": time_space_end.value,
                }
            elif time_space_end.unit == TimeUnit.SECOND:
                # ...and second-based padding via "stop_duration".
                option = {
                    "stop_duration": time_space_end.value,
                }
            video_process = ffmpeg.filter(
                video_process, "tpad", color=background_color_code, **option
            )
        if audio_process is not None:
            # apad takes the padding length in seconds.
            pad_dur = time_space_end.get_second()
            audio_process = ffmpeg.filter(
                audio_process, "apad", pad_dur=pad_dur
            )
    return video_process, audio_process


def concat_filter(
    base_process: Optional[Any], merging_process: Any, is_video: bool = True
) -> Any:
    """Concatenate two ffmpeg streams in time.

    When there is no base stream yet, the merging stream is returned
    unchanged; otherwise both streams are joined with ffmpeg's concat
    filter, flagged as video or audio according to ``is_video``.
    """
    # Nothing accumulated yet: the merging stream becomes the base.
    if base_process is None:
        return merging_process
    # concat's v/a counts select whether the inputs carry video or audio.
    video_count = int(is_video)
    audio_count = int(not is_video)
    return ffmpeg.concat(
        base_process,
        merging_process,
        v=video_count,
        a=audio_count,
    )


def audio_merge_filter(
    base_audio_process: Optional[Any],
    merging_audio_process: Any,
) -> Any:
    """Mix a new audio stream into an (optional) existing one.

    Returns the merging stream as-is when there is no base stream yet;
    otherwise the two streams are combined with ffmpeg's ``amix`` filter.
    """
    # First audio stream seen: nothing to mix with.
    if base_audio_process is None:
        return merging_audio_process
    inputs = [base_audio_process, merging_audio_process]
    # normalize=False keeps each input at its original volume instead of
    # letting amix scale the inputs down.
    return ffmpeg.filter(inputs, "amix", normalize=False)


def adjust_parallel_audio(
    object_length: TimeValue,
    audio_process: Any,
) -> Any:
    """Pad an audio stream to the length of its parallel container.

    Parameters
    ----------
    object_length:
        Target length of the parallel container.  A ``FIT`` unit pads
        without a fixed target (apad ``whole_len=-1.0``); any other unit
        pads up to the explicit duration in seconds (``whole_dur``).
    audio_process:
        The ffmpeg audio stream to pad.

    Returns the padded audio stream.
    """
    # Build the apad option directly on each branch (no dead pre-init).
    if object_length.unit == TimeUnit.FIT:
        # -1.0 lets apad pad without a fixed overall length.
        option = {"whole_len": -1.0}
    else:
        option = {"whole_dur": object_length.get_second()}
    return ffmpeg.filter(audio_process, "apad", **option)


def adjust_fit_sequence(
    background_color_code: str, video_process: Any, audio_process: Any
) -> tuple[Any, Any]:
    """Extend a FIT-length sequence's tail indefinitely.

    The video stream is padded without limit using background-color
    frames (tpad ``stop=-1``) and the audio stream with unlimited
    silence (apad ``pad_len=-1``).  ``None`` streams pass through
    untouched.
    """
    padded_video = video_process
    padded_audio = audio_process
    if padded_video is not None:
        # stop=-1: pad the tail indefinitely with the background color.
        padded_video = ffmpeg.filter(
            padded_video, "tpad", stop=-1, color=background_color_code
        )
    if padded_audio is not None:
        # pad_len=-1: append silence without a fixed limit.
        padded_audio = ffmpeg.filter(padded_audio, "apad", pad_len=-1)
    return padded_video, padded_audio


def set_background_filter(
    width: GraphicValue,
    height: GraphicValue,
    background_color: Optional[Color],
    video_process: Optional[Any] = None,
    fit_video_process: bool = False,
    position_x: Optional[GraphicValue] = None,
    position_y: Optional[GraphicValue] = None,
) -> Any:
    """Lay a video stream over a solid background of the given size.

    A ``width`` x ``height`` pixel background source filled with
    ``background_color`` is generated, and ``video_process`` (if any) is
    overlaid on it at the optional ``position_x``/``position_y`` offset.
    ``fit_video_process`` is forwarded as the overlay's ``shortest``
    flag, so the result can end with the shorter input.
    """
    resolution = "{}x{}".format(width.get_pixel(), height.get_pixel())
    background_process = get_background_process(resolution, background_color)
    # Overlaying onto the generated background yields the final stream.
    return layering_filter(
        background_process,
        video_process,
        position_x,
        position_y,
        fit_video_process,
    )


def layering_filter(
    base_video_process: Optional[Any],
    merging_video_process: Any,
    position_x: Optional[GraphicValue] = None,
    position_y: Optional[GraphicValue] = None,
    fit_shorter: bool = False,
) -> Any:
    """Overlay one video stream on another.

    Either stream may be ``None``, in which case the other is returned
    unchanged.  Otherwise the merging stream is drawn on top of the
    base at the optional pixel offset; ``fit_shorter`` is passed as the
    overlay's ``shortest`` flag.
    """
    # Build the positional options first (matches original evaluation order
    # of the get_pixel() calls).
    option: dict = {}
    if position_x is not None:
        option["x"] = position_x.get_pixel()
    if position_y is not None:
        option["y"] = position_y.get_pixel()

    # With only one usable stream there is nothing to overlay.
    if base_video_process is None:
        return merging_video_process
    if merging_video_process is None:
        return base_video_process
    # eof_action="pass": when the overlaid stream ends, keep showing the base.
    return ffmpeg.overlay(
        base_video_process,
        merging_video_process,
        eof_action="pass",
        shortest=fit_shorter,
        **option,
    )
14 changes: 7 additions & 7 deletions src/converter/wrap/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
from converter.schemas import Process
from converter.style_to_filter import (
get_background_color_code,
time_padding_end_filter,
time_padding_start_filter,
time_space_end_filter,
time_space_start_filter,
)
from style import Order, TimeUnit

Expand All @@ -12,32 +12,32 @@


def create_wrap_process(
processes: list[Process],
child_processes: list[Process],
vsml_content: WrapContent,
debug_mode: bool = False,
) -> Process:
match vsml_content.style.order:
case Order.SEQUENCE:
process = create_sequence_process(
processes, vsml_content, debug_mode
child_processes, vsml_content, debug_mode
)
case Order.PARALLEL:
process = create_parallel_process(
processes, vsml_content, debug_mode
child_processes, vsml_content, debug_mode
)
case _:
raise Exception()

style = vsml_content.style
background_color_code = get_background_color_code(style.background_color)
process.video, process.audio = time_padding_start_filter(
process.video, process.audio = time_space_start_filter(
style.time_padding_start,
background_color_code,
process.video,
process.audio,
)
if style.object_length.unit in [TimeUnit.FRAME, TimeUnit.SECOND]:
process.video, process.audio = time_padding_end_filter(
process.video, process.audio = time_space_end_filter(
style.time_padding_end,
background_color_code,
process.video,
Expand Down
Loading

0 comments on commit 78b4ae1

Please sign in to comment.