Skip to content

Commit 776cef0

Browse files
jerjoudpebot
authored and committed
Minute workaround (#834)
* Working draft of continuous listening. * Clean up continuous transcription. * Indefinite transcoding of utterances * Set WRAP_IT_UP to something more obvious. * Reduce amount of audio lost between utterances * Add audio overlap. * Remove utterance sample, so there's only one. * Add tests for minute-workaround. * PR feedback.
1 parent 644fdcb commit 776cef0

File tree

3 files changed

+393
-0
lines changed

3 files changed

+393
-0
lines changed
Lines changed: 298 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,298 @@
1+
#!/usr/bin/python
2+
3+
# Copyright (C) 2016 Google Inc.
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
"""Sample that streams audio to the Google Cloud Speech API via GRPC.
18+
19+
This sample expands on transcribe_streaming.py to work around the 1-minute
20+
limit on streaming requests. It does this by transcribing normally until
21+
WRAP_IT_UP_SECS seconds before the 1-minute limit. At that point, it waits for
22+
the end of an utterance and once it hears it, it closes the current stream and
23+
opens a new one. It also keeps a buffer of audio around while this is
24+
happening, that it sends to the new stream in its initial request, to minimize
25+
losing any speech that occurs while this happens.
26+
27+
Note that you could do this a little more simply by simply re-starting the
28+
stream after every utterance, though this increases the possibility of audio
29+
being missed between streams. For learning purposes (and robustness), the more
30+
complex implementation is shown here.
31+
"""
32+
33+
from __future__ import division
34+
35+
import argparse
36+
import collections
37+
import contextlib
38+
import functools
39+
import logging
40+
import re
41+
import signal
42+
import sys
43+
import time
44+
45+
import google.auth
46+
import google.auth.transport.grpc
47+
import google.auth.transport.requests
48+
from google.cloud.proto.speech.v1beta1 import cloud_speech_pb2
49+
from google.rpc import code_pb2
50+
import grpc
51+
import pyaudio
52+
from six.moves import queue
53+
54+
# Seconds you have to wrap up your utterance
WRAP_IT_UP_SECS = 15
# Seconds of trailing audio to replay into the next stream, so speech at the
# stream boundary isn't lost.
SECS_OVERLAP = 1

# Audio recording parameters
RATE = 16000
CHUNK = int(RATE / 10)  # 100ms

# The Speech API has a streaming limit of 60 seconds of audio*, so keep the
# connection alive for that long, plus some more to give the API time to figure
# out the transcription.
# * https://g.co/cloud/speech/limits#content
DEADLINE_SECS = 60 * 3 + 5
# OAuth scope requested when fetching application default credentials.
SPEECH_SCOPE = 'https://www.googleapis.com/auth/cloud-platform'
68+
69+
70+
def make_channel(host):
    """Creates a secure channel with auth credentials from the environment."""
    # Application default credentials, scoped for the Speech API.
    credentials, _ = google.auth.default(scopes=[SPEECH_SCOPE])

    # Request object the transport layer uses to refresh the credentials.
    http_request = google.auth.transport.requests.Request()

    channel = google.auth.transport.grpc.secure_authorized_channel(
        credentials, http_request, host)
    return channel
80+
81+
82+
def _audio_data_generator(buff, overlap_buffer):
83+
"""A generator that yields all available data in the given buffer.
84+
85+
Args:
86+
buff (Queue): A Queue where each element is a chunk of data.
87+
overlap_buffer (deque): a ring buffer for storing trailing data chunks
88+
Yields:
89+
bytes: A chunk of data that is the aggregate of all chunks of data in
90+
`buff`. The function will block until at least one data chunk is
91+
available.
92+
"""
93+
if overlap_buffer:
94+
yield b''.join(overlap_buffer)
95+
overlap_buffer.clear()
96+
97+
while True:
98+
# Use a blocking get() to ensure there's at least one chunk of data.
99+
data = [buff.get()]
100+
101+
# Now consume whatever other data's still buffered.
102+
while True:
103+
try:
104+
data.append(buff.get(block=False))
105+
except queue.Empty:
106+
break
107+
108+
# `None` in the buffer signals that we should stop generating. Put the
109+
# data back into the buffer for the next generator.
110+
if None in data:
111+
data.remove(None)
112+
if data:
113+
buff.put(b''.join(data))
114+
break
115+
else:
116+
overlap_buffer.extend(data)
117+
118+
yield b''.join(data)
119+
120+
121+
def _fill_buffer(buff, in_data, frame_count, time_info, status_flags):
    """PyAudio callback: pushes each captured chunk onto the shared buffer."""
    # Hand the raw audio to the consumer thread and tell PyAudio to keep
    # recording.
    buff.put(in_data)
    return None, pyaudio.paContinue
125+
126+
127+
# [START audio_stream]
@contextlib.contextmanager
def record_audio(rate, chunk):
    """Opens a recording stream in a context manager.

    Args:
        rate (int): sampling rate, in hertz.
        chunk (int): frames per buffer delivered to the audio callback.

    Yields:
        Queue: a thread-safe buffer that the background audio callback
        continually fills with raw audio chunks; ``None`` is appended on
        close to signal consumers to finish.
    """
    # Create a thread-safe buffer of audio data
    buff = queue.Queue()

    audio_interface = pyaudio.PyAudio()
    audio_stream = audio_interface.open(
        format=pyaudio.paInt16,
        # The API currently only supports 1-channel (mono) audio
        # https://goo.gl/z757pE
        channels=1, rate=rate,
        input=True, frames_per_buffer=chunk,
        # Run the audio stream asynchronously to fill the buffer object.
        # This is necessary so that the input device's buffer doesn't overflow
        # while the calling thread makes network requests, etc.
        stream_callback=functools.partial(_fill_buffer, buff),
    )

    # try/finally so the stream and audio interface are released even if the
    # caller's block raises; without it an exception would leak the device.
    try:
        yield buff
    finally:
        audio_stream.stop_stream()
        audio_stream.close()
        # Signal the _audio_data_generator to finish
        buff.put(None)
        audio_interface.terminate()
# [END audio_stream]
155+
156+
157+
def request_stream(data_stream, rate, interim_results=True):
    """Yields `StreamingRecognizeRequest`s constructed from a recording audio
    stream.

    Args:
        data_stream (generator): The raw audio data to send.
        rate (int): The sampling rate in hertz.
        interim_results (boolean): Whether to return intermediate results,
            before the transcription is finalized.
    """
    # The server needs an initial configuration-only request so it knows how
    # to interpret the audio that follows.
    # There are a bunch of config options you can specify. See
    # https://goo.gl/KPZn97 for the full list.
    recognition_config = cloud_speech_pb2.RecognitionConfig(
        encoding='LINEAR16',  # raw 16-bit signed LE samples
        sample_rate=rate,  # the rate in hertz
        # See http://g.co/cloud/speech/docs/languages
        # for a list of supported languages.
        language_code='en-US',  # a BCP-47 language tag
    )
    streaming_config = cloud_speech_pb2.StreamingRecognitionConfig(
        config=recognition_config,
        interim_results=interim_results,
    )

    yield cloud_speech_pb2.StreamingRecognizeRequest(
        streaming_config=streaming_config)

    # Every subsequent request carries just a slice of audio content.
    for data in data_stream:
        yield cloud_speech_pb2.StreamingRecognizeRequest(audio_content=data)
189+
190+
191+
def listen_print_loop(
        recognize_stream, wrap_it_up_secs, buff, max_recog_secs=60):
    """Iterates through server responses and prints them.

    The recognize_stream passed is a generator that will block until a response
    is provided by the server. When the transcription response comes, print it.

    In this case, responses are provided for interim results as well. If the
    response is an interim one, print a line feed at the end of it, to allow
    the next result to overwrite it, until the response is a final one. For the
    final one, print a newline to preserve the finalized transcription.

    Args:
        recognize_stream: iterable of streaming recognition responses.
        wrap_it_up_secs (int): how long before the per-stream limit to start
            waiting for an utterance boundary to switch streams at.
        buff (Queue): shared audio buffer; ``None`` is pushed into it to tell
            the request generator to wind down when switching streams.
        max_recog_secs (int): the API's per-stream recognition limit.
    """
    # What time should we switch to a new stream?
    time_to_switch = time.time() + max_recog_secs - wrap_it_up_secs
    graceful_exit = False
    num_chars_printed = 0
    for resp in recognize_stream:
        if resp.error.code != code_pb2.OK:
            raise RuntimeError('Server error: ' + resp.error.message)

        if not resp.results:
            # Compare the enum with `==`, not `is`: protobuf enum values are
            # plain ints, and identity comparison of ints only happens to
            # work because of CPython's small-integer caching.
            if resp.endpointer_type == resp.END_OF_SPEECH and (
                    time.time() > time_to_switch):
                graceful_exit = True
                # Wind down the current request stream at this utterance
                # boundary.
                buff.put(None)
            continue

        # Display the top transcription
        result = resp.results[0]
        transcript = result.alternatives[0].transcript

        # If the previous result was longer than this one, we need to print
        # some extra spaces to overwrite the previous result
        overwrite_chars = ' ' * max(0, num_chars_printed - len(transcript))

        # Display interim results, but with a carriage return at the end of the
        # line, so subsequent lines will overwrite them.
        if not result.is_final:
            sys.stdout.write(transcript + overwrite_chars + '\r')
            sys.stdout.flush()

            num_chars_printed = len(transcript)

        else:
            print(transcript + overwrite_chars)

            # Exit recognition if any of the transcribed phrases could be
            # one of our keywords.
            if re.search(r'\b(exit|quit)\b', transcript, re.I):
                print('Exiting..')
                recognize_stream.cancel()

            elif graceful_exit:
                break

            num_chars_printed = 0
247+
248+
249+
def main():
    """Streams microphone audio to the Speech API indefinitely.

    Each stream is limited to about a minute of audio, so whenever the
    current one winds down, a fresh stream is started, seeded with the audio
    kept in the overlap buffer.
    """
    service = cloud_speech_pb2.SpeechStub(
        make_channel('speech.googleapis.com'))

    # For streaming audio from the microphone, there are three threads.
    # First, a thread that collects audio data as it comes in
    with record_audio(RATE, CHUNK) as audio_buffer:
        # Second, a thread that sends requests with that data
        overlap_buffer = collections.deque(
            maxlen=int(SECS_OVERLAP * RATE / CHUNK))
        request_iterator = request_stream(
            _audio_data_generator(audio_buffer, overlap_buffer), RATE)
        # Third, a thread that listens for transcription responses
        recognize_stream = service.StreamingRecognize(
            request_iterator, DEADLINE_SECS)

        # Exit things cleanly on interrupt
        signal.signal(signal.SIGINT, lambda *_: recognize_stream.cancel())

        # Now, put the transcription responses to use.
        try:
            while True:
                listen_print_loop(
                    recognize_stream, WRAP_IT_UP_SECS, audio_buffer)

                # Discard this stream and create a new one.
                # Note: calling .cancel() doesn't immediately raise an RpcError
                # - it only raises when the iterator's next() is requested
                recognize_stream.cancel()

                logging.debug('Starting new stream')
                request_iterator = request_stream(
                    _audio_data_generator(audio_buffer, overlap_buffer), RATE)
                recognize_stream = service.StreamingRecognize(
                    request_iterator, DEADLINE_SECS)

        except grpc.RpcError:
            # This happens because of the interrupt handler
            pass
287+
288+
289+
if __name__ == '__main__':
    # Command-line entry point. `-v` enables debug logging, which the tests
    # rely on to observe the 'Starting new stream' messages.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-v', '--verbose', help='increase output verbosity',
        action='store_true')
    args = parser.parse_args()
    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)

    main()
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
# Copyright 2016, Google, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import logging
16+
import re
17+
import threading
18+
import time
19+
20+
import transcribe_streaming_minute as transcribe_streaming
21+
22+
23+
class MockPyAudio(object):
    """Stand-in for `pyaudio.PyAudio` that plays back canned audio files.

    A single instance serves as both the PyAudio "class" (it is callable and
    returns itself) and the opened stream object, mirroring how the sample
    under test uses pyaudio.
    """

    def __init__(self, *audio_filenames):
        # Files to stream to the sample's audio callback, in order.
        self.audio_filenames = audio_filenames

    def __call__(self, *args):
        # The sample calls pyaudio.PyAudio(); since this instance is patched
        # in as the PyAudio class, "constructing" it just returns itself.
        return self

    def open(self, stream_callback, *args, **kwargs):
        # Feed audio to the callback from a background thread, the way the
        # real pyaudio does with its asynchronous stream callback.
        self.closed = threading.Event()
        self.stream_thread = threading.Thread(
            target=self.stream_audio, args=(
                self.audio_filenames, stream_callback, self.closed))
        self.stream_thread.start()
        return self

    def close(self):
        # Tells the background streaming thread to stop.
        self.closed.set()

    def stop_stream(self):
        pass

    def terminate(self):
        pass

    @staticmethod
    def stream_audio(audio_filenames, callback, closed, num_frames=512):
        """Pushes file audio (then silence) to `callback` in fake realtime."""
        # audio is 16-bit samples, whereas python byte is 8-bit
        num_bytes = 2 * num_frames
        # Approximate realtime by sleeping for the appropriate time for the
        # requested number of frames
        sleep_secs = num_frames / float(transcribe_streaming.RATE)

        for audio_filename in audio_filenames:
            with open(audio_filename, 'rb') as audio_file:
                # While the audio stream hasn't been closed, give it chunks of
                # the audio file, until we run out of audio file.
                while not closed.is_set():
                    chunk = audio_file.read(num_bytes)
                    if not chunk:
                        break
                    time.sleep(sleep_secs)
                    callback(chunk, None, None, None)
                else:
                    # Stream was closed mid-file: stop streaming entirely.
                    break

            # Ran out of audio data. Give a second of silence between files
            for _ in range(int(1 + 1 / sleep_secs)):
                if closed.is_set():
                    break
                time.sleep(sleep_secs)
                callback(b'\0' * num_bytes, None, None, None)
        else:
            # No more audio left. Just give silence until we're done
            while not closed.is_set():
                time.sleep(sleep_secs)
                callback(b'\0' * num_bytes, None, None, None)
79+
80+
81+
def test_main(resource, monkeypatch, capsys, caplog):
    """Runs the sample against canned audio and checks its transcriptions."""
    caplog.setLevel(logging.DEBUG)

    # Shrink the per-stream window so a stream switch happens almost
    # immediately, exercising the minute workaround quickly.
    monkeypatch.setattr(
        transcribe_streaming, 'WRAP_IT_UP_SECS', 59)
    # Replace the microphone with playback of the canned audio files.
    monkeypatch.setattr(
        transcribe_streaming.pyaudio, 'PyAudio',
        MockPyAudio(resource('audio.raw'), resource('quit.raw')))

    transcribe_streaming.main()
    out, err = capsys.readouterr()

    # Both files should have been transcribed (ending with the quit keyword)
    assert re.search(
        r'old is the.*quit', out, re.DOTALL | re.I)
    # ...and at least one stream switch should have been logged.
    assert 'Starting new stream' in caplog.text()

0 commit comments

Comments
 (0)