Skip to main content

Python

This sample shows how to stream audio to the Whisperly Live transcription server over a WebSocket connection from Python.


client.py

This is a basic WebSocket client implementation that connects to the transcription server, sends an initial configuration message, and processes incoming messages.

import json
import os
import threading
import time
import uuid
import wave

import ffmpeg
import numpy as np
import pyaudio
import websocket
from faster_whisper import utils


class Client:
    """WebSocket client for one live-transcription session.

    On construction the client opens a ``wss://{host}/ws`` connection on a
    background daemon thread. When the socket opens it sends a JSON
    configuration message (uid, language, task, model, VAD flag, sample rate,
    max delay, session duration). Incoming JSON messages are dispatched by
    ``on_message``.

    Attributes set by server traffic:
        recording: True after ``SERVER_READY``; False after ``DISCONNECT``
            or when the socket closes.
        waiting: True after a ``WAIT`` status message.
        server_error: True after an ``ERROR`` status or a socket error.
        server_backend: Backend name reported in ``SERVER_READY`` (None
            until then).
        transcript: Segments accepted so far (list of segment dicts).
        last_segment: The most recent, possibly still changing, segment.
    """

    INSTANCES = {}
    END_OF_AUDIO = "END_OF_AUDIO"

    def __init__(
        self,
        host=None,
        lang=None,
        translate=False,
        model="small",
        srt_file_path="output.srt",
        use_vad=True,
        sample_rate=16000,
        max_delay=7,
        duration_minutes=1,
        headers=None
    ):
        """Create the client and, if ``host`` is given, start the connection.

        Args:
            host: Server host name; the URL becomes ``wss://{host}/ws``.
                When None, an error is printed and no connection is made.
            lang: Language code sent to the server (may be None).
            translate: If True the task is "translate", else "transcribe".
            model: Model name sent to the server.
            srt_file_path: Path stored on the instance for SRT output.
            use_vad: Voice-activity-detection flag sent to the server.
            sample_rate: Sample rate sent to the server.
            max_delay: Max-delay value sent to the server.
            duration_minutes: Session duration sent to the server.
            headers: Extra headers passed to the WebSocket handshake.
        """
        self.recording = False
        self.task = "translate" if translate else "transcribe"
        self.uid = str(uuid.uuid4())
        self.waiting = False
        self.last_response_received = None
        self.disconnect_if_no_response_for = 15
        self.language = lang
        self.model = model
        self.server_error = False
        self.error_message = None
        # Set from the SERVER_READY message; initialised here so methods that
        # read it cannot raise AttributeError before the server answers.
        self.server_backend = None
        self.srt_file_path = srt_file_path
        self.use_vad = use_vad
        self.last_segment = None
        self.last_received_segment = None
        self.sample_rate = sample_rate
        self.max_delay = max_delay
        self.duration_minutes = duration_minutes
        self.headers = headers
        self.timestamp_offset = 0.0
        self.audio_bytes = None
        # Must exist before the socket thread starts: on_message may run
        # (and touch the transcript) as soon as the connection is up.
        self.transcript = []
        self.client_socket = None
        self.ws_thread = None

        if host is None:
            print("[ERROR]: No host or port specified.")
            return

        socket_url = f"wss://{host}/ws"
        # Lambdas (rather than bound methods) keep the callback signature
        # explicit: the library passes the socket app as first argument.
        self.client_socket = websocket.WebSocketApp(
            socket_url,
            header=headers,
            on_open=lambda ws: self.on_open(ws),
            on_message=lambda ws, message: self.on_message(ws, message),
            on_error=lambda ws, error: self.on_error(ws, error),
            on_close=lambda ws, close_status_code, close_msg: self.on_close(
                ws, close_status_code, close_msg
            ),
        )

        Client.INSTANCES[self.uid] = self

        # ``daemon=True`` replaces the deprecated ``Thread.setDaemon``.
        self.ws_thread = threading.Thread(
            target=self.client_socket.run_forever, daemon=True
        )
        self.ws_thread.start()

        print("[INFO]: * recording")

    def handle_status_messages(self, message_data):
        """React to a server message that carries a ``status`` field.

        ``WAIT`` marks the client as waiting, ``ERROR`` marks a server error,
        ``WARNING`` is only printed. Other statuses are ignored.
        """
        status = message_data["status"]
        if status == "WAIT":
            self.waiting = True
            print(f"[INFO]: Server is full. Estimated wait time {round(message_data['message'])} minutes.")
        elif status == "ERROR":
            print(f"Message from Server: {message_data['message']}")
            self.server_error = True
        elif status == "WARNING":
            print(f"Message from Server: {message_data['message']}")

    def process_segments(self, segments):
        """Fold a batch of segment dicts into the client state.

        Consecutive segments with identical text are collapsed. The last
        segment of the batch becomes ``last_segment``; earlier ones are
        appended to ``transcript`` when the backend is ``faster_whisper`` and
        they start at or after the end of the last stored segment. The
        "last response" timestamp is refreshed only when the text of the
        final segment changed.

        Args:
            segments: List of dicts with ``text``, ``start`` and ``end`` keys.
                An empty list is ignored.
        """
        if not segments:
            # Nothing to process; indexing segments[-1] below would fail.
            return

        seen_text = []
        final_index = len(segments) - 1
        for i, seg in enumerate(segments):
            if seen_text and seen_text[-1] == seg["text"]:
                continue
            seen_text.append(seg["text"])
            if i == final_index:
                self.last_segment = seg
            elif (self.server_backend == "faster_whisper" and
                  (not self.transcript or
                   float(seg['start']) >= float(self.transcript[-1]['end']))):
                self.transcript.append(seg)

        latest_text = segments[-1]["text"]
        if self.last_received_segment is None or self.last_received_segment != latest_text:
            self.last_response_received = time.time()
            self.last_received_segment = latest_text

    def on_message(self, ws, message):
        """Handle one raw JSON message received from the server.

        Messages whose ``uid`` does not match this client are rejected.
        Status messages, ``SERVER_READY``, and language-detection messages
        are handled and return early; segment messages are processed and
        each segment's type and text is printed.
        """
        message = json.loads(message)

        if self.uid != message.get("uid"):
            print("[ERROR]: invalid client uid")
            return

        if "status" in message:
            self.handle_status_messages(message)
            return

        server_message = message.get("message")
        if server_message == "DISCONNECT":
            print("[INFO]: Server disconnected due to overtime.")
            self.recording = False

        if server_message == "SERVER_READY":
            self.last_response_received = time.time()
            self.recording = True
            self.server_backend = message["backend"]
            print(f"[INFO]: Server Running with backend {self.server_backend}")
            return

        if "language" in message:
            self.language = message.get("language")
            lang_prob = message.get("language_prob")
            print(
                f"[INFO]: Server detected language {self.language} with probability {lang_prob}"
            )
            return

        # Treat a missing or null "segments" value as an empty batch.
        segments = message.get("segments") or []
        if segments:
            self.process_segments(segments)

        # Print every partial and final segment as "<type> -> <text>".
        for msg in segments:
            print(msg.get('type'), '->', msg.get('text'))

    def on_error(self, ws, error):
        """Record and print a WebSocket error."""
        print(f"[ERROR] WebSocket Error: {error}")
        self.server_error = True
        self.error_message = error

    def on_close(self, ws, close_status_code, close_msg):
        """Reset the recording/waiting flags when the socket closes."""
        print(f"[INFO]: Websocket connection closed: {close_status_code}: {close_msg}")
        self.recording = False
        self.waiting = False

    def on_open(self, ws):
        """Send the session configuration as soon as the socket opens."""
        print("[INFO]: Opened connection")
        ws.send(
            json.dumps(
                {
                    "uid": self.uid,
                    "language": self.language,
                    "task": self.task,
                    "model": self.model,
                    "use_vad": self.use_vad,
                    "sample_rate": self.sample_rate,
                    "max_delay": self.max_delay,
                    "duration_minutes": self.duration_minutes
                }
            )
        )

    def send_packet_to_server(self, message):
        """Send ``message`` as a binary frame; failures are printed, not raised."""
        try:
            self.client_socket.send(message, websocket.ABNF.OPCODE_BINARY)
        except Exception as e:
            print(e)

    def close_websocket(self):
        """Close the socket and wait for the background thread to finish.

        Both steps are best-effort: errors are printed and swallowed. When no
        connection was ever created (no host given) this is a no-op.
        """
        if self.client_socket is not None:
            try:
                self.client_socket.close()
            except Exception as e:
                print("[ERROR]: Error closing WebSocket:", e)

        if self.ws_thread is not None:
            try:
                self.ws_thread.join()
            except Exception as e:
                print("[ERROR:] Error joining WebSocket thread:", e)

    def get_client_socket(self):
        """Return the underlying WebSocket app (None if never connected)."""
        return self.client_socket

    def write_srt_file(self, output_path="output.srt"):
        """Finalize the transcript by appending the pending last segment.

        Only acts for the ``faster_whisper`` backend. The pending segment is
        appended at most once: repeated calls with an unchanged segment do
        not create duplicates.

        NOTE(review): despite the name, nothing is written to disk here and
        ``output_path`` is unused; it is kept for caller compatibility.
        """
        if self.server_backend != "faster_whisper" or not self.last_segment:
            return
        already_stored = (
            self.transcript
            and self.transcript[-1].get("text") == self.last_segment.get("text")
        )
        if not already_stored:
            self.transcript.append(self.last_segment)

    def wait_before_disconnect(self):
        """Block until the server has been silent long enough.

        Returns once ``disconnect_if_no_response_for`` seconds have passed
        since the last new response. Returns immediately if no response was
        ever received, since there is nothing to wait for.
        """
        if self.last_response_received is None:
            return
        while time.time() - self.last_response_received < self.disconnect_if_no_response_for:
            # Sleep instead of spinning so the wait does not burn a CPU core.
            time.sleep(0.1)


class TranscriptionTeeClient:
    """Feed one audio source to several transcription clients at once.

    The audio source is a WAV file, an HLS stream read through ffmpeg, or
    the default microphone. Audio is converted to float32 samples and sent
    to every client that is currently recording.
    """

    def __init__(self, clients):
        """Store the clients and try to open a microphone input stream.

        Args:
            clients: Non-empty list of client objects.

        Raises:
            Exception: If ``clients`` is empty.
        """
        self.clients = clients
        if not self.clients:
            raise Exception("At least one client is required.")
        self.chunk = 4096
        self.format = pyaudio.paInt16
        self.channels = 1
        self.rate = 16000
        self.record_seconds = 60000
        self.frames = b""
        self.p = pyaudio.PyAudio()
        try:
            self.stream = self.p.open(
                format=self.format,
                channels=self.channels,
                rate=self.rate,
                input=True,
                frames_per_buffer=self.chunk,
            )
        except OSError as error:
            print(f"[WARN]: Unable to access microphone. {error}")
            # File and HLS inputs still work without a microphone.
            self.stream = None

    def __call__(self, audio=None, hls_url=None, save_file=None):
        """Wait for every client to be ready, then stream the chosen source.

        Args:
            audio: Path of an audio file to send (used if ``hls_url`` is None).
            hls_url: URL of an HLS stream; takes precedence over ``audio``.
            save_file: Optional path where the HLS stream is also saved.

        If any client reports that it is waiting or hit a server error, all
        clients are closed and nothing is streamed.
        """
        print("[INFO]: Waiting for server ready ...")
        for client in self.clients:
            while not client.recording:
                if client.waiting or client.server_error:
                    self.close_all_clients()
                    return
                # Yield the CPU while polling instead of spinning.
                time.sleep(0.01)

        print("[INFO]: Server Ready!")
        if hls_url is not None:
            self.process_hls_stream(hls_url, save_file)
        elif audio is not None:
            # NOTE(review): `utils` is imported from faster_whisper in this
            # file; confirm that module actually provides `resample`.
            resampled_file = utils.resample(audio)
            self.play_file(resampled_file)
        else:
            self.record()

    def close_all_clients(self):
        """Close the WebSocket of every client."""
        for client in self.clients:
            client.close_websocket()

    def write_all_clients_srt(self):
        """Ask every client to finalize its transcript for its SRT path."""
        for client in self.clients:
            client.write_srt_file(client.srt_file_path)

    def multicast_packet(self, packet, unconditional=False):
        """Send ``packet`` to every recording client.

        Args:
            packet: Bytes to send.
            unconditional: If True, send even to clients not recording.
        """
        for client in self.clients:
            if unconditional or client.recording:
                client.send_packet_to_server(packet)

    def play_file(self, filename):
        """Stream a WAV file to the clients, then end the session.

        Frames are sent as fast as they can be read (no playback pacing).
        After the file is exhausted, the method waits for each client's
        server to go quiet, sends the end-of-audio marker, finalizes
        transcripts and closes all clients.
        """
        # The context manager closes the file on every exit path.
        with wave.open(filename, "rb") as wavfile:
            try:
                while any(client.recording for client in self.clients):
                    data = wavfile.readframes(self.chunk)
                    if data == b"":
                        break

                    audio_array = self.bytes_to_float_array(data)
                    self.multicast_packet(audio_array.tobytes())

                for client in self.clients:
                    client.wait_before_disconnect()
                self.multicast_packet(Client.END_OF_AUDIO.encode('utf-8'), True)
                self.write_all_clients_srt()
                self.close_all_clients()

            except KeyboardInterrupt:
                self.p.terminate()
                self.close_all_clients()
                self.write_all_clients_srt()
                print("[INFO]: Keyboard interrupt.")

    def process_hls_stream(self, hls_url, save_file):
        """Decode an HLS stream with ffmpeg and send it to the clients.

        ffmpeg emits mono signed 16-bit PCM at ``self.rate`` on stdout. When
        ``save_file`` is given, the stream is additionally copied to that
        file without re-encoding.
        """
        print("[INFO]: Connecting to HLS stream...")
        process = None

        try:
            if save_file is None:
                # stderr is piped but never read, so ffmpeg is silenced here
                # (as in the save branch) to keep the pipe from filling up
                # and stalling the process.
                process = (
                    ffmpeg
                    .input(hls_url, threads=0)
                    .output('-', format='s16le', acodec='pcm_s16le', ac=1, ar=self.rate)
                    .global_args('-loglevel', 'quiet')
                    .run_async(pipe_stdout=True, pipe_stderr=True)
                )
            else:
                source = ffmpeg.input(hls_url, threads=0)
                output_file = source.output(save_file, acodec='copy', vcodec='copy').global_args('-loglevel', 'quiet')
                output_std = source.output('-', format='s16le', acodec='pcm_s16le', ac=1, ar=self.rate)
                process = (
                    ffmpeg.merge_outputs(output_file, output_std)
                    .run_async(pipe_stdout=True, pipe_stderr=True)
                )

            while True:
                in_bytes = process.stdout.read(self.chunk * 2)  # 2 bytes per sample
                if not in_bytes:
                    break
                audio_array = self.bytes_to_float_array(in_bytes)
                self.multicast_packet(audio_array.tobytes())

        except Exception as e:
            print(f"[ERROR]: Failed to connect to HLS stream: {e}")
        finally:
            self.close_all_clients()
            self.write_all_clients_srt()
            if process:
                process.kill()
                # Reap the child so it does not linger as a zombie.
                process.wait()

        print("[INFO]: HLS stream processing finished.")

    def record(self, out_file="output_recording.wav"):
        """Record from the microphone and stream the audio to the clients.

        Audio is buffered and periodically flushed to ``chunks/<n>.wav`` on
        a helper thread. Recording stops when no client is recording any
        more, when the frame budget is exhausted, or on Ctrl-C. In every
        case the audio device and clients are released and the chunk files
        are merged into ``out_file``.

        Args:
            out_file: Path of the merged WAV recording.
        """
        if self.stream is None:
            # The microphone could not be opened in __init__.
            print("[ERROR]: No microphone stream available; cannot record.")
            self.close_all_clients()
            return

        n_audio_file = 0
        os.makedirs("chunks", exist_ok=True)
        writers = []
        try:
            for _ in range(0, int(self.rate / self.chunk * self.record_seconds)):
                if not any(client.recording for client in self.clients):
                    break
                data = self.stream.read(self.chunk, exception_on_overflow=False)
                self.frames += data

                audio_array = self.bytes_to_float_array(data)

                self.multicast_packet(audio_array.tobytes())

                # self.frames holds raw 16-bit samples (2 bytes each), so
                # 60 * rate bytes is roughly 30 s of mono audio.
                if len(self.frames) > 60 * self.rate:
                    t = threading.Thread(
                        target=self.write_audio_frames_to_file,
                        args=(
                            self.frames[:],
                            f"chunks/{n_audio_file}.wav",
                        ),
                    )
                    t.start()
                    writers.append(t)
                    n_audio_file += 1
                    self.frames = b""
                    self.write_all_clients_srt()

        except KeyboardInterrupt:
            # Ctrl-C is the expected way to stop; cleanup happens below.
            pass
        finally:
            # Every chunk file must be complete before the merge reads it.
            for t in writers:
                t.join()
            if self.frames:
                self.write_audio_frames_to_file(
                    self.frames[:], f"chunks/{n_audio_file}.wav"
                )
                n_audio_file += 1
                self.frames = b""
            self.stream.stop_stream()
            self.stream.close()
            self.p.terminate()
            self.close_all_clients()

            self.write_output_recording(n_audio_file, out_file)
            self.write_all_clients_srt()

    def write_audio_frames_to_file(self, frames, file_name):
        """Write raw 16-bit PCM ``frames`` to ``file_name`` as a WAV file."""
        with wave.open(file_name, "wb") as wavfile:
            wavfile.setnchannels(self.channels)
            wavfile.setsampwidth(2)
            wavfile.setframerate(self.rate)
            wavfile.writeframes(frames)

    def write_output_recording(self, n_audio_file, out_file):
        """Concatenate ``chunks/0.wav`` .. ``chunks/<n-1>.wav`` into ``out_file``.

        Missing chunk files are skipped. Each chunk file is deleted after it
        has been copied.
        """
        input_files = [
            f"chunks/{i}.wav"
            for i in range(n_audio_file)
            if os.path.exists(f"chunks/{i}.wav")
        ]
        with wave.open(out_file, "wb") as wavfile:
            wavfile.setnchannels(self.channels)
            wavfile.setsampwidth(2)
            wavfile.setframerate(self.rate)
            for in_file in input_files:
                with wave.open(in_file, "rb") as wav_in:
                    while True:
                        data = wav_in.readframes(self.chunk)
                        if data == b"":
                            break
                        wavfile.writeframes(data)
                # Remove only after the reader is closed; deleting an open
                # file fails on some platforms.
                os.remove(in_file)

    @staticmethod
    def bytes_to_float_array(audio_bytes):
        """Convert 16-bit PCM bytes to float32 samples in [-1.0, 1.0).

        A trailing odd byte (an incomplete sample, e.g. at end of stream)
        is dropped instead of raising.
        """
        usable = len(audio_bytes) - (len(audio_bytes) % 2)
        raw_data = np.frombuffer(buffer=audio_bytes[:usable], dtype=np.int16)
        return raw_data.astype(np.float32) / 32768.0


class TranscriptionClient(TranscriptionTeeClient):
    """Convenience wrapper that drives exactly one ``Client``.

    Builds a single client for ``host`` with the given session options and
    hands it to ``TranscriptionTeeClient`` as its only target. The client is
    also kept on ``self.client``.
    """

    def __init__(
        self,
        host,
        lang=None,
        translate=False,
        model="small",
        use_vad=True,
        sample_rate=16000,
        max_delay=7,
        duration_minutes=1,
        headers=None,
    ):
        # Session options are forwarded unchanged; the SRT path is fixed.
        session_options = {
            "srt_file_path": "output.srt",
            "use_vad": use_vad,
            "sample_rate": sample_rate,
            "max_delay": max_delay,
            "duration_minutes": duration_minutes,
            "headers": headers,
        }
        self.client = Client(host, lang, translate, model, **session_options)
        super().__init__([self.client])

run_client.py

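This is a basic script that runs the WebSocket client against the specified server host. Before running it, replace the `<TOKEN>` placeholder with your API token and adjust the host address for your environment.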

from client import TranscriptionClient


def main(lang="en", translate=False, model="large-v3-turbo", use_vad=False, sample_rate=16000):
    """Create a transcription client and run it until it finishes.

    Args:
        lang: Language code for transcription (e.g. "en", "tr").
        translate: If True, request translation instead of transcription.
        model: Name of the transcription model (e.g. "small",
            "large-v3-turbo").
        use_vad: Whether to enable Voice Activity Detection.
        sample_rate: Audio sample rate in Hz.
    """
    # Authentication header sent with the WebSocket handshake.
    auth_headers = {"Authorization": "Bearer <TOKEN>"}

    # Server host address; change it to match your environment.
    server_host = "live.recordly.ai"

    transcriber = TranscriptionClient(
        server_host,
        lang=lang,
        translate=translate,
        model=model,
        use_vad=use_vad,
        sample_rate=sample_rate,
        max_delay=7,         # maximum allowed processing delay, in seconds
        duration_minutes=1,  # session length, in minutes
        headers=auth_headers,
    )

    # Calling the client with no arguments selects microphone input.
    transcriber()


if __name__ == "__main__":
    # Session settings used when the script is run directly.
    LANGUAGE = "tr"
    TRANSLATE = False
    MODEL = "large-v3-turbo"
    USE_VAD = True

    main(lang=LANGUAGE, translate=TRANSLATE, model=MODEL, use_vad=USE_VAD)

Usage

    python run_client.py