Module deepposekit.io.video
Expand source code
# -*- coding: utf-8 -*-
# Copyright 2018-2019 Jacob M. Graving <jgraving@gmail.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from tensorflow.python.keras.utils import Sequence
import cv2
import numpy as np
import os
__all__ = ["VideoReader", "VideoWriter"]
class VideoReader(cv2.VideoCapture, Sequence):
    """Read a video in batches.

    Subclasses ``cv2.VideoCapture`` and the Keras ``Sequence`` base class,
    so ``reader[i]`` returns the i-th batch of frames as a stacked array.

    Parameters
    ----------
    videopath: str
        Path to the video file.
    batch_size: int, default = 1
        Batch size for reading frames
    gray: bool, default = False
        If gray, return only the middle channel (channel index 1) of each
        frame, kept as a trailing axis of length 1.

    Raises
    ------
    TypeError
        If `videopath` is not a str.
    ValueError
        If `videopath` does not exist.
    """

    def __init__(self, videopath, batch_size=1, gray=False):
        # Validate before touching the capture backend.
        if not isinstance(videopath, str):
            raise TypeError("videopath must be str")
        if not os.path.exists(videopath):
            raise ValueError("file or path does not exist")
        super(VideoReader, self).__init__(videopath)
        self.videopath = videopath

        self.batch_size = batch_size
        self.n_frames = int(self.get(cv2.CAP_PROP_FRAME_COUNT))
        # Index of the next frame this reader expects to decode.
        self.idx = 0
        self.fps = self.get(cv2.CAP_PROP_FPS)
        self.height = int(self.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.width = int(self.get(cv2.CAP_PROP_FRAME_WIDTH))
        # Set to True by `read` the first time a frame cannot be decoded.
        self.finished = False
        self.gray = gray
        # Keep a handle on the parent's `read`, because this class
        # overrides `read` with a different return signature.
        self._read = super(VideoReader, self).read

    def read(self):
        """ Read one frame

        Returns
        -------
        frame: array
            Image is returned of the frame if a frame exists.
            Otherwise, return None (and `finished` is set to True).
        """
        ret, frame = self._read()
        if ret:
            self.idx += 1
            if self.gray:
                # Keep channel 1 only, re-adding a trailing channel axis.
                frame = frame[..., 1][..., None]
            return frame
        else:
            self.finished = True
            return None

    def read_batch(self, batch_size=1, asarray=False):
        """ Read in a batch of frames.

        Parameters
        ----------
        batch_size: int, default 1
            Number of frames to pull from the video.
        asarray: bool, default False
            If True, stack the frames (in numpy).

        Returns
        -------
        frames: list or array
            A batch of frames from the video. The batch may be shorter
            than `batch_size` if the video ends; None is returned when
            no frame could be read at all.
        """
        frames = []
        for _ in range(batch_size):
            frame = self.read()
            if self.finished:
                # End of stream: further reads cannot contribute frames.
                break
            frames.append(frame)
        if not frames:
            return None
        if asarray:
            frames = np.stack(frames)
        return frames

    def close(self):
        """ Close the VideoReader.

        Returns
        -------
        bool
            Returns True if successfully closed.
        """
        self.release()
        return not self.isOpened()

    def __len__(self):
        """Number of batches: ceil(n_frames / batch_size)."""
        return int(np.ceil(self.n_frames / float(self.batch_size)))

    def __getitem__(self, index):
        """Return batch number `index` as a stacked array.

        Raises
        ------
        StopIteration
            If a previous read already hit the end of the video.
        NotImplementedError
            If `index` is not an integer.
        """
        if self.finished:
            raise StopIteration
        if isinstance(index, (int, np.integer)):
            idx0 = index * self.batch_size
            if self.idx != idx0:
                # Seek to the first frame of the requested batch.
                # CAP_PROP_POS_FRAMES is the 0-based index of the frame
                # to be decoded next, so the target is the frame offset
                # itself, not the batch index.
                self.set(cv2.CAP_PROP_POS_FRAMES, idx0)
                self.idx = idx0
            return self.read_batch(self.batch_size, True)
        else:
            raise NotImplementedError

    def __next__(self):
        """Return the next batch as a stacked array.

        `finished` is only set by a failed read, so when the video ends
        exactly on a batch boundary one call returns None before
        StopIteration is raised on the following call.
        """
        if self.finished:
            raise StopIteration
        else:
            return self.read_batch(self.batch_size, True)

    def __del__(self):
        self.close()

    @property
    def current_frame(self):
        """Value of ``cv2.CAP_PROP_POS_FRAMES`` as an int."""
        return int(self.get(cv2.CAP_PROP_POS_FRAMES))

    @property
    def current_time(self):
        """Value of ``cv2.CAP_PROP_POS_MSEC``."""
        return self.get(cv2.CAP_PROP_POS_MSEC)

    @property
    def percent_finished(self):
        """Value of ``cv2.CAP_PROP_POS_AVI_RATIO`` multiplied by 100."""
        return self.get(cv2.CAP_PROP_POS_AVI_RATIO) * 100
class VideoWriter:
    """Write frames to a video file, one at a time or in batches.

    Can be used as a context manager, in which case the underlying
    stream is released on exit.

    Parameters
    ----------
    path: str
        Path to save the video file.
    frame_size: tuple
        Size of the frame as (width, height)
    codec: str, default = 'FFV1'
        FourCC video codec for encoding the video.
        Options include 'MP4V', 'X264', 'H264', etc.
    fps: int or float, default = 30.0
        Frame rate for the encoded video
    color: bool, default = True
        Whether or not the video is color
    """

    def __init__(self, path, frame_size, codec="FFV1", fps=30.0, color=True):
        # The four characters of `codec` are unpacked into the FourCC code.
        codec = cv2.VideoWriter_fourcc(*codec)
        self.stream = cv2.VideoWriter(path, codec, fps, frame_size, color)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        # Never suppress an exception raised inside the `with` body.
        return False

    def write(self, frame):
        """Write a single frame to the stream."""
        self.stream.write(frame)

    def write_batch(self, batch):
        """Write every frame of an iterable of frames, in order."""
        for frame in batch:
            self.write(frame)

    def close(self):
        """Release the stream.

        Returns
        -------
        bool
            True if the stream reports that it is no longer open.
        """
        self.stream.release()
        return not self.stream.isOpened()
Classes
class VideoReader (videopath, batch_size=1, gray=False)
-
Read a video in batches.
Parameters
videopath
:str
- Path to the video file.
batch_size
:int
, default =1
- Batch size for reading frames
gray
:bool
, default =False
- If gray, return only the middle channel
Expand source code
class VideoReader(cv2.VideoCapture, Sequence):
    """Read a video in batches.

    Subclasses ``cv2.VideoCapture`` and the Keras ``Sequence`` base class,
    so ``reader[i]`` returns the i-th batch of frames as a stacked array.

    Parameters
    ----------
    videopath: str
        Path to the video file.
    batch_size: int, default = 1
        Batch size for reading frames
    gray: bool, default = False
        If gray, return only the middle channel (channel index 1) of each
        frame, kept as a trailing axis of length 1.

    Raises
    ------
    TypeError
        If `videopath` is not a str.
    ValueError
        If `videopath` does not exist.
    """

    def __init__(self, videopath, batch_size=1, gray=False):
        # Validate before touching the capture backend.
        if not isinstance(videopath, str):
            raise TypeError("videopath must be str")
        if not os.path.exists(videopath):
            raise ValueError("file or path does not exist")
        super(VideoReader, self).__init__(videopath)
        self.videopath = videopath

        self.batch_size = batch_size
        self.n_frames = int(self.get(cv2.CAP_PROP_FRAME_COUNT))
        # Index of the next frame this reader expects to decode.
        self.idx = 0
        self.fps = self.get(cv2.CAP_PROP_FPS)
        self.height = int(self.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.width = int(self.get(cv2.CAP_PROP_FRAME_WIDTH))
        # Set to True by `read` the first time a frame cannot be decoded.
        self.finished = False
        self.gray = gray
        # Keep a handle on the parent's `read`, because this class
        # overrides `read` with a different return signature.
        self._read = super(VideoReader, self).read

    def read(self):
        """ Read one frame

        Returns
        -------
        frame: array
            Image is returned of the frame if a frame exists.
            Otherwise, return None (and `finished` is set to True).
        """
        ret, frame = self._read()
        if ret:
            self.idx += 1
            if self.gray:
                # Keep channel 1 only, re-adding a trailing channel axis.
                frame = frame[..., 1][..., None]
            return frame
        else:
            self.finished = True
            return None

    def read_batch(self, batch_size=1, asarray=False):
        """ Read in a batch of frames.

        Parameters
        ----------
        batch_size: int, default 1
            Number of frames to pull from the video.
        asarray: bool, default False
            If True, stack the frames (in numpy).

        Returns
        -------
        frames: list or array
            A batch of frames from the video. The batch may be shorter
            than `batch_size` if the video ends; None is returned when
            no frame could be read at all.
        """
        frames = []
        for _ in range(batch_size):
            frame = self.read()
            if self.finished:
                # End of stream: further reads cannot contribute frames.
                break
            frames.append(frame)
        if not frames:
            return None
        if asarray:
            frames = np.stack(frames)
        return frames

    def close(self):
        """ Close the VideoReader.

        Returns
        -------
        bool
            Returns True if successfully closed.
        """
        self.release()
        return not self.isOpened()

    def __len__(self):
        """Number of batches: ceil(n_frames / batch_size)."""
        return int(np.ceil(self.n_frames / float(self.batch_size)))

    def __getitem__(self, index):
        """Return batch number `index` as a stacked array.

        Raises
        ------
        StopIteration
            If a previous read already hit the end of the video.
        NotImplementedError
            If `index` is not an integer.
        """
        if self.finished:
            raise StopIteration
        if isinstance(index, (int, np.integer)):
            idx0 = index * self.batch_size
            if self.idx != idx0:
                # Seek to the first frame of the requested batch.
                # CAP_PROP_POS_FRAMES is the 0-based index of the frame
                # to be decoded next, so the target is the frame offset
                # itself, not the batch index.
                self.set(cv2.CAP_PROP_POS_FRAMES, idx0)
                self.idx = idx0
            return self.read_batch(self.batch_size, True)
        else:
            raise NotImplementedError

    def __next__(self):
        """Return the next batch as a stacked array.

        `finished` is only set by a failed read, so when the video ends
        exactly on a batch boundary one call returns None before
        StopIteration is raised on the following call.
        """
        if self.finished:
            raise StopIteration
        else:
            return self.read_batch(self.batch_size, True)

    def __del__(self):
        self.close()

    @property
    def current_frame(self):
        """Value of ``cv2.CAP_PROP_POS_FRAMES`` as an int."""
        return int(self.get(cv2.CAP_PROP_POS_FRAMES))

    @property
    def current_time(self):
        """Value of ``cv2.CAP_PROP_POS_MSEC``."""
        return self.get(cv2.CAP_PROP_POS_MSEC)

    @property
    def percent_finished(self):
        """Value of ``cv2.CAP_PROP_POS_AVI_RATIO`` multiplied by 100."""
        return self.get(cv2.CAP_PROP_POS_AVI_RATIO) * 100
Ancestors
- cv2.VideoCapture
- tensorflow.python.keras.utils.data_utils.Sequence
Instance variables
var current_frame
-
Expand source code
@property
def current_frame(self):
    """Value of ``cv2.CAP_PROP_POS_FRAMES`` for this capture, as an int."""
    position = self.get(cv2.CAP_PROP_POS_FRAMES)
    return int(position)
var current_time
-
Expand source code
@property
def current_time(self):
    """Value of ``cv2.CAP_PROP_POS_MSEC`` for this capture."""
    position_msec = self.get(cv2.CAP_PROP_POS_MSEC)
    return position_msec
var percent_finished
-
Expand source code
@property
def percent_finished(self):
    """Value of ``cv2.CAP_PROP_POS_AVI_RATIO`` multiplied by 100."""
    ratio = self.get(cv2.CAP_PROP_POS_AVI_RATIO)
    return ratio * 100
Methods
def close(self)
-
Close the VideoReader.
Returns
bool
- Returns True if successfully closed.
Expand source code
def close(self):
    """ Close the VideoReader.

    Returns
    -------
    bool
        Returns True if successfully closed.
    """
    self.release()
    still_open = self.isOpened()
    return not still_open
def read(self)
-
Read one frame
Returns
frame
:array
- Image is returned of the frame if a frame exists. Otherwise, return None.
Expand source code
def read(self):
    """ Read one frame

    Returns
    -------
    frame: array
        The decoded frame when one is available (reduced to channel 1
        with a trailing axis of length 1 if `gray` is set).
        Otherwise None, and `finished` is set to True.
    """
    ok, image = self._read()
    if not ok:
        self.finished = True
        return None
    self.idx += 1
    if self.gray:
        image = image[..., 1][..., None]
    return image
def read_batch(self, batch_size=1, asarray=False)
-
Read in a batch of frames.
Parameters
batch_size
:int
, default 1
- Number of frames to pull from the video.
asarray
:bool
, default False
- If True, stack the frames (in numpy).
Returns
frames
:list
or array
- A batch of frames from the video.
Expand source code
def read_batch(self, batch_size=1, asarray=False):
    """ Read in a batch of frames.

    Parameters
    ----------
    batch_size: int, default 1
        Number of frames to pull from the video.
    asarray: bool, default False
        If True, stack the frames (in numpy).

    Returns
    -------
    frames: list or array
        A batch of frames from the video. The batch may be shorter
        than `batch_size` if the video ends; None is returned when
        no frame could be read at all.
    """
    frames = []
    for _ in range(batch_size):
        frame = self.read()
        if self.finished:
            # End of stream: further reads cannot contribute frames.
            break
        frames.append(frame)
    if not frames:
        return None
    if asarray:
        frames = np.stack(frames)
    return frames
class VideoWriter (path, frame_size, codec='FFV1', fps=30.0, color=True)
-
Write frames to a video file, one at a time or in batches.
Parameters
path
:str
- Path to save the video file.
frame_size
:tuple
- Size of the frame as (width, height)
codec
:str
, default ='FFV1'
- FourCC video codec for encoding the video. Options include 'MP4V', 'X264', 'H264', etc.
fps
:int
or float
, default =30.0
- Frame rate for the encoded video
color
:bool
, default =True
- Whether or not the video is color
Expand source code
class VideoWriter:
    """Write frames to a video file, one at a time or in batches.

    Can be used as a context manager, in which case the underlying
    stream is released on exit.

    Parameters
    ----------
    path: str
        Path to save the video file.
    frame_size: tuple
        Size of the frame as (width, height)
    codec: str, default = 'FFV1'
        FourCC video codec for encoding the video.
        Options include 'MP4V', 'X264', 'H264', etc.
    fps: int or float, default = 30.0
        Frame rate for the encoded video
    color: bool, default = True
        Whether or not the video is color
    """

    def __init__(self, path, frame_size, codec="FFV1", fps=30.0, color=True):
        # The four characters of `codec` are unpacked into the FourCC code.
        codec = cv2.VideoWriter_fourcc(*codec)
        self.stream = cv2.VideoWriter(path, codec, fps, frame_size, color)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        # Never suppress an exception raised inside the `with` body.
        return False

    def write(self, frame):
        """Write a single frame to the stream."""
        self.stream.write(frame)

    def write_batch(self, batch):
        """Write every frame of an iterable of frames, in order."""
        for frame in batch:
            self.write(frame)

    def close(self):
        """Release the stream.

        Returns
        -------
        bool
            True if the stream reports that it is no longer open.
        """
        self.stream.release()
        return not self.stream.isOpened()
Methods
def close(self)
-
Expand source code
def close(self):
    """Release the underlying stream; return True if it is no longer open."""
    writer = self.stream
    writer.release()
    return not writer.isOpened()
def write(self, frame)
-
Expand source code
def write(self, frame):
    """Pass a single frame to the underlying stream's ``write``."""
    sink = self.stream
    sink.write(frame)
def write_batch(self, batch)
-
Expand source code
def write_batch(self, batch):
    """Write each frame of `batch`, in iteration order, via ``self.write``."""
    emit = self.write
    for image in batch:
        emit(image)