diff --git a/README.md b/README.md
index f287314..3317a37 100644
--- a/README.md
+++ b/README.md
@@ -26,18 +26,22 @@ SYNOPSIS
 DESCRIPTION
     Input:
-        src:
+        src:
             str: path to mp4 file
             str: youtube link
             str: path to txt file with multiple mp4's or youtube links
             list: list with multiple mp4's or youtube links
-        dest:
+        dest:
             str: directory where to save frames to
             None: dest = src + .npy
-        take_every_nth:
+        take_every_nth:
             int: only take every nth frame
-        resize_size:
+        resize_size:
             int: new pixel height and width of resized frame
+        workers:
+            int: number of workers used to read videos
+        memory_size:
+            int: number of GB of shared memory used for reading; use larger shared memory for more videos
 
 POSITIONAL ARGUMENTS
     SRC
@@ -49,6 +53,13 @@ FLAGS
         Default: 1
     --resize_size=RESIZE_SIZE
         Default: 224
+    --workers=WORKERS
+        Default: 1
+    --memory_size=MEMORY_SIZE
+        Default: 4
+
+NOTES
+    You can also use flags syntax for POSITIONAL ARGUMENTS
 ```
 
 ## API
@@ -65,31 +76,21 @@
 take_every_5 = 5
 video2numpy(VIDS, FRAME_DIR, take_every_5)
 ```
 
-You can alse directly use the reader and iterate over frame blocks yourself:
+You can also directly use the reader and iterate over videos yourself:
 ```python
 import glob
 from video2numpy.frame_reader import FrameReader
-from video2numpy.utils import split_block
 
 VIDS = glob.glob("some/path/my_videos/*.mp4")
 take_every_5 = 5
 resize_size = 300
 
 reader = FrameReader(VIDS, take_every_5, resize_size)
+reader.start_reading()
 
-for block, ind_dict in reader:
-
-    if you need to process the block in large batches (f.e. good for ML):
-        proc_block = ml_model(block)
-    else:
-        proc_block = block
-
-    # then you can separate the video frames into a dict easily with split_block from utils:
-    split_up_vids = split_block(proc_block, ind_dict)
-
-    for vid_name, proc_frames in split_up_vids.items():
-        # do something with proc_frame of shape (n_frames, 300, 300, 3)
-        ...
+for vid_frames, npy_name in reader:
+    # do something with vid_frames of shape (n_frames, 300, 300, 3)
+    ...
 ```
 
 ## For development
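The README excerpt above only lists the two new parameters; as a rough illustration of how they are meant to be used together, a call might look like the sketch below. The import path and input paths are assumptions for the example, not part of this diff.

```python
from video2numpy import video2numpy  # assumed top-level import, matching the README's first example

# placeholder inputs: local files, youtube links, or a .txt file of either are all accepted
VIDS = ["some/path/vid1.mp4", "https://www.youtube.com/watch?v=pdyRT_BXfXE"]

# 4 reader processes sharing an 8 GB frame buffer; keep every 5th frame, resized to 224x224
video2numpy(VIDS, dest="output_frames/", take_every_nth=5, resize_size=224, workers=4, memory_size=8)
```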
diff --git a/benchmark/reader_benchmark.py b/benchmark/reader_benchmark.py
index b91b2ba..b700aa3 100644
--- a/benchmark/reader_benchmark.py
+++ b/benchmark/reader_benchmark.py
@@ -20,35 +20,34 @@ def parse_args():
         default="default",  # TODO: maybe find nice way of getting reading type (cv2)
         help="For unique output graph file name",
     )
-    parser.add_argument("--chunk_size", type=int, default=50, help="How many videos to try to read at once")
     parser.add_argument("--resize_size", type=int, default=224, help="Resize frames to resize_size x resize_size")
     parser.add_argument(
-        "--thread_count",
+        "--workers",
         type=int,
         default=6,  # TODO: find way of getting default automatically
-        help="How many threads to use for video chunk reading",
+        help="How many workers to use for video reading",
     )
     args = parser.parse_args()
     return args
 
 
-def benchmark_reading(vids, chunk_size, take_en, resize_size, thread_count):
+def benchmark_reading(vids, take_en, resize_size, workers):
     reader = FrameReader(
         vids,
-        chunk_size=chunk_size,
         take_every_nth=take_en,
         resize_size=resize_size,
-        thread_count=thread_count,
-        auto_release=True,
+        workers=workers,
+        memory_size=4,
     )
     reader.start_reading()
 
     t0 = time.perf_counter()
 
     count = 0
-    for block, ind_dict in reader:
-        block[0, 0, 0, 0]  # assert no Segmentation fault
-        count += block.shape[0]
+    for vid, name in reader:
+        print(name)
+        vid[0, 0, 0, 0]  # assert no Segmentation fault
+        count += vid.shape[0]
 
     read_time = time.perf_counter() - t0
     samp_per_s = count / read_time
@@ -63,20 +62,20 @@ def benchmark_reading(vids, chunk_size, take_en, resize_size, thread_count):
     print(f"Benchmarking {args.name} on {len(vids)} videos...")
 
     video_fps = [1, 3, 5, 10, 25]  # tested variable
-    chunk_size = args.chunk_size
     resize_size = args.resize_size
-    thread_count = args.thread_count
+    workers = args.workers
 
-    print(f"Chunk size - {chunk_size} | Resize size - {resize_size} | Thread count - {thread_count}")
+    print(f"Resize size - {resize_size} | Workers - {workers}")
 
     results = []
     for fps in video_fps:
         ten = int(CONST_VID_FPS / fps)
-        samp_per_s, _, _ = benchmark_reading(vids, chunk_size, ten, resize_size, thread_count)
+        samp_per_s, _, _ = benchmark_reading(vids, ten, resize_size, workers)
+        print(f"samples/s @ {fps} FPS = {samp_per_s}")
         results.append(samp_per_s)
 
     plt.plot(video_fps, results)
-    plt.title(f"{args.name}: chunk size - {chunk_size} | resize size - {resize_size} | threads - {thread_count}")
+    plt.title(f"{args.name}: resize size - {resize_size} | workers - {workers}")
     plt.xlabel("Target video FPS")
     plt.ylabel("Reading speed samples/s")
-    plt.savefig(f"eff_{args.name}_{chunk_size}_{resize_size}_{thread_count}.png")
+    plt.savefig(f"eff_{args.name}_{workers}_{resize_size}_{workers}.png")
diff --git a/examples/manual_release.py b/examples/manual_release.py
deleted file mode 100644
index e351731..0000000
--- a/examples/manual_release.py
+++ /dev/null
@@ -1,26 +0,0 @@
-import glob
-
-from video2numpy.frame_reader import FrameReader
-
-
-if __name__ == "__main__":
-    vids = glob.glob("tests/test_videos/*.mp4")  # use test videos for demo
-
-    chunk_size = 1
-    take_every_nth = 10
-    resize_size = 100
-
-    # intialize reader with auto_release=False
-    reader = FrameReader(vids, chunk_size, take_every_nth, resize_size, auto_release=False)
-    reader.start_reading()
-
-    blocks = []
-    for block, ind_dict in reader:
-        blocks.append(block)
-
-    # will throw Segmentation fault if auto_relase=True (try it)
-    for block in blocks:
-        print(block[0, 0, 0, 0])  # 5, 63
-
-    # Remember to release memory
-    reader.release_memory()
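The manual-release example can be dropped because the new reader hands out copies rather than views of shared memory: `SharedQueue.get` calls `.copy()` on the buffer slice, and `FrameReader.__next__` releases the shared-memory block itself once iteration is exhausted. A minimal sketch of the pattern that replaces the deleted example (same test videos, keyword arguments assumed from the new `FrameReader` signature):

```python
import glob

from video2numpy.frame_reader import FrameReader

vids = glob.glob("tests/test_videos/*.mp4")  # use test videos for demo, as in the deleted example

reader = FrameReader(vids, take_every_nth=10, resize_size=100, workers=1, memory_size=4)
reader.start_reading()

# frames can be kept around after the loop; they are plain numpy copies,
# so no release_memory() call or auto_release flag is needed anymore
kept = [(name, frames) for frames, name in reader]
for name, frames in kept:
    print(name, frames[0, 0, 0, 0])
```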
diff --git a/examples/youtube.py b/examples/youtube.py
index fbffe6a..f27d75d 100644
--- a/examples/youtube.py
+++ b/examples/youtube.py
@@ -1,7 +1,6 @@
 import cv2
 
 from video2numpy.frame_reader import FrameReader
-from video2numpy.utils import split_block
 
 
 if __name__ == "__main__":
@@ -14,26 +13,20 @@
         "https://www.youtube.com/watch?v=pdyRT_BXfXE",
     ]
 
-    chunk_size = 2  # two video per thread
     take_every_nth = 25  # take every 25th frame
     resize_size = 224  # make frames 224x224
 
-    reader = FrameReader(links, chunk_size, take_every_nth, resize_size)
+    reader = FrameReader(links, take_every_nth, resize_size)
     reader.start_reading()
 
-    for block, ind_dict in reader:
-
-        vid_frames = split_block(block, ind_dict)
-
+    for frames, vidID in reader:
         # Play few frames from each video
-        for vidID, frames in vid_frames.items():
-            print(f"Playing video {vidID}...")
-
-            for frame in frames:
-                cv2.imshow("frame", frame)
+        print(f"Playing video {vidID}...")
+        for frame in frames:
+            cv2.imshow("frame", frame)
 
-                key = cv2.waitKey(50)
-                if key == ord("q"):
-                    break
+            key = cv2.waitKey(50)
+            if key == ord("q"):
+                break
 
     cv2.destroyAllWindows()
diff --git a/tests/test_modules.py b/tests/test_modules.py
index eca93e1..eb4be48 100644
--- a/tests/test_modules.py
+++ b/tests/test_modules.py
@@ -15,25 +15,15 @@ def test_reader():
     vids = glob.glob("tests/test_videos/*.mp4")
 
-    vid_chunk_size = 1
     take_every_nth = 1
     resize_size = 150
 
-    reader = FrameReader(vids, vid_chunk_size, take_every_nth, resize_size)
-
+    reader = FrameReader(vids, take_every_nth, resize_size)
     reader.start_reading()
 
-    for block, ind_dict in reader:
-        for dst_name, inds in ind_dict.items():
-            i0, it = inds
-            frames = block[i0:it]
-
-            mp4_name = dst_name[:-4] + ".mp4"
-
-            assert it - i0 == FRAME_COUNTS[mp4_name]
-            assert block.shape[0] == it - i0
-            assert block.shape[1:] == (150, 150, 3)
-    assert len(reader.shms) == 0
+    for vid_frames, dst_name in reader:
+        mp4_name = dst_name[:-4] + ".mp4"
+        assert vid_frames.shape[0] == FRAME_COUNTS[mp4_name]
 
 
 def test_resizer():
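The updated test only checks frame counts; the old resize-shape assertion could still be carried over, since `resize_size` is still passed to the reader. A hedged sketch of such a test (the `FRAME_COUNTS` values below are placeholders, not the real ones from `tests/test_modules.py`):

```python
import glob

from video2numpy.frame_reader import FrameReader

FRAME_COUNTS = {"video1.mp4": 56, "video2.mp4": 134}  # placeholder frame counts


def test_reader_keeps_shape():
    vids = glob.glob("tests/test_videos/*.mp4")
    reader = FrameReader(vids, 1, 150)
    reader.start_reading()

    for vid_frames, dst_name in reader:
        mp4_name = dst_name[:-4] + ".mp4"
        assert vid_frames.shape[0] == FRAME_COUNTS[mp4_name]
        assert vid_frames.shape[1:] == (150, 150, 3)  # the resize check from the old test
```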
diff --git a/video2numpy/frame_reader.py b/video2numpy/frame_reader.py
index d428cbe..eed28a9 100644
--- a/video2numpy/frame_reader.py
+++ b/video2numpy/frame_reader.py
@@ -1,9 +1,8 @@
 """reader - uses a reader function to read frames from videos"""
-import numpy as np
-
-from multiprocessing import shared_memory, SimpleQueue, Process
+import multiprocessing
 
 from .read_vids_cv2 import read_vids
+from .shared_queue import SharedQueue
 
 
 class FrameReader:
@@ -13,81 +12,55 @@ class FrameReader:
 
     def __init__(
         self,
-        fnames,
-        chunk_size=1,
+        vids,
         take_every_nth=1,
         resize_size=224,
-        thread_count=8,
-        auto_release=True,
+        workers=1,
+        memory_size=4,
     ):
         """
         Input:
-        fnames - list with youtube links or paths to mp4 files.
+        vids - list with youtube links or paths to mp4 files.
-        chunk_size - how many videos to process at once.
        take_every_nth - offset between frames we take.
        resize_size - pixel height and width of target output shape.
-        thread_count - number of threads to distribute video chunk reading to.
-        auto_release - FrameReader iterator automatically releases shm buffers in next
-                       iteration. This means the returned frame block or any slices
-                       of it won't work in iterations following the one where it was returned.
-                       If you plan on using it out of the iteration set this to False and
-                       remember to manually deallocate it by calling release_memory once you're done.
+        workers - number of Processes to distribute video reading to.
+        memory_size - number of GB of shared memory used to buffer frames.
         """
-        self.auto_release = auto_release
-        self.info_q = SimpleQueue()
-        read_args = (
-            fnames,
-            self.info_q,
-            chunk_size,
-            take_every_nth,
-            resize_size,
-            thread_count,
-        )
-        self.read_proc = Process(target=read_vids, args=read_args)
+        memory_size_b = memory_size * 1024**3
+        shared_frames = memory_size_b // (256**2 * 3)
+        self.shared_queue = SharedQueue.from_shape([shared_frames, resize_size, resize_size, 3])
 
-        self.empty = False
-        self.shms = []
+        div_vids = [vids[int(len(vids) * i / workers) : int(len(vids) * (i + 1) / workers)] for i in range(workers)]
+        self.procs = [
+            multiprocessing.Process(
+                args=(work, worker_id, take_every_nth, resize_size, self.shared_queue.export()),
+                daemon=True,
+                target=read_vids,
+            )
+            for worker_id, work in enumerate(div_vids)
+        ]
 
     def __iter__(self):
         return self
 
     def __next__(self):
-        if not self.empty:
-            info = self.info_q.get()
-
-            if isinstance(info, str):
-                self.finish_reading()
-                raise StopIteration
-
-            if self.auto_release and len(self.shms) > 0:
-                last_shm = self.shms.pop(0)
-                last_shm.close()
-                last_shm.unlink()
-
-            shm = shared_memory.SharedMemory(name=info["shm_name"])
-            block = np.ndarray(info["full_shape"], dtype=np.uint8, buffer=shm.buf)
-
-            self.shms.append(shm)  # close and unlink when done using blocks
-
-            return block, info["ind_dict"]
+        if self.shared_queue or any(p.is_alive() for p in self.procs):
+            frames, name = self.shared_queue.get()
+            return frames, name
+        self.finish_reading()
+        self.release_memory()
         raise StopIteration
 
     def start_reading(self):
-        self.empty = False
-        self.read_proc.start()
+        for p in self.procs:
+            p.start()
 
     def finish_reading(self):
-        self.empty = True
-        self.read_proc.join()
-        if self.auto_release:
-            self.release_memory()
+        for p in self.procs:
+            p.join()
 
     def release_memory(self):
-        for shm in self.shms:
-            shm.close()
-            try:
-                shm.unlink()
-            except FileNotFoundError:
-                print(f"Warning: Tried unlinking shared_memory block '{shm.name}' but file wasn't found")
-        self.shms = []
+        self.shared_queue.frame_mem.unlink()
+        self.shared_queue.frame_mem.close()
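The buffer sizing above converts `memory_size` into a number of frame slots using a fixed 256×256×3-byte frame cost, independent of `resize_size`, and then splits the video list evenly across workers. A quick worked sketch of both calculations with the default values (numbers are just the arithmetic spelled out):

```python
# sizing arithmetic from FrameReader.__init__, spelled out with the defaults
memory_size = 4                                     # GB
resize_size = 224
memory_size_b = memory_size * 1024**3               # 4294967296 bytes
shared_frames = memory_size_b // (256**2 * 3)       # 21845 frame slots
actual_bytes = shared_frames * resize_size**2 * 3   # what actually backs the 224x224 buffer (~3.06 GiB)
print(shared_frames, actual_bytes / 1024**3)

# per-worker split of the input list (10 videos over 3 workers -> [3, 3, 4])
vids = [f"vid{i}.mp4" for i in range(10)]
workers = 3
div_vids = [vids[int(len(vids) * i / workers) : int(len(vids) * (i + 1) / workers)] for i in range(workers)]
print([len(chunk) for chunk in div_vids])
```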
diff --git a/video2numpy/read_vids_cv2.py b/video2numpy/read_vids_cv2.py
index eeeafad..409d1a4 100644
--- a/video2numpy/read_vids_cv2.py
+++ b/video2numpy/read_vids_cv2.py
@@ -1,110 +1,55 @@
-"""uses cv2 to read frames from video."""
+"""uses opencv to read frames from video."""
 import cv2
+import random
 import numpy as np
-import youtube_dl
-
-from multiprocessing import shared_memory
-from multiprocessing.pool import ThreadPool
 
 from .resizer import Resizer
+from .shared_queue import SharedQueue
+from .utils import handle_youtube
 
-QUALITY = "360p"
-
-
-def read_vids(vids, queue, chunk_size=1, take_every_nth=1, resize_size=224, thread_count=8):
+
+def read_vids(vids, worker_id, take_every_nth, resize_size, queue_export):
     """
-    Reads list of videos, saves frames to /dev/shm, and passes reading info through
-    multiprocessing queue
+    Reads list of videos, saves frames to SharedQueue
 
     Input:
       vids - list of videos (either path or youtube link)
-      queue - multiprocessing queue used to pass frame block information
-      chunk_size - size of chunk of videos to take for parallel reading
+      worker_id - unique ID of worker
       take_every_nth - offset between frames of video (to lower FPS)
      resize_size - new pixel height and width of resized frame
-      thread_count - number of threads to distribute video chunk reading to
+      queue_export - SharedQueue export used to re-create SharedQueue object in worker
     """
-
-    while len(vids) > 0:
-        vid_chunk = vids[:chunk_size]
-        vids = vids[chunk_size:]
-
-        frams = {}
-
-        with ThreadPool(thread_count) as pool:
-
-            def generate_frames(vid):
-
-                video_frames = []
-
-                if not vid.endswith(".mp4"):  # youtube link
-                    ydl_opts = {}
-                    ydl = youtube_dl.YoutubeDL(ydl_opts)
-                    info = ydl.extract_info(vid, download=False)
-                    formats = info.get("formats", None)
-                    f = None
-                    for f in formats:
-                        if f.get("format_note", None) != QUALITY:
-                            continue
-                        break
-
-                    cv2_vid = f.get("url", None)
-
-                    dst_name = info.get("id") + ".npy"
-                else:
-                    cv2_vid = vid
-                    dst_name = vid[:-4].split("/")[-1] + ".npy"
-
-                cap = cv2.VideoCapture(cv2_vid)  # pylint: disable=I1101
-
-                if not cap.isOpened():
-                    print(f"Error: {vid} not opened")
-                    return
-
-                width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
-                height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
-                frame_shape = [height, width, 3]
-
-                resizer = Resizer(frame_shape, resize_size)
-
-                ret = True
-                ind = 0
-                while ret:
-                    ret, frame = cap.read()
-                    if ret and (ind % take_every_nth == 0):
-                        frame = resizer(frame)
-                        video_frames.append(frame)
-                    ind += 1
-
-                frams[dst_name] = video_frames
-
-            for _ in pool.imap_unordered(generate_frames, vid_chunk):
-                pass
-
-        ind_dict = {}
-        frame_count = 0
-        for k, v in frams.items():
-            ind_dict[k] = (frame_count, frame_count + len(v))
-            frame_count += len(v)
-
-        full_shape = (frame_count, resize_size, resize_size, 3)
-
-        mem_size = frame_count * full_shape[0] * full_shape[1] * full_shape[2]
-        shm = shared_memory.SharedMemory(create=True, size=mem_size)
-
-        in_arr = np.ndarray(full_shape, dtype=np.uint8, buffer=shm.buf)
-        for k, v in frams.items():
-            i0, it = ind_dict[k]
-            if it > i0:
-                in_arr[i0:it] = v
-
-        info = {
-            "ind_dict": ind_dict,
-            "shm_name": shm.name,
-            "full_shape": full_shape,
-        }
-        shm.close()
-        queue.put(info)
-
-    queue.put("DONE_READING")
+    queue = SharedQueue.from_export(*queue_export)
+
+    def get_frames(vid):
+        if not vid.endswith(".mp4"):
+            load_vid, dst_name = handle_youtube(vid)
+        else:
+            load_vid, dst_name = vid, vid[:-4].split("/")[-1] + ".npy"
+
+        video_frames = []
+        cap = cv2.VideoCapture(load_vid)  # pylint: disable=I1101
+
+        if not cap.isOpened():
+            print(f"Error: {vid} not opened")
+            return
+
+        width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
+        height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
+        frame_shape = [height, width, 3]
+
+        resizer = Resizer(frame_shape, resize_size)
+
+        ret = True
+        ind = 0
+        while ret:
+            ret, frame = cap.read()
+            if ret and (ind % take_every_nth == 0):
+                frame = resizer(frame)
+                video_frames.append(frame)
+            ind += 1
+        queue.put(np.array(video_frames), dst_name)
+
+    random.Random(worker_id).shuffle(vids)
+    for vid in vids:
+        get_frames(vid)
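Since `read_vids` is now a plain worker function, it can also be driven directly without `FrameReader`; a minimal sketch mirroring what `FrameReader` does internally (file names and buffer shape are placeholders, and error handling is omitted, so a failed video would leave `get()` waiting):

```python
import multiprocessing

from video2numpy.read_vids_cv2 import read_vids
from video2numpy.shared_queue import SharedQueue

vids = ["some/path/a.mp4", "some/path/b.mp4"]         # placeholder inputs
queue = SharedQueue.from_shape([2000, 224, 224, 3])   # space for 2000 resized frames

# the child process rebuilds the queue from the export() tuple
proc = multiprocessing.Process(target=read_vids, args=(vids, 0, 1, 224, queue.export()), daemon=True)
proc.start()

for _ in range(len(vids)):
    frames, npy_name = queue.get()   # blocks until one whole video has been written
    print(npy_name, frames.shape)

proc.join()
queue.frame_mem.unlink()             # same cleanup FrameReader.release_memory performs
queue.frame_mem.close()
```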
diff --git a/video2numpy/read_vids_ffmpeg.py b/video2numpy/read_vids_ffmpeg.py
index f5aa6ce..c73c4ca 100644
--- a/video2numpy/read_vids_ffmpeg.py
+++ b/video2numpy/read_vids_ffmpeg.py
@@ -1,92 +1,58 @@
 """uses ffmpeg to read frames from video."""
 import cv2
+import random
 import ffmpeg
 import numpy as np
 
-from multiprocessing import shared_memory
-from multiprocessing.pool import ThreadPool
+from .shared_queue import SharedQueue
+from .utils import handle_youtube
 
 QUALITY = "360p"
 
 
-def read_vids(vids, queue, chunk_size=1, take_every_nth=1, resize_size=224, thread_count=8):
+def read_vids(vids, worker_id, take_every_nth, resize_size, queue_export):
     """
-    Reads list of videos, saves frames to /dev/shm, and passes reading info through
-    multiprocessing queue
+    Reads list of videos, saves frames to SharedQueue
 
     Input:
       vids - list of videos (either path or youtube link)
-      queue - multiprocessing queue used to pass frame block information
-      chunk_size - size of chunk of videos to take for parallel reading
+      worker_id - unique ID of worker
       take_every_nth - offset between frames of video (to lower FPS)
      resize_size - new pixel height and width of resized frame
-      thread_count - number of threads to distribute video chunk reading to
+      queue_export - SharedQueue export used to re-create SharedQueue object in worker
    """
-
-    while len(vids) > 0:
-        vid_chunk = vids[:chunk_size]
-        vids = vids[chunk_size:]
-
-        frams = {}
-
-        fps = int(25 / take_every_nth)
-
-        with ThreadPool(thread_count) as pool:
-
-            def get_frames(vid):
-
-                cap = cv2.VideoCapture(vid)
-                width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
-                height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
-                nw, nh = (-1, 224) if width > height else (224, -1)
-
-                dst_name = vid[:-4].split("/")[-1] + ".npy"
-
-                try:
-                    out, _ = (
-                        ffmpeg.input(vid)
-                        .filter("fps", fps=fps)
-                        .filter("scale", nw, nh)
-                        .filter("crop", w=224, h=224)
-                        .output("pipe:", format="rawvideo", pix_fmt="rgb24", loglevel="error")
-                        .run(capture_stdout=True)
-                    )
-                except ffmpeg._run.Error:  # pylint: disable=protected-access
-                    print(f"Error: couldn't read video {vid}")
-                    return
-
-                frame_count = int(len(out) / (224 * 224 * 3))  # can do this since dtype = np.uint8 (byte)
-
-                vid = np.frombuffer(out, np.uint8).reshape((frame_count, 224, 224, 3))
-                frams[dst_name] = vid
-
-            for _ in pool.imap_unordered(get_frames, vid_chunk):
-                pass
-
-        ind_dict = {}
-        frame_count = 0
-        for k, v in frams.items():
-            ind_dict[k] = (frame_count, frame_count + len(v))
-            frame_count += len(v)
-
-        full_shape = (frame_count, resize_size, resize_size, 3)
-
-        mem_size = frame_count * full_shape[0] * full_shape[1] * full_shape[2]
-        shm = shared_memory.SharedMemory(create=True, size=mem_size)
-
-        in_arr = np.ndarray(full_shape, dtype=np.uint8, buffer=shm.buf)
-        for k, v in frams.items():
-            i0, it = ind_dict[k]
-            if it > i0:
-                in_arr[i0:it] = v
-
-        info = {
-            "ind_dict": ind_dict,
-            "shm_name": shm.name,
-            "full_shape": full_shape,
-        }
-        shm.close()
-        queue.put(info)
-
-    queue.put("DONE_READING")
+    queue = SharedQueue.from_export(*queue_export)
+    fps = int(25 / take_every_nth)
+
+    def get_frames(vid):
+        if not vid.endswith(".mp4"):
+            load_vid, dst_name = handle_youtube(vid)
+        else:
+            load_vid, dst_name = vid, vid[:-4].split("/")[-1] + ".npy"
+
+        cap = cv2.VideoCapture(load_vid)
+        width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
+        height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
+        nw, nh = (-1, resize_size) if width > height else (resize_size, -1)
+
+        try:
+            out, _ = (
+                ffmpeg.input(load_vid)
+                .filter("fps", fps=fps)
+                .filter("scale", nw, nh)
+                .filter("crop", w=resize_size, h=resize_size)
+                .output("pipe:", format="rawvideo", pix_fmt="rgb24", loglevel="error")
+                .run(capture_stdout=True)
+            )
+        except ffmpeg._run.Error:  # pylint: disable=protected-access
+            print(f"Error: couldn't read video {vid}")
+            return
+
+        frame_count = int(len(out) / (resize_size * resize_size * 3))  # can do this since dtype = np.uint8 (byte)
+        vid_frames = np.frombuffer(out, np.uint8).reshape((frame_count, resize_size, resize_size, 3))
+        queue.put(vid_frames, dst_name)
+
+    random.Random(worker_id).shuffle(vids)
+    for vid in vids:
+        get_frames(vid)
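As a sanity check on the byte-to-frame arithmetic used above (the rawvideo stream is `uint8` RGB, so each frame is exactly `resize_size * resize_size * 3` bytes), here is the reshape step in isolation with stand-in data:

```python
import numpy as np

resize_size = 224
frame_bytes = resize_size * resize_size * 3      # 150528 bytes per RGB24 frame
out = bytes(frame_bytes * 10)                    # stand-in for ffmpeg's stdout: 10 black frames

frame_count = int(len(out) / frame_bytes)        # 10
vid_frames = np.frombuffer(out, np.uint8).reshape((frame_count, resize_size, resize_size, 3))
print(vid_frames.shape)                          # (10, 224, 224, 3)
```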
+import threading +import typing + +from multiprocessing.shared_memory import SharedMemory + + +# taken from: https://github.com/ClashLuke +class SharedQueue: + """SharedQueue class.""" + + frame_mem: SharedMemory + frame: np.ndarray + indices: list + write_index_lock: threading.Lock + read_index_lock: threading.Lock + + @classmethod + def from_shape(cls, shape: typing.List[int]): + """create SharedQueue from shape.""" + self = cls() + frames = np.zeros(shape, dtype=np.uint8) + self.frame_mem = SharedMemory(create=True, size=frames.nbytes) + self.frame = np.ndarray(shape, dtype=np.uint8, buffer=self.frame_mem.buf) + manager = multiprocessing.Manager() + self.indices = manager.list() + self.frame[:] = 0 + self.write_index_lock = manager.Lock() + self.read_index_lock = manager.Lock() + return self + + @classmethod + def from_export(cls, frame_name, frame_shape, indices, write_index_lock, read_index_lock): + self = cls() + self.frame_mem = SharedMemory(create=False, name=frame_name) + self.frame = np.ndarray(frame_shape, dtype=np.uint8, buffer=self.frame_mem.buf) + self.indices = indices + self.write_index_lock = write_index_lock + self.read_index_lock = read_index_lock + return self + + def export(self): + return self.frame_mem.name, self.frame.shape, self.indices, self.write_index_lock, self.read_index_lock + + def get(self): + while True: + with self.read_index_lock: + while not self: + time.sleep(1) + name, start, end = self.indices.pop(0) + return self.frame[start:end].copy(), name # local clone, so share can be safely edited + + def _free_memory(self, size: int) -> typing.Optional[typing.Tuple[int, int, int]]: + if not self: + return 0, 0, size + local_indices = list(self.indices) + itr = zip([["", None, 0]] + local_indices, local_indices + [["", self.frame.shape[0], None]]) + for i, ((_, _, prev_end), (_, start, _)) in enumerate(itr): + if start - prev_end > size: + return i, prev_end, prev_end + size # type: ignore + return 0, 0, 0 + + def _put_item(self, obj: np.ndarray, name: str): + batches = obj.shape[0] + with self.write_index_lock: + indices = self._free_memory(batches) + if indices is None: + return + idx, start, end = indices + self.indices.insert(idx, (name, start, end)) + self.frame[start:end] = obj[:] # we simply assume that the synchronisation overheads make the reader slower + + def put(self, obj: np.ndarray, name: str): + """Put array on queue.""" + batches = obj.shape[0] + max_size = self.frame.shape[0] // 4 # unrealistic that it'll fit if it takes up 25% of the memory + if batches > max_size: + for idx in range(0, batches, max_size): # ... so we slice it up and feed in many smaller videos + self.put(obj[idx : idx + max_size], name) + return + + def _fits(): + return bool(self._free_memory(batches)) + + # until new frames fit into memory + waiting = 12 + while not _fits(): + time.sleep(5) + waiting -= 1 + if not waiting: + print( + "Warning: waited for one minute for space to free up, but none found. Increase memory size to avoid " + "fragmentation or implement defragmentation. 
diff --git a/video2numpy/utils.py b/video2numpy/utils.py
index b127f62..8006f9d 100644
--- a/video2numpy/utils.py
+++ b/video2numpy/utils.py
@@ -1,22 +1,22 @@
 """video2numpy utils"""
+import youtube_dl
 
 
-def split_block(block, ind_dict):
-    """
-    separate block into individual videos using ind_dict
+QUALITY = "360p"
 
-    Input:
-        block - numpy array returned from FrameReader or some derivative of it
-                with the same batch dimension order
-        ind_dict - dict that shows what indices correspond to what rows of the "block" arg
-                   {"video_name": (block_ind0, block_indf) ...}
-    Output:
-        dict - {"video_name": stacked block rows corresponding to that video (usually frames) ...}
-    """
-    sep_frames = {}
-    for dst_name, inds in ind_dict.items():
-        i0, it = inds
-        vid_frames = block[i0:it]
-        sep_frames[dst_name] = vid_frames
-    return sep_frames
+
+def handle_youtube(url):
+    """returns file and destination name from youtube url."""
+    ydl_opts = {}
+    ydl = youtube_dl.YoutubeDL(ydl_opts)
+    info = ydl.extract_info(url, download=False)
+    formats = info.get("formats", None)
+    f = None
+    for f in formats:
+        if f.get("format_note", None) != QUALITY:
+            continue
+        break
+
+    cv2_vid = f.get("url", None)
+    dst_name = info.get("id") + ".npy"
+    return cv2_vid, dst_name
diff --git a/video2numpy/video2numpy.py b/video2numpy/video2numpy.py
index 48ff97e..12dd08a 100644
--- a/video2numpy/video2numpy.py
+++ b/video2numpy/video2numpy.py
@@ -5,27 +5,27 @@
 from .frame_reader import FrameReader
 
 
-VID_CHUNK_SIZE = 2
-QUALITY = "360p"
-
-
-def video2numpy(src, dest="", take_every_nth=1, resize_size=224):
+def video2numpy(src, dest="", take_every_nth=1, resize_size=224, workers=1, memory_size=4):
     """
     Read frames from videos and save as numpy arrays
 
     Input:
-      src:
+      src:
        str: path to mp4 file
        str: youtube link
        str: path to txt file with multiple mp4's or youtube links
        list: list with multiple mp4's or youtube links
-      dest:
+      dest:
        str: directory where to save frames to
        None: dest = src + .npy
-      take_every_nth:
+      take_every_nth:
        int: only take every nth frame
-      resize_size:
+      resize_size:
        int: new pixel height and width of resized frame
+      workers:
+        int: number of workers used to read videos
+      memory_size:
+        int: number of GB of shared memory used for reading; use larger shared memory for more videos
     """
     if isinstance(src, str):
         if src.endswith(".txt"):  # list of mp4s or youtube links
@@ -36,12 +36,9 @@ def video2numpy(src, dest="", take_every_nth=1, resize_size=224):
     else:
         fnames = src
 
-    reader = FrameReader(fnames, VID_CHUNK_SIZE, take_every_nth, resize_size)
+    reader = FrameReader(fnames, take_every_nth, resize_size, workers, memory_size)
     reader.start_reading()
 
-    for block, ind_dict in reader:
-        for dst_name, inds in ind_dict.items():
-            i0, it = inds
-            frames = block[i0:it]
-            save_pth = os.path.join(dest, dst_name)
-            np.save(save_pth, frames)
+    for vid_frames, dst_name in reader:
+        save_pth = os.path.join(dest, dst_name)
+        np.save(save_pth, vid_frames)
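The relocated `handle_youtube` helper can also be used on its own to turn a link into a streamable URL plus the `.npy` name that `video2numpy` later saves to; a sketch using one of the links from `examples/youtube.py` (requires `youtube_dl` and network access):

```python
import cv2

from video2numpy.utils import handle_youtube

load_url, dst_name = handle_youtube("https://www.youtube.com/watch?v=pdyRT_BXfXE")
print(dst_name)                     # "<video id>.npy", the name later passed to np.save in video2numpy()

cap = cv2.VideoCapture(load_url)    # the resolved 360p stream URL opens like a local file
print(cap.isOpened())
cap.release()
```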