Video GA text detection and object tracking #2024

Merged
Changes from 6 commits
226 changes: 222 additions & 4 deletions video/cloud-client/analyze/analyze.py
@@ -19,11 +19,16 @@

Usage Examples:

-python analyze.py labels gs://cloud-ml-sandbox/video/chicago.mp4
+python analyze.py labels gs://cloud-samples-data/video/chicago.mp4
 python analyze.py labels_file resources/cat.mp4
-python analyze.py shots gs://demomaker/gbikes_dinosaur.mp4
-python analyze.py explicit_content gs://demomaker/gbikes_dinosaur.mp4
+python analyze.py shots gs://cloud-samples-data/video/gbikes_dinosaur.mp4
+python analyze.py explicit_content \
+    gs://cloud-samples-data/video/gbikes_dinosaur.mp4
+python analyze.py text_gcs \
+    gs://cloud-samples-data/video/googlework_short.mp4
+python analyze.py text_file resources/googlework_short.mp4
+python analyze.py objects_gcs gs://cloud-samples-data/video/cat.mp4
+python analyze.py objects_file resources/cat.mp4
"""

import argparse
@@ -278,27 +283,232 @@ def speech_transcription(path):
# [END video_speech_transcription_gcs]


def video_detect_text_gcs(input_uri):
    # [START video_detect_text_gcs]
    """Detect text in a video stored on GCS."""
    from google.cloud import videointelligence

    video_client = videointelligence.VideoIntelligenceServiceClient()
    features = [videointelligence.enums.Feature.TEXT_DETECTION]

    operation = video_client.annotate_video(
        input_uri=input_uri,
        features=features)

    print('\nProcessing video for text detection.')
    result = operation.result(timeout=300)

    # The first result is retrieved because a single video was processed.
    annotation_result = result.annotation_results[0]

    for text_annotation in annotation_result.text_annotations:
        print('\nText: {}'.format(text_annotation.text))

        # Get the first text segment
        text_segment = text_annotation.segments[0]
        start_time = text_segment.segment.start_time_offset
        end_time = text_segment.segment.end_time_offset
        print('start_time: {}, end_time: {}'.format(
            start_time.seconds + start_time.nanos * 1e-9,
            end_time.seconds + end_time.nanos * 1e-9))

        print('Confidence: {}'.format(text_segment.confidence))

        # Show the result for the first frame in this segment.
        frame = text_segment.frames[0]
        time_offset = frame.time_offset
        print('Time offset for the first frame: {}'.format(
            time_offset.seconds + time_offset.nanos * 1e-9))
        print('Rotated Bounding Box Vertices:')
        for vertex in frame.rotated_bounding_box.vertices:
            print('\tVertex.x: {}, Vertex.y: {}'.format(vertex.x, vertex.y))
    # [END video_detect_text_gcs]
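
For quick experimentation the new sample can also be driven straight from a Python shell, bypassing argparse. A minimal sketch, assuming credentials are configured and the samples directory is the working directory:

    import analyze

    # Public sample video; the same URI this PR's tests use.
    analyze.video_detect_text_gcs(
        'gs://cloud-samples-data/video/googlework_short.mp4')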


def video_detect_text(path):
    # [START video_detect_text]
    """Detect text in a local video."""
    from google.cloud import videointelligence

    video_client = videointelligence.VideoIntelligenceServiceClient()
    features = [videointelligence.enums.Feature.TEXT_DETECTION]
    video_context = videointelligence.types.VideoContext()

    with io.open(path, 'rb') as file:
        input_content = file.read()

    operation = video_client.annotate_video(
        input_content=input_content,  # the bytes of the video file
        features=features,
        video_context=video_context)

    print('\nProcessing video for text detection.')
    result = operation.result(timeout=300)

    # The first result is retrieved because a single video was processed.
    annotation_result = result.annotation_results[0]

    for text_annotation in annotation_result.text_annotations:
        print('\nText: {}'.format(text_annotation.text))

        # Get the first text segment
        text_segment = text_annotation.segments[0]
        start_time = text_segment.segment.start_time_offset
        end_time = text_segment.segment.end_time_offset
        print('start_time: {}, end_time: {}'.format(
            start_time.seconds + start_time.nanos * 1e-9,
            end_time.seconds + end_time.nanos * 1e-9))

        print('Confidence: {}'.format(text_segment.confidence))

        # Show the result for the first frame in this segment.
        frame = text_segment.frames[0]
        time_offset = frame.time_offset
        print('Time offset for the first frame: {}'.format(
            time_offset.seconds + time_offset.nanos * 1e-9))
        print('Rotated Bounding Box Vertices:')
        for vertex in frame.rotated_bounding_box.vertices:
            print('\tVertex.x: {}, Vertex.y: {}'.format(vertex.x, vertex.y))
    # [END video_detect_text]
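
The empty VideoContext above is the natural hook for tuning detection. As a hedged sketch — TextDetectionConfig and its language_hints field exist in this library's v1 surface, but the hint value here is illustrative — language hints can be threaded through it:

    from google.cloud import videointelligence

    # Sketch: constrain text detection via the video context.
    # 'en-US' is an illustrative language hint.
    config = videointelligence.types.TextDetectionConfig(
        language_hints=['en-US'])
    context = videointelligence.types.VideoContext(
        text_detection_config=config)
    # 'context' would then be passed as video_context=context to
    # annotate_video, as in video_detect_text above.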


def track_objects_gcs(gcs_uri):
    # [START video_object_tracking_gcs]
    """Object tracking in a video stored on GCS."""
    from google.cloud import videointelligence

    video_client = videointelligence.VideoIntelligenceServiceClient()
    features = [videointelligence.enums.Feature.OBJECT_TRACKING]
    operation = video_client.annotate_video(
        input_uri=gcs_uri, features=features)
    print('\nProcessing video for object annotations.')

    result = operation.result(timeout=300)
    print('\nFinished processing.\n')

    # The first result is retrieved because a single video was processed.
    object_annotations = result.annotation_results[0].object_annotations

    for object_annotation in object_annotations:
        print('Entity description: {}'.format(
            object_annotation.entity.description))
        if object_annotation.entity.entity_id:
            print('Entity id: {}'.format(object_annotation.entity.entity_id))

        print('Segment: {}s to {}s'.format(
            object_annotation.segment.start_time_offset.seconds +
            object_annotation.segment.start_time_offset.nanos / 1e9,
            object_annotation.segment.end_time_offset.seconds +
            object_annotation.segment.end_time_offset.nanos / 1e9))

        print('Confidence: {}'.format(object_annotation.confidence))

        # Here we print only the bounding box of the first frame in the segment
        frame = object_annotation.frames[0]
        box = frame.normalized_bounding_box
        print('Time offset of the first frame: {}s'.format(
            frame.time_offset.seconds + frame.time_offset.nanos / 1e9))
        print('Bounding box position:')
        print('\tleft  : {}'.format(box.left))
        print('\ttop   : {}'.format(box.top))
        print('\tright : {}'.format(box.right))
        print('\tbottom: {}'.format(box.bottom))
        print('\n')
    # [END video_object_tracking_gcs]
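
The sample prints only the first frame of each annotation. Recovering the whole track is just a loop over frames; a sketch using only fields already printed above:

    # Sketch: walk every tracked frame of one object_annotation to
    # recover the full trajectory rather than only the first box.
    for frame in object_annotation.frames:
        box = frame.normalized_bounding_box
        t = frame.time_offset.seconds + frame.time_offset.nanos / 1e9
        print('{:.2f}s: left={:.3f}, top={:.3f}'.format(
            t, box.left, box.top))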


def track_objects(path):
    # [START video_object_tracking]
    """Object tracking in a local video."""
    from google.cloud import videointelligence

    video_client = videointelligence.VideoIntelligenceServiceClient()
    features = [videointelligence.enums.Feature.OBJECT_TRACKING]

    with io.open(path, 'rb') as file:
        input_content = file.read()

    operation = video_client.annotate_video(
        input_content=input_content, features=features)
    print('\nProcessing video for object annotations.')

    result = operation.result(timeout=300)
    print('\nFinished processing.\n')

    # The first result is retrieved because a single video was processed.
    object_annotations = result.annotation_results[0].object_annotations

    # Get only the first annotation for demo purposes.
    object_annotation = object_annotations[0]
    print('Entity description: {}'.format(
        object_annotation.entity.description))
    if object_annotation.entity.entity_id:
        print('Entity id: {}'.format(object_annotation.entity.entity_id))

    print('Segment: {}s to {}s'.format(
        object_annotation.segment.start_time_offset.seconds +
        object_annotation.segment.start_time_offset.nanos / 1e9,
        object_annotation.segment.end_time_offset.seconds +
        object_annotation.segment.end_time_offset.nanos / 1e9))

    print('Confidence: {}'.format(object_annotation.confidence))

    # Here we print only the bounding box of the first frame in this segment
    frame = object_annotation.frames[0]
    box = frame.normalized_bounding_box
    print('Time offset of the first frame: {}s'.format(
        frame.time_offset.seconds + frame.time_offset.nanos / 1e9))
    print('Bounding box position:')
    print('\tleft  : {}'.format(box.left))
    print('\ttop   : {}'.format(box.top))
    print('\tright : {}'.format(box.right))
    print('\tbottom: {}'.format(box.bottom))
    print('\n')
    # [END video_object_tracking]
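
The seconds + nanos / 1e9 conversion is repeated in every function added here; a small helper — a sketch, not part of this PR — would centralize it:

    def duration_to_seconds(duration):
        # Convert a protobuf Duration message into float seconds.
        return duration.seconds + duration.nanos / 1e9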


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    subparsers = parser.add_subparsers(dest='command')

    analyze_labels_parser = subparsers.add_parser(
        'labels', help=analyze_labels.__doc__)
    analyze_labels_parser.add_argument('path')

    analyze_labels_file_parser = subparsers.add_parser(
        'labels_file', help=analyze_labels_file.__doc__)
    analyze_labels_file_parser.add_argument('path')

    analyze_explicit_content_parser = subparsers.add_parser(
        'explicit_content', help=analyze_explicit_content.__doc__)
    analyze_explicit_content_parser.add_argument('path')

    analyze_shots_parser = subparsers.add_parser(
        'shots', help=analyze_shots.__doc__)
    analyze_shots_parser.add_argument('path')

    transcribe_speech_parser = subparsers.add_parser(
        'transcribe', help=speech_transcription.__doc__)
    transcribe_speech_parser.add_argument('path')

    detect_text_parser = subparsers.add_parser(
        'text_gcs', help=video_detect_text_gcs.__doc__)
    detect_text_parser.add_argument('path')

    detect_text_file_parser = subparsers.add_parser(
        'text_file', help=video_detect_text.__doc__)
    detect_text_file_parser.add_argument('path')

    track_objects_parser = subparsers.add_parser(
        'objects_gcs', help=track_objects_gcs.__doc__)
    track_objects_parser.add_argument('path')

    track_objects_file_parser = subparsers.add_parser(
        'objects_file', help=track_objects.__doc__)
    track_objects_file_parser.add_argument('path')

    args = parser.parse_args()

    if args.command == 'labels':
@@ -311,3 +521,11 @@ def speech_transcription(path):
        analyze_explicit_content(args.path)
    if args.command == 'transcribe':
        speech_transcription(args.path)
    if args.command == 'text_gcs':
        video_detect_text_gcs(args.path)
    if args.command == 'text_file':
        video_detect_text(args.path)
    if args.command == 'objects_gcs':
        track_objects_gcs(args.path)
    if args.command == 'objects_file':
        track_objects(args.path)
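
As the command list grows, the if chain could be collapsed into a dispatch table; a sketch restricted to the commands added in this PR:

    # Sketch: table-driven dispatch equivalent to the new if-branches.
    commands = {
        'text_gcs': video_detect_text_gcs,
        'text_file': video_detect_text,
        'objects_gcs': track_objects_gcs,
        'objects_file': track_objects,
    }
    if args.command in commands:
        commands[args.command](args.path)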
24 changes: 20 additions & 4 deletions video/cloud-client/analyze/analyze_test.py
@@ -21,28 +21,44 @@

@pytest.mark.slow
def test_analyze_shots(capsys):
-    analyze.analyze_shots('gs://demomaker/gbikes_dinosaur.mp4')
+    analyze.analyze_shots('gs://cloud-samples-data/video/gbikes_dinosaur.mp4')
    out, _ = capsys.readouterr()
    assert 'Shot 1:' in out


@pytest.mark.slow
def test_analyze_labels(capsys):
-    analyze.analyze_labels('gs://demomaker/cat.mp4')
+    analyze.analyze_labels('gs://cloud-samples-data/video/cat.mp4')
    out, _ = capsys.readouterr()
    assert 'label description: cat' in out


@pytest.mark.slow
def test_analyze_explicit_content(capsys):
-    analyze.analyze_explicit_content('gs://demomaker/cat.mp4')
+    analyze.analyze_explicit_content('gs://cloud-samples-data/video/cat.mp4')
    out, _ = capsys.readouterr()
    assert 'pornography' in out


@pytest.mark.slow
def test_speech_transcription(capsys):
    analyze.speech_transcription(
-        'gs://python-docs-samples-tests/video/googlework_short.mp4')
+        'gs://cloud-samples-data/video/googlework_short.mp4')
    out, _ = capsys.readouterr()
    assert 'cultural' in out


@pytest.mark.slow
def test_detect_text_gcs(capsys):
    analyze.video_detect_text_gcs(
        'gs://cloud-samples-data/video/googlework_short.mp4')
    out, _ = capsys.readouterr()
    assert 'GOOGLE' in out


@pytest.mark.slow
def test_track_objects_gcs(capsys):
    analyze.track_objects_gcs(
        'gs://cloud-samples-data/video/cat.mp4')
    out, _ = capsys.readouterr()
    assert 'cat' in out
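
Only the GCS variants gain tests here. A matching test for the local-file path would follow the same pattern; a sketch, assuming resources/googlework_short.mp4 is checked into the samples directory as the usage examples imply:

    @pytest.mark.slow
    def test_detect_text_local(capsys):
        analyze.video_detect_text('resources/googlework_short.mp4')
        out, _ = capsys.readouterr()
        assert 'GOOGLE' in out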
2 changes: 1 addition & 1 deletion video/cloud-client/analyze/requirements.txt
@@ -1 +1 @@
-google-cloud-videointelligence==1.6.1
+google-cloud-videointelligence==1.7.0