Motive
As a follow-up to "I tried to extract characters from subtitles (OpenCV: tesseract-ocr edition)", this time I would like to use the Google Cloud Vision API to extract the subtitle characters.
Method
First of all, in order to use the Google Cloud Vision API, you need to register an account on the Google Cloud Console and obtain an API key. For how to do this, refer to "Summary of how to use the Cloud Vision API (with sample code)".
import requests
import json
import base64
import cv2
import sys
if __name__ == "__main__":
    KEY = "--- your api key ---"
    url = 'https://vision.googleapis.com/v1/images:annotate?key='
    api_url = url + KEY

    # Load the image
    img_file_path = sys.argv[1]
    mat = cv2.imread(img_file_path)

    # Keep only the subtitle display area
    roi = mat[435:600, :]

    # OpenCV image -> base64 string
    result, dst_data = cv2.imencode('.png', roi)
    img_content = base64.b64encode(dst_data)

    # Create the request body
    req_body = json.dumps({
        'requests': [{
            'image': {
                'content': img_content.decode('utf-8')
            },
            'features': [{
                'type': 'DOCUMENT_TEXT_DETECTION'
            }]
        }]
    })

    # Issue the request
    res = requests.post(api_url, data=req_body)

    # Get the text information from the response
    res_json = res.json()['responses']
    if 0 < len(res_json[0]):
        textobj = res_json[0]['textAnnotations'][0]
        print("".join(textobj["description"].strip().split("\n")))
When calling the API, the request JSON is built by converting the image to a Base64-encoded string:
src = cv2.imread("image_path")
result, dst_data = cv2.imencode('.png', src)
img_content = base64.b64encode(dst_data)
This converts an OpenCV image (a NumPy array) into a Base64 string.
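For reference, the reverse direction, restoring an OpenCV image from the Base64 string, can be sketched roughly as follows (variable names are illustrative; `img_content` is the string produced above):

import base64
import numpy as np
import cv2

# Base64 string -> raw PNG bytes -> NumPy buffer -> OpenCV (BGR) image
img_bytes = base64.b64decode(img_content)
buf = np.frombuffer(img_bytes, dtype=np.uint8)
restored = cv2.imdecode(buf, cv2.IMREAD_COLOR)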
In addition, it would be sufficient to extract the characters by calling the API sequentially, but running the calls in parallel with `asyncio` speeds up the processing. The reason for using `asyncio` rather than `multiprocessing.Pool` is that it is more stable, since the handling of the API response stays blocked inside its own coroutine.
async def main_image_process(src):
    # Pre-process to make character recognition easier
    gray_frame = pre_process(src.content)
    # Trim only the region where the telop is likely to appear
    roi = gray_frame[435:600, :]
    # Extract text
    text = await extractTelopText(roi)
    await asyncio.sleep(2)

    dst = await createFooterTelop(src.content)
    dst = await addJapaneseTelop(dst, text, 20, cap_height + telop_height - 30)
    dst = await addASCIITelop(dst, str(src.timestamp) + "[sec]", cap_width - 250, cap_height + telop_height - 10, color=(0,255,0))

    return MovieFrame(src.id, dst, src.timestamp)

if __name__ == "__main__":
    r = []
    loop = asyncio.get_event_loop()
    try:
        r = loop.run_until_complete(
            asyncio.gather(*[main_image_process(f) for f in frames])
        )
    finally:
        loop.close()
It seems this can be written more simply with Python 3.7 or later, but that was not yet stable and the default package on CentOS was Python 3.6, so the code here is written for 3.6.
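For reference, with Python 3.7 or later the event-loop boilerplate could be replaced by `asyncio.run`. A minimal sketch, assuming the same `frames` list and `main_image_process` as above:

async def run_all(frames):
    return await asyncio.gather(*[main_image_process(f) for f in frames])

r = asyncio.run(run_all(frames))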
Development
The whole code.
import sys
import cv2
import io
import os
import numpy as np
import base64
import json
import requests
import asyncio
from PIL import Image, ImageDraw, ImageFont
from collections import namedtuple
import time
MovieFrame = namedtuple("MovieFrame", ["id", "content", "timestamp"])
telop_height = 50
cap_width = 1
cap_height = 1
def pre_process(src):
    kernel = np.ones((3,3), np.uint8)
    gray = cv2.cvtColor(src, cv2.COLOR_BGR2GRAY)
    o_ret, o_dst = cv2.threshold(gray, 0, 255, cv2.THRESH_OTSU)
    dst = cv2.morphologyEx(o_dst, cv2.MORPH_OPEN, kernel)
    dst = cv2.bitwise_not(dst)
    dst = cv2.cvtColor(dst, cv2.COLOR_GRAY2BGR)
    return dst
async def extractTelopText(src):
    KEY = "--- your api key ---"
    url = 'https://vision.googleapis.com/v1/images:annotate?key='
    api_url = url + KEY
    message = ""

    result, dst_data = cv2.imencode('.png', src)
    img_content = base64.b64encode(dst_data)

    # Create the request body
    req_body = json.dumps({
        'requests': [{
            'image': {
                'content': img_content.decode('utf-8')
            },
            'features': [{
                'type': 'DOCUMENT_TEXT_DETECTION'
            }]
        }]
    })

    # Issue the request
    res = requests.post(api_url, data=req_body)

    # Get the text information from the response
    res_json = res.json()['responses']
    if 0 < len(res_json[0]):
        textobj = res_json[0]["textAnnotations"][0]
        message = "".join(textobj["description"].strip().split("\n"))

    return message
async def createFooterTelop(src):
    telop = np.zeros((telop_height, cap_width, 3), np.uint8)
    telop[:] = tuple((128,128,128))
    images = [src, telop]
    dst = np.concatenate(images, axis=0)
    return dst
async def main_image_process(src):
    # Pre-process to make character recognition easier
    gray_frame = pre_process(src.content)
    # Trim only the region where the telop is likely to appear
    roi = gray_frame[435:600, :]
    # Extract text
    text = await extractTelopText(roi)
    await asyncio.sleep(2)

    dst = await createFooterTelop(src.content)
    dst = await addJapaneseTelop(dst, text, 20, cap_height + telop_height - 30)
    dst = await addASCIITelop(dst, str(src.timestamp) + "[sec]", cap_width - 250, cap_height + telop_height - 10, color=(0,255,0))

    return MovieFrame(src.id, dst, src.timestamp)
async def addASCIITelop(src, sentence, px, py, color=(8,8,8), fsize=28):
    cv2.putText(src, sentence,
                (px, py),
                cv2.FONT_HERSHEY_SIMPLEX,
                1,
                color,
                2,
                cv2.LINE_AA)
    return src
async def addJapaneseTelop(src, sentence, px, py, color=(8,8,8), fsize=28):
    rgbImg = cv2.cvtColor(src, cv2.COLOR_BGR2RGB)
    canvas = Image.fromarray(rgbImg).copy()
    draw = ImageDraw.Draw(canvas)
    font = ImageFont.truetype("./IPAfont00303/ipag.ttf", fsize)
    draw.text((px, py), sentence, fill=color, font=font)
    dst = cv2.cvtColor(np.array(canvas, dtype=np.uint8), cv2.COLOR_RGB2BGR)
    return dst
if __name__ == '__main__':
    cap = cv2.VideoCapture('one_minutes.mp4')
    cap_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    cap_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)

    telop_height = 50

    fourcc = cv2.VideoWriter_fourcc('m','p','4','v')
    writer = cv2.VideoWriter('extract_telop_async.mp4', fourcc, fps, (cap_width, cap_height + telop_height))

    frames = []

    start = time.time()
    idx = 0

    # Read frames
    try:
        while True:
            if not cap.isOpened():
                break
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
            ret, frame = cap.read()
            if frame is None:
                break
            frames.append(MovieFrame(idx, frame, round(idx/fps, 4)))
            idx += 1
    except cv2.error as e:
        print(e)

    cap.release()
    print("read movie file")

    # Process frames asynchronously
    r = []
    loop = asyncio.get_event_loop()
    try:
        r = loop.run_until_complete(
            asyncio.gather(*[main_image_process(f) for f in frames])
        )
    finally:
        loop.close()

    # Sort by frame id
    sorted_out = sorted(r, key=lambda x: x.id)

    # Write frames
    try:
        for item in sorted_out:
            writer.write(item.content)
    except cv2.error as e:
        print(e)

    writer.release()
    print("write movie file")

    print("Done!!! {}[sec]".format(round(time.time() - start, 4)))
Result
- tesseract-ocr: processing time 450 sec ≒ 7.5 minutes
- Google Cloud Vision API: processing time 1315 sec ≒ 21 minutes
Because an external API is called (even with asynchronous processing applied), the processing time is considerably longer, but you can see that the Google Cloud Vision API gives better OCR accuracy.
Future
Next time, I'm thinking of editing the video from "I deleted the object using image repair (inpaint) (OpenCV: C++)". I plan to rewrite the code I used here in C++. That means I have to think about how to handle things like how to use curl and threads (is there even a library equivalent to `asyncio` in the first place?). Is there no choice but to use Boost? :tired_face:
Reference
- Summary of how to use the Cloud Vision API (with sample code)
- [Python3] Encode an image to Base64, read Base64 into a NumPy array, process with OpenCV, and convert a NumPy array to Base64