Skip to content

Commit

Permalink
implementing reading using one shared memory block (#6)
Browse files Browse the repository at this point in the history
* implementing reading using one shared memory block

* rename file

* update

* update to strictly better setup

* working with benchmarks

* add name returning for video writing

* get youtube videos working like in main

* small fix + fix tests

* fix lint

* fix docstring

* update examples

* update README

Co-authored-by: iejmac <iejmac@compute-od-gpu-dy-p4d-24xlarge-194.hpc-1click-production2.pcluster>
  • Loading branch information
iejMac and iejmac authored Jul 17, 2022
1 parent 31b6f38 commit 976cef3
Show file tree
Hide file tree
Showing 11 changed files with 297 additions and 353 deletions.
39 changes: 20 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,22 @@ SYNOPSIS
DESCRIPTION
Input:
src:
src:
str: path to mp4 file
str: youtube link
str: path to txt file with multiple mp4's or youtube links
list: list with multiple mp4's or youtube links
dest:
dest:
str: directory where to save frames to
None: dest = src + .npy
take_every_nth:
take_every_nth:
int: only take every nth frame
resize_size:
resize_size:
int: new pixel height and width of resized frame
workers:
int: number of workers used to read videos
memory_size:
int: number of GB of shared memory used for reading, use larger shared memory for more videos
POSITIONAL ARGUMENTS
SRC
Expand All @@ -49,6 +53,13 @@ FLAGS
Default: 1
--resize_size=RESIZE_SIZE
Default: 224
--workers=WORKERS
Default: 1
--memory_size=MEMORY_SIZE
Default: 4
NOTES
You can also use flags syntax for POSITIONAL ARGUMENTS
```

## API
Expand All @@ -65,31 +76,21 @@ take_every_5 = 5
video2numpy(VIDS, FRAME_DIR, take_every_5)
```

You can alse directly use the reader and iterate over frame blocks yourself:
You can also directly use the reader and iterate over videos yourself:
```python
import glob
from video2numpy.frame_reader import FrameReader
from video2numpy.utils import split_block

VIDS = glob.glob("some/path/my_videos/*.mp4")
take_every_5 = 5
resize_size = 300

reader = FrameReader(VIDS, take_every_5, resize_size)
reader.start_reading()

for block, ind_dict in reader:

if you need to process the block in large batches (f.e. good for ML):
proc_block = ml_model(block)
else:
proc_block = block

# then you can separate the video frames into a dict easily with split_block from utils:
split_up_vids = split_block(proc_block, ind_dict)

for vid_name, proc_frames in split_up_vids.items():
# do something with proc_frame of shape (n_frames, 300, 300, 3)
...
for vid_frames, npy_name in reader:
# do something with vid_frames of shape (n_frames, 300, 300, 3)
...
```

## For development
Expand Down
31 changes: 15 additions & 16 deletions benchmark/reader_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,35 +20,34 @@ def parse_args():
default="default", # TODO: maybe find nice way of getting reading type (cv2)
help="For unique output graph file name",
)
parser.add_argument("--chunk_size", type=int, default=50, help="How many videos to try to read at once")
parser.add_argument("--resize_size", type=int, default=224, help="Resize frames to resize_size x resize_size")
parser.add_argument(
"--thread_count",
"--workers",
type=int,
default=6, # TODO: find way of getting default automatically
help="How many threads to use for video chunk reading",
help="How many workers to use for video reading",
)
args = parser.parse_args()
return args


def benchmark_reading(vids, chunk_size, take_en, resize_size, thread_count):
def benchmark_reading(vids, take_en, resize_size, workers):
reader = FrameReader(
vids,
chunk_size=chunk_size,
take_every_nth=take_en,
resize_size=resize_size,
thread_count=thread_count,
auto_release=True,
workers=workers,
memory_size=4,
)
reader.start_reading()

t0 = time.perf_counter()

count = 0
for block, ind_dict in reader:
block[0, 0, 0, 0] # assert no Segmentation fault
count += block.shape[0]
for vid, name in reader:
print(name)
vid[0, 0, 0, 0] # assert no Segmentation fault
count += vid.shape[0]

read_time = time.perf_counter() - t0
samp_per_s = count / read_time
Expand All @@ -63,20 +62,20 @@ def benchmark_reading(vids, chunk_size, take_en, resize_size, thread_count):
print(f"Benchmarking {args.name} on {len(vids)} videos...")

video_fps = [1, 3, 5, 10, 25] # tested variable
chunk_size = args.chunk_size
resize_size = args.resize_size
thread_count = args.thread_count
workers = args.workers

print(f"Chunk size - {chunk_size} | Resize size - {resize_size} | Thread count - {thread_count}")
print(f"Resize size - {resize_size} | Workers - {workers}")

results = []
for fps in video_fps:
ten = int(CONST_VID_FPS / fps)
samp_per_s, _, _ = benchmark_reading(vids, chunk_size, ten, resize_size, thread_count)
samp_per_s, _, _ = benchmark_reading(vids, ten, resize_size, workers)
print(f"samples/s @ {fps} FPS = {samp_per_s}")
results.append(samp_per_s)

plt.plot(video_fps, results)
plt.title(f"{args.name}: chunk size - {chunk_size} | resize size - {resize_size} | threads - {thread_count}")
plt.title(f"{args.name}: resize size - {resize_size} | workers - {workers}")
plt.xlabel("Target video FPS")
plt.ylabel("Reading speed samples/s")
plt.savefig(f"eff_{args.name}_{chunk_size}_{resize_size}_{thread_count}.png")
plt.savefig(f"eff_{args.name}_{workers}_{resize_size}_{workers}.png")
26 changes: 0 additions & 26 deletions examples/manual_release.py

This file was deleted.

23 changes: 8 additions & 15 deletions examples/youtube.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import cv2

from video2numpy.frame_reader import FrameReader
from video2numpy.utils import split_block

if __name__ == "__main__":

Expand All @@ -14,26 +13,20 @@
"https://www.youtube.com/watch?v=pdyRT_BXfXE",
]

chunk_size = 2 # two video per thread
take_every_nth = 25 # take every 25th frame
resize_size = 224 # make frames 224x224

reader = FrameReader(links, chunk_size, take_every_nth, resize_size)
reader = FrameReader(links, take_every_nth, resize_size)
reader.start_reading()

for block, ind_dict in reader:

vid_frames = split_block(block, ind_dict)

for frames, vidID in reader:
# Play few frames from each video
for vidID, frames in vid_frames.items():
print(f"Playing video {vidID}...")

for frame in frames:
cv2.imshow("frame", frame)
print(f"Playing video {vidID}...")
for frame in frames:
cv2.imshow("frame", frame)

key = cv2.waitKey(50)
if key == ord("q"):
break
key = cv2.waitKey(50)
if key == ord("q"):
break

cv2.destroyAllWindows()
18 changes: 4 additions & 14 deletions tests/test_modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,15 @@
def test_reader():
vids = glob.glob("tests/test_videos/*.mp4")

vid_chunk_size = 1
take_every_nth = 1
resize_size = 150

reader = FrameReader(vids, vid_chunk_size, take_every_nth, resize_size)

reader = FrameReader(vids, take_every_nth, resize_size)
reader.start_reading()

for block, ind_dict in reader:
for dst_name, inds in ind_dict.items():
i0, it = inds
frames = block[i0:it]

mp4_name = dst_name[:-4] + ".mp4"

assert it - i0 == FRAME_COUNTS[mp4_name]
assert block.shape[0] == it - i0
assert block.shape[1:] == (150, 150, 3)
assert len(reader.shms) == 0
for vid_frames, dst_name in reader:
mp4_name = dst_name[:-4] + ".mp4"
assert vid_frames.shape[0] == FRAME_COUNTS[mp4_name]


def test_resizer():
Expand Down
89 changes: 31 additions & 58 deletions video2numpy/frame_reader.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
"""reader - uses a reader function to read frames from videos"""
import numpy as np

from multiprocessing import shared_memory, SimpleQueue, Process
import multiprocessing

from .read_vids_cv2 import read_vids
from .shared_queue import SharedQueue


class FrameReader:
Expand All @@ -13,81 +12,55 @@ class FrameReader:

def __init__(
self,
fnames,
chunk_size=1,
vids,
take_every_nth=1,
resize_size=224,
thread_count=8,
auto_release=True,
workers=1,
memory_size=4,
):
"""
Input:
fnames - list with youtube links or paths to mp4 files.
vids - list with youtube links or paths to mp4 files.
chunk_size - how many videos to process at once.
take_every_nth - offset between frames we take.
resize_size - pixel height and width of target output shape.
thread_count - number of threads to distribute video chunk reading to.
auto_release - FrameReader iterator automatically releases shm buffers in next
iteration. This means the returned frame block or any slices
of it won't work in iterations following the one where it was returned.
If you plan on using it out of the iteration set this to False and
remember to manually deallocate it by calling release_memory once you're done.
workers - number of Processes to distribute video reading to.
memory_size - number of GB of shared_memory
"""
self.auto_release = auto_release
self.info_q = SimpleQueue()

read_args = (
fnames,
self.info_q,
chunk_size,
take_every_nth,
resize_size,
thread_count,
)
self.read_proc = Process(target=read_vids, args=read_args)
memory_size_b = memory_size * 1024**3
shared_frames = memory_size_b // (256**2 * 3)
self.shared_queue = SharedQueue.from_shape([shared_frames, resize_size, resize_size, 3])

self.empty = False
self.shms = []
div_vids = [vids[int(len(vids) * i / workers) : int(len(vids) * (i + 1) / workers)] for i in range(workers)]
self.procs = [
multiprocessing.Process(
args=(work, worker_id, take_every_nth, resize_size, self.shared_queue.export()),
daemon=True,
target=read_vids,
)
for worker_id, work in enumerate(div_vids)
]

def __iter__(self):
return self

def __next__(self):
if not self.empty:
info = self.info_q.get()

if isinstance(info, str):
self.finish_reading()
raise StopIteration

if self.auto_release and len(self.shms) > 0:
last_shm = self.shms.pop(0)
last_shm.close()
last_shm.unlink()

shm = shared_memory.SharedMemory(name=info["shm_name"])
block = np.ndarray(info["full_shape"], dtype=np.uint8, buffer=shm.buf)

self.shms.append(shm) # close and unlink when done using blocks

return block, info["ind_dict"]
if self.shared_queue or any(p.is_alive() for p in self.procs):
frames, name = self.shared_queue.get()
return frames, name
self.finish_reading()
self.release_memory()
raise StopIteration

def start_reading(self):
self.empty = False
self.read_proc.start()
for p in self.procs:
p.start()

def finish_reading(self):
self.empty = True
self.read_proc.join()
if self.auto_release:
self.release_memory()
for p in self.procs:
p.join()

def release_memory(self):
for shm in self.shms:
shm.close()
try:
shm.unlink()
except FileNotFoundError:
print(f"Warning: Tried unlinking shared_memory block '{shm.name}' but file wasn't found")
self.shms = []
self.shared_queue.frame_mem.unlink()
self.shared_queue.frame_mem.close()
Loading

0 comments on commit 976cef3

Please sign in to comment.