Video Super Resolution with OpenVINO¶
Super Resolution is the process of enhancing the quality of an image by increasing the pixel count using deep learning. This notebook applies Single Image Super Resolution (SISR) to frames of a 360p (640×360) video. We use the single-image-super-resolution-1032 model, which is available from the Open Model Zoo and is based on the research paper cited below.
Y. Liu et al., “An Attention-Based Approach for Single Image Super Resolution,” 2018 24th International Conference on Pattern Recognition (ICPR), 2018, pp. 2777-2784, doi: 10.1109/ICPR.2018.8545760.
NOTE: The Single Image Super Resolution (SISR) model used in this demo is not optimized for video. Results may vary depending on the video.
Preparation¶
Imports¶
import time
import urllib.request
from pathlib import Path

import cv2
import numpy as np
from IPython.display import (
    HTML,
    FileLink,
    Pretty,
    ProgressBar,
    Video,
    clear_output,
    display,
)
from openvino.runtime import Core
from pytube import YouTube
Settings¶
# Device to use for inference. For example, "CPU" or "GPU".
DEVICE = "CPU"
# 1032: 4x superresolution, 1033: 3x superresolution
MODEL_FILE = "model/single-image-super-resolution-1032.xml"
model_name = Path(MODEL_FILE).name
model_xml_path = Path(MODEL_FILE).with_suffix(".xml")
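The notebook assumes the model IR files are already present at MODEL_FILE. If they are not, a minimal sketch for fetching them with omz_downloader (part of the openvino-dev package) follows; note that the downloader places the files in a nested intel/... subdirectory, so MODEL_FILE may need to be adjusted to point at the downloaded .xml file.

# Sketch: fetch the model from the Open Model Zoo with omz_downloader
# (provided by the openvino-dev package). The files land in a nested
# directory such as model/intel/single-image-super-resolution-1032/FP16/,
# so adjust MODEL_FILE accordingly.
!omz_downloader --name single-image-super-resolution-1032 --output_dir model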
Functions¶
def write_text_on_image(image: np.ndarray, text: str) -> np.ndarray:
    """
    Write the specified text in the top left corner of the image
    as white text with a black border.

    :param image: image as numpy array with HWC shape, RGB or BGR
    :param text: text to write
    :return: image with written text, as numpy array
    """
    font = cv2.FONT_HERSHEY_PLAIN
    org = (20, 20)
    font_scale = 4
    font_color = (255, 255, 255)
    line_type = 1
    font_thickness = 2
    text_color_bg = (0, 0, 0)
    x, y = org

    image = cv2.UMat(image)
    (text_w, text_h), _ = cv2.getTextSize(
        text=text, fontFace=font, fontScale=font_scale, thickness=font_thickness
    )
    result_im = cv2.rectangle(
        img=image, pt1=org, pt2=(x + text_w, y + text_h), color=text_color_bg, thickness=-1
    )
    textim = cv2.putText(
        img=result_im,
        text=text,
        org=(x, y + text_h + font_scale - 1),
        fontFace=font,
        fontScale=font_scale,
        color=font_color,
        thickness=font_thickness,
        lineType=line_type,
    )
    return textim.get()
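As a quick illustration of write_text_on_image, the sketch below annotates a blank 1080p frame; the frame contents are arbitrary and only serve to show the call.

# Sketch: annotate a black 1080p frame and check that the shape is unchanged.
blank_frame = np.zeros((1080, 1920, 3), dtype=np.uint8)
annotated_frame = write_text_on_image(image=blank_frame, text="Superresolution")
print(annotated_frame.shape)  # (1080, 1920, 3)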
def load_image(path: str) -> np.ndarray:
    """
    Loads an image from `path` and returns it as BGR numpy array.

    :param path: path to an image filename or url
    :return: image as numpy array, with BGR channel order
    """
    if path.startswith("http"):
        # Set User-Agent to Mozilla because some websites block requests
        # with User-Agent Python
        request = urllib.request.Request(url=path, headers={"User-Agent": "Mozilla/5.0"})
        response = urllib.request.urlopen(url=request)
        array = np.asarray(bytearray(response.read()), dtype="uint8")
        image = cv2.imdecode(buf=array, flags=-1)  # Loads the image as BGR
    else:
        image = cv2.imread(filename=path)
    return image
def convert_result_to_image(result) -> np.ndarray:
    """
    Convert network result of floating point numbers to image with integer
    values from 0-255. Values outside this range are clipped to 0 and 255.

    :param result: a single superresolution network result in N,C,H,W shape
    """
    result = result.squeeze(0).transpose(1, 2, 0)
    result *= 255
    result[result < 0] = 0
    result[result > 255] = 255
    result = result.astype(np.uint8)
    return result
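The masked assignments in convert_result_to_image are equivalent to a single np.clip call; a minimal check on synthetic data:

# Sketch: verify that the manual clipping matches np.clip on random data.
# The input array is synthetic and only mimics the (N,C,H,W) result shape.
fake_result = np.random.randn(1, 3, 4, 4).astype(np.float32)
clipped = np.clip(fake_result.squeeze(0).transpose(1, 2, 0) * 255, 0, 255).astype(np.uint8)
assert np.array_equal(convert_result_to_image(result=fake_result.copy()), clipped)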
Load the Superresolution Model¶
Load the model in Inference Engine with ie.read_model and compile it for the specified device with ie.compile_model.
ie = Core()
model = ie.read_model(model=model_xml_path)
compiled_model = ie.compile_model(model=model, device_name=DEVICE)
Get information about network inputs and outputs. The Super Resolution model expects two inputs: 1) the input image, 2) a bicubic interpolation of the input image to the target size 1920x1080. It returns the super resolution version of the image in 1920x1080.
# Network inputs and outputs are dictionaries. Get the keys for the
# dictionaries.
original_image_key, bicubic_image_key = compiled_model.inputs
output_key = compiled_model.output(0)
# Get the expected input and target shape. The `.shape` of a network input
# is in (N,C,H,W) layout, so `[2:]` returns the height and width. Note that
# OpenCV's resize function expects the size as (width, height).
input_height, input_width = list(original_image_key.shape)[2:]
target_height, target_width = list(bicubic_image_key.shape)[2:]
upsample_factor = int(target_height / input_height)
print(f"The network expects inputs with a width of {input_width}, " f"height of {input_height}")
print(f"The network returns images with a width of {target_width}, " f"height of {target_height}")
print(
    f"The image sides are upsampled by a factor {upsample_factor}. "
    f"The new image is {upsample_factor**2} times as large as the "
    "original image"
)
The network expects inputs with a width of 480, height of 270
The network returns images with a width of 1920, height of 1080
The image sides are upsampled by a factor 4. The new image is 16 times as large as the original image
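Before processing a full video, the two-input model can be exercised on a single frame. A minimal sketch follows, assuming a local or remote test image (the path below is hypothetical; any BGR image works because it is resized to the network shapes).

# Sketch: run superresolution once on a single test frame. The image path is
# hypothetical; replace it with any available image.
test_image = load_image(path="data/test_frame.jpg")
resized_frame = cv2.resize(src=test_image, dsize=(input_width, input_height))
bicubic_frame = cv2.resize(
    src=test_image, dsize=(target_width, target_height), interpolation=cv2.INTER_CUBIC
)
single_result = compiled_model(
    {
        original_image_key.any_name: np.expand_dims(resized_frame.transpose(2, 0, 1), axis=0),
        bicubic_image_key.any_name: np.expand_dims(bicubic_frame.transpose(2, 0, 1), axis=0),
    }
)[output_key]
print(single_result.shape)  # expected: (1, 3, 1080, 1920)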
Superresolution on Video¶
Download a YouTube* video with PyTube and enhance the video quality with superresolution.
By default only the first 100 frames of the video are processed. Change NUM_FRAMES in the cell below to modify this.
NOTE:
- The resulting video does not contain audio.
- The input video should be a landscape video and have an input resolution of 360p (640x360) for the 1032 model, or 480p (720x480) for the 1033 model.
Settings¶
VIDEO_DIR = "data"
OUTPUT_DIR = "output"
Path(OUTPUT_DIR).mkdir(exist_ok=True)
# Maximum number of frames to read from the input video. Set to 0 to read all frames.
NUM_FRAMES = 100
# The format for saving the result videos. vp09 is slow, but widely available.
# If you have FFMPEG installed, you can change FOURCC to `*"THEO"` to improve video writing speed.
FOURCC = cv2.VideoWriter_fourcc(*"vp09")
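Whether a given FOURCC is actually available depends on the local OpenCV build. The sketch below opens a throwaway writer to verify the codec before the long-running inference step; the temporary filename is arbitrary.

# Sketch: confirm that the selected codec can open a writer. If this fails,
# try another FOURCC, for example cv2.VideoWriter_fourcc(*"mp4v").
test_writer = cv2.VideoWriter(f"{OUTPUT_DIR}/_codec_test.mp4", FOURCC, 30, (64, 64))
assert test_writer.isOpened(), "Selected FOURCC is not available in this OpenCV build"
test_writer.release()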
Download and Prepare Video¶
# Use pytube to download a video. It downloads to the output directory.
# You can also place a local video in the data directory and comment out
# the following lines.
VIDEO_URL = "https://www.youtube.com/watch?v=V8yS3WIkOrA"
yt = YouTube(VIDEO_URL)
# Use `yt.streams` to see all available streams. See the PyTube documentation
# https://python-pytube.readthedocs.io/en/latest/api.html for advanced
# filtering options
try:
    Path(VIDEO_DIR).mkdir(exist_ok=True)
    stream = yt.streams.filter(resolution="360p").first()
    filename = Path(stream.default_filename.encode("ascii", "ignore").decode("ascii")).stem
    stream.download(output_path=OUTPUT_DIR, filename=filename)
    print(f"Video {filename} downloaded to {OUTPUT_DIR}")

    # Create Path objects for the input video and the resulting videos
    video_path = Path(stream.get_file_path(filename, OUTPUT_DIR))
except Exception:
    # If PyTube fails, use a local video stored in the VIDEO_DIR directory
    video_path = Path(rf"{VIDEO_DIR}/CEO Pat Gelsinger on Leading Intel.mp4")

# Path names for the result videos
superres_video_path = Path(f"{OUTPUT_DIR}/{video_path.stem}_superres.mp4")
bicubic_video_path = Path(f"{OUTPUT_DIR}/{video_path.stem}_bicubic.mp4")
comparison_video_path = Path(f"{OUTPUT_DIR}/{video_path.stem}_superres_comparison.mp4")
Video Leading Intel with CEO Pat Gelsinger downloaded to output
# Open the video and get the dimensions and the FPS
cap = cv2.VideoCapture(filename=str(video_path))
ret, image = cap.read()
if not ret:
    raise ValueError(f"The video at '{video_path}' cannot be read.")
fps = cap.get(cv2.CAP_PROP_FPS)
frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
if NUM_FRAMES == 0:
    total_frames = frame_count
else:
    total_frames = min(frame_count, NUM_FRAMES)

original_frame_height, original_frame_width = image.shape[:2]

cap.release()
print(
    f"The input video has a frame width of {original_frame_width}, "
    f"frame height of {original_frame_height} and runs at {fps:.2f} fps"
)
The input video has a frame width of 640, frame height of 360 and runs at 29.97 fps
Create the superresolution video, the bicubic video and the comparison video. The superresolution video contains the video enhanced with superresolution, the bicubic video contains the input video upsampled with bicubic interpolation, and the comparison video shows the bicubic and superresolution frames side by side.
superres_video = cv2.VideoWriter(
    filename=str(superres_video_path),
    fourcc=FOURCC,
    fps=fps,
    frameSize=(target_width, target_height),
)
bicubic_video = cv2.VideoWriter(
    filename=str(bicubic_video_path),
    fourcc=FOURCC,
    fps=fps,
    frameSize=(target_width, target_height),
)
comparison_video = cv2.VideoWriter(
    filename=str(comparison_video_path),
    fourcc=FOURCC,
    fps=fps,
    frameSize=(target_width * 2, target_height),
)
Do Inference¶
Read video frames and enhance them with superresolution. Save the superresolution video, the bicubic video and the comparison video to file.
The code in this cell reads the video frame by frame. Each frame is resized and reshaped to the network input shape and upsampled with bicubic interpolation to the target shape. Both the original and the bicubic image are propagated through the network. The network result is a numpy array with floating point values, with a shape of (1,3,1080,1920). This array is converted to an 8-bit image with shape (1080,1920,3) and written to superres_video. The bicubic image is written to bicubic_video for comparison. Lastly, the bicubic and result frames are combined side by side and written to comparison_video. A progress bar shows the progress of the process. Inference time is measured, as well as the total time to process each frame, which includes the inference time and the time it takes to process and write the video.
start_time = time.perf_counter()
frame_nr = 0
total_inference_duration = 0

progress_bar = ProgressBar(total=total_frames)
progress_bar.display()

cap = cv2.VideoCapture(filename=str(video_path))
try:
    while cap.isOpened():
        ret, image = cap.read()
        if not ret:
            cap.release()
            break

        if frame_nr >= total_frames:
            break

        # Resize the input image to the network shape and convert it from
        # (H,W,C) to (N,C,H,W)
        resized_image = cv2.resize(src=image, dsize=(input_width, input_height))
        input_image_original = np.expand_dims(resized_image.transpose(2, 0, 1), axis=0)

        # Resize and reshape the image to the target shape with bicubic
        # interpolation
        bicubic_image = cv2.resize(
            src=image, dsize=(target_width, target_height), interpolation=cv2.INTER_CUBIC
        )
        input_image_bicubic = np.expand_dims(bicubic_image.transpose(2, 0, 1), axis=0)

        # Do inference
        inference_start_time = time.perf_counter()
        result = compiled_model(
            {
                original_image_key.any_name: input_image_original,
                bicubic_image_key.any_name: input_image_bicubic,
            }
        )[output_key]
        inference_stop_time = time.perf_counter()
        inference_duration = inference_stop_time - inference_start_time
        total_inference_duration += inference_duration

        # Transform the inference result into an image
        result_frame = convert_result_to_image(result=result)

        # Write the resulting image and the bicubic image to the videos
        superres_video.write(image=result_frame)
        bicubic_video.write(image=bicubic_image)
        stacked_frame = np.hstack((bicubic_image, result_frame))
        comparison_video.write(image=stacked_frame)

        frame_nr = frame_nr + 1

        # Update the progress bar and the status message
        progress_bar.progress = frame_nr
        progress_bar.update()
        if frame_nr % 10 == 0 or frame_nr == total_frames:
            clear_output(wait=True)
            progress_bar.display()
            display(
                Pretty(
                    f"Processed frame {frame_nr}. Inference time: "
                    f"{inference_duration:.2f} seconds "
                    f"({1/inference_duration:.2f} FPS)"
                )
            )
except KeyboardInterrupt:
    print("Processing interrupted.")
finally:
    superres_video.release()
    bicubic_video.release()
    comparison_video.release()
    end_time = time.perf_counter()
    duration = end_time - start_time
    print(f"Videos saved to the {comparison_video_path.parent} directory.")
    print(
        f"Processed {frame_nr} frames in {duration:.2f} seconds. Total FPS "
        f"(including video processing): {frame_nr/duration:.2f}. "
        f"Inference FPS: {frame_nr/total_inference_duration:.2f}."
    )
Processed frame 100. Inference time: 0.36 seconds (2.78 FPS)
Videos saved to the output directory.
Processed 100 frames in 349.10 seconds. Total FPS (including video processing): 0.29. Inference FPS: 2.74.
Show Side-by-Side Video of Bicubic and Superresolution Version¶
if not comparison_video_path.exists():
    raise ValueError("The comparison video does not exist.")
else:
    video_link = FileLink(comparison_video_path)
    video_link.html_link_str = "<a href='%s' download>%s</a>"
    display(
        HTML(
            f"Showing side by side comparison. If you cannot see the video in "
            "your browser, please click on the following link to download "
            f"the video<br>{video_link._repr_html_()}"
        )
    )
    display(Video(comparison_video_path, width=800, embed=True))
output/Leading Intel with CEO Pat Gelsinger_superres_comparison.mp4