Module deepposekit.io.video
Expand source code
# -*- coding: utf-8 -*-
# Copyright 2018-2019 Jacob M. Graving <jgraving@gmail.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#    http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from tensorflow.python.keras.utils import Sequence
import cv2
import numpy as np
import os
__all__ = ["VideoReader", "VideoWriter"]
class VideoReader(cv2.VideoCapture, Sequence):
    """Read a video in batches.
    Parameters
    ----------
    videopath: str
        Path to the video file.
    batch_size: int, default = 1
        Batch size for reading frames
    gray: bool, default = False
        If gray, return only the middle channel
    """
    def __init__(self, videopath, batch_size=1, gray=False):
        if isinstance(videopath, str):
            if os.path.exists(videopath):
                super(VideoReader, self).__init__(videopath)
                self.videopath = videopath
            else:
                raise ValueError("file or path does not exist")
        else:
            raise TypeError("videopath must be str")
        self.batch_size = batch_size
        self.n_frames = int(self.get(cv2.CAP_PROP_FRAME_COUNT))
        self.idx = 0
        self.fps = self.get(cv2.CAP_PROP_FPS)
        self.height = int(self.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.width = int(self.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.finished = False
        self.gray = gray
        self._read = super(VideoReader, self).read
    def read(self):
        """ Read one frame
        Returns
        -------
        frame: array
            Image is returned of the frame if a frame exists.
            Otherwise, return None.
        """
        ret, frame = self._read()
        if ret:
            self.idx += 1
            if self.gray:
                frame = frame[..., 1][..., None]
            return frame
        else:
            self.finished = True
            return None
    def read_batch(self, batch_size=1, asarray=False):
        """ Read in a batch of frames.
        Parameters
        ----------
        batch_size: int, default 1
            Number of frames to pull from the video.
        asarray: bool, default False
            If True, stack the frames (in numpy).
        Returns
        -------
        frames: list or array
            A batch of frames from the video.
        """
        frames = []
        for idx in range(batch_size):
            frame = self.read()
            if not self.finished:
                frames.append(frame)
        empty = len(frames) == 0
        if asarray and not empty:
            frames = np.stack(frames)
        if not empty:
            return frames
        else:
            return
    def close(self):
        """ Close the VideoReader.
        Returns
        -------
        bool
            Returns True if successfully closed.
        """
        self.release()
        return not self.isOpened()
    def __len__(self):
        return int(np.ceil(self.n_frames / float(self.batch_size)))
    def __getitem__(self, index):
        if self.finished:
            raise StopIteration
        if isinstance(index, (int, np.integer)):
            idx0 = index * self.batch_size
            if self.idx != idx0:
                self.set(cv2.CAP_PROP_POS_FRAMES, index - 1)
                self.idx = idx0
            return self.read_batch(self.batch_size, True)
        else:
            raise NotImplementedError
    def __next__(self):
        if self.finished:
            raise StopIteration
        else:
            return self.read_batch(self.batch_size, True)
    def __del__(self):
        self.close()
    @property
    def current_frame(self):
        return int(self.get(cv2.CAP_PROP_POS_FRAMES))
    @property
    def current_time(self):
        return self.get(cv2.CAP_PROP_POS_MSEC)
    @property
    def percent_finished(self):
        return self.get(cv2.CAP_PROP_POS_AVI_RATIO) * 100
class VideoWriter:
    """Read a video in batches.
    Parameters
    ----------
    path: str
        Path to save the video file.
    frame_size: tuple
        Size of the frame as (width, height)
    codec: str, default = 'FFV1'
        FourCC video codec for encoding the video.
        Options include `MP4V',  'X264', 'H264', etc.
    fps: int or float, default = 30.0
        Frame rate for the encoded video
    color: bool, default = True
        Whether or not the video is color
    """
    def __init__(self, path, frame_size, codec="FFV1", fps=30.0, color=True):
        codec = cv2.VideoWriter_fourcc(*codec)
        self.stream = cv2.VideoWriter(path, codec, fps, frame_size, color)
    def write(self, frame):
        self.stream.write(frame)
    def write_batch(self, batch):
        for frame in batch:
            self.write(frame)
    def close(self):
        self.stream.release()
        return not self.stream.isOpened()
Classes
class VideoReader (videopath, batch_size=1, gray=False)- 
Read a video in batches.
Parameters
videopath:str- Path to the video file.
 batch_size:int, default =1- Batch size for reading frames
 gray:bool, default =False- If gray, return only the middle channel
 
Expand source code
class VideoReader(cv2.VideoCapture, Sequence): """Read a video in batches. Parameters ---------- videopath: str Path to the video file. batch_size: int, default = 1 Batch size for reading frames gray: bool, default = False If gray, return only the middle channel """ def __init__(self, videopath, batch_size=1, gray=False): if isinstance(videopath, str): if os.path.exists(videopath): super(VideoReader, self).__init__(videopath) self.videopath = videopath else: raise ValueError("file or path does not exist") else: raise TypeError("videopath must be str") self.batch_size = batch_size self.n_frames = int(self.get(cv2.CAP_PROP_FRAME_COUNT)) self.idx = 0 self.fps = self.get(cv2.CAP_PROP_FPS) self.height = int(self.get(cv2.CAP_PROP_FRAME_HEIGHT)) self.width = int(self.get(cv2.CAP_PROP_FRAME_WIDTH)) self.finished = False self.gray = gray self._read = super(VideoReader, self).read def read(self): """ Read one frame Returns ------- frame: array Image is returned of the frame if a frame exists. Otherwise, return None. """ ret, frame = self._read() if ret: self.idx += 1 if self.gray: frame = frame[..., 1][..., None] return frame else: self.finished = True return None def read_batch(self, batch_size=1, asarray=False): """ Read in a batch of frames. Parameters ---------- batch_size: int, default 1 Number of frames to pull from the video. asarray: bool, default False If True, stack the frames (in numpy). Returns ------- frames: list or array A batch of frames from the video. """ frames = [] for idx in range(batch_size): frame = self.read() if not self.finished: frames.append(frame) empty = len(frames) == 0 if asarray and not empty: frames = np.stack(frames) if not empty: return frames else: return def close(self): """ Close the VideoReader. Returns ------- bool Returns True if successfully closed. """ self.release() return not self.isOpened() def __len__(self): return int(np.ceil(self.n_frames / float(self.batch_size))) def __getitem__(self, index): if self.finished: raise StopIteration if isinstance(index, (int, np.integer)): idx0 = index * self.batch_size if self.idx != idx0: self.set(cv2.CAP_PROP_POS_FRAMES, index - 1) self.idx = idx0 return self.read_batch(self.batch_size, True) else: raise NotImplementedError def __next__(self): if self.finished: raise StopIteration else: return self.read_batch(self.batch_size, True) def __del__(self): self.close() @property def current_frame(self): return int(self.get(cv2.CAP_PROP_POS_FRAMES)) @property def current_time(self): return self.get(cv2.CAP_PROP_POS_MSEC) @property def percent_finished(self): return self.get(cv2.CAP_PROP_POS_AVI_RATIO) * 100Ancestors
- cv2.VideoCapture
 - tensorflow.python.keras.utils.data_utils.Sequence
 
Instance variables
var current_frame- 
Expand source code
@property def current_frame(self): return int(self.get(cv2.CAP_PROP_POS_FRAMES)) var current_time- 
Expand source code
@property def current_time(self): return self.get(cv2.CAP_PROP_POS_MSEC) var percent_finished- 
Expand source code
@property def percent_finished(self): return self.get(cv2.CAP_PROP_POS_AVI_RATIO) * 100 
Methods
def close(self)- 
Close the VideoReader.
Returns
bool- Returns True if successfully closed.
 
Expand source code
def close(self): """ Close the VideoReader. Returns ------- bool Returns True if successfully closed. """ self.release() return not self.isOpened() def read(self)- 
Read one frame
Returns
frame:array- Image is returned of the frame if a frame exists. Otherwise, return None.
 
Expand source code
def read(self): """ Read one frame Returns ------- frame: array Image is returned of the frame if a frame exists. Otherwise, return None. """ ret, frame = self._read() if ret: self.idx += 1 if self.gray: frame = frame[..., 1][..., None] return frame else: self.finished = True return None def read_batch(self, batch_size=1, asarray=False)- 
Read in a batch of frames.
Parameters
batch_size:int, default1- Number of frames to pull from the video.
 asarray:bool, defaultFalse- If True, stack the frames (in numpy).
 
Returns
frames:listorarray- A batch of frames from the video.
 
Expand source code
def read_batch(self, batch_size=1, asarray=False): """ Read in a batch of frames. Parameters ---------- batch_size: int, default 1 Number of frames to pull from the video. asarray: bool, default False If True, stack the frames (in numpy). Returns ------- frames: list or array A batch of frames from the video. """ frames = [] for idx in range(batch_size): frame = self.read() if not self.finished: frames.append(frame) empty = len(frames) == 0 if asarray and not empty: frames = np.stack(frames) if not empty: return frames else: return 
 class VideoWriter (path, frame_size, codec='FFV1', fps=30.0, color=True)- 
Read a video in batches.
Parameters
path:str- Path to save the video file.
 frame_size:tuple- Size of the frame as (width, height)
 codec:str, default ='FFV1'- FourCC video codec for encoding the video. Options include `MP4V', 'X264', 'H264', etc.
 fps:intorfloat, default =30.0- Frame rate for the encoded video
 color:bool, default =True- Whether or not the video is color
 
Expand source code
class VideoWriter: """Read a video in batches. Parameters ---------- path: str Path to save the video file. frame_size: tuple Size of the frame as (width, height) codec: str, default = 'FFV1' FourCC video codec for encoding the video. Options include `MP4V', 'X264', 'H264', etc. fps: int or float, default = 30.0 Frame rate for the encoded video color: bool, default = True Whether or not the video is color """ def __init__(self, path, frame_size, codec="FFV1", fps=30.0, color=True): codec = cv2.VideoWriter_fourcc(*codec) self.stream = cv2.VideoWriter(path, codec, fps, frame_size, color) def write(self, frame): self.stream.write(frame) def write_batch(self, batch): for frame in batch: self.write(frame) def close(self): self.stream.release() return not self.stream.isOpened()Methods
def close(self)- 
Expand source code
def close(self): self.stream.release() return not self.stream.isOpened() def write(self, frame)- 
Expand source code
def write(self, frame): self.stream.write(frame) def write_batch(self, batch)- 
Expand source code
def write_batch(self, batch): for frame in batch: self.write(frame)