Basic workflow
- Environment setup
  - Client environment
    - Recording module: pip install pyaudio
  - Server environment
    - flask
- Client
  - Recording module
    - Hardware device
      - Recognition quality depends heavily on the microphone
    - Buffer
  - Send data
    - Buffer data
    - Buffer optimization
  - Receive data
    - Recognition result
- Server
  - Receive the buffer data
  - Call the recognition interface
    - Pass in the buffer data
    - Return the recognized text
  - Send the recognized text back to the client
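As a minimal sketch of this round trip from the client's side (it assumes the Flask server shown later listens on localhost:5000 and accepts raw audio bytes in a POST body; the endpoint and port are illustrative, not fixed by this article):

import requests  # assumed HTTP client; any equivalent works

def send_buffer(frames):
    # frames: list of raw PCM byte chunks from the recorder
    audio_bytes = b''.join(frames)
    resp = requests.post("http://localhost:5000/", data=audio_bytes)
    return resp.text  # the recognized text returned by the server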
Environment preparation
Installing the required packages:
pip install pygame
pip install SpeechRecognition
pip install playsound
pip install librosa
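For convenience, everything used in this article can also be installed in one go (a convenience one-liner; the original does not pin versions):

pip install pyaudio flask pygame SpeechRecognition playsound librosa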
Server side
Initial directory structure
├── cache
│ └── temp.wav
├── client
│ ├── client.py
│ └── __init__.py
├── decoder
│ ├── create_data_list.sh
│ ├── datalist
│ ├── recognize.py
│ └── wenet -> /home/asr/data/wenet/wenet
├── model
│ ├── 20210815_unified_conformer_exp
│ │ ├── final.pt
│ │ ├── global_cmvn
│ │ ├── train.yaml
│ │ └── words.txt
└── server
├── __init__.py
└── server.py
Directory structure after test recognition
├── cache
│ ├── temp1.wav
│ ├── temp2.wav
│ └── temp.wav
├── client
│ ├── client.py
│ └── __init__.py
├── decoder
│ ├── create_data_list.sh
│ ├── datalist
│ │ ├── temp
│ │ ├── temp1
│ │ └── temp2
│ ├── recognize.py
│ └── wenet -> /home/asr/data/wenet/wenet
├── model
│ ├── 20210618_u2pp_conformer_exp.tar.gz
│ ├── 20210815_unified_conformer_exp
│ │ ├── final.pt
│ │ ├── global_cmvn
│ │ ├── train.yaml
│ │ └── words.txt
│ └── 20210815_unified_conformer_exp.tar.gz
└── server
├── __init__.py
└── server.py
pip install flask
import wave
from flask import Flask, request

app = Flask(__name__)

CHANNELS = 1
SAMPLE_RATE = 16000  # must match the recording rate on the client
save_path = "../cache/temp.wav"

def save_wav(frames, save_path):
    # Write raw PCM frames into a wav container
    wf = wave.open(save_path, 'wb')
    wf.setnchannels(CHANNELS)
    wf.setsampwidth(2)  # 16-bit samples
    wf.setframerate(SAMPLE_RATE)
    wf.writeframes(b''.join(frames))
    wf.close()
    print('\033[93m' + "written to buffer" + '\033[0m')

@app.route("/", methods=["POST"])
def getdata():
    # The raw audio bytes arrive in the request body
    save_wav([request.data], save_path)
    return "ok"

if __name__ == "__main__":
    app.run()
Load the model once when the service starts; whenever data comes in, call the recognition interface:
from flask import Flask
from recognize import recognize

app = Flask(__name__)
model = recognize()  # load the model once, at service startup

@app.route("/")
def run_recognize():
    result = model.get_recognize()
    return result

if __name__ == "__main__":
    app.run()
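The two snippets above can also be combined into one service that receives the audio, writes it to the cache, and recognizes it within a single request. This is a sketch that assumes the path-based get_recognize(path) variant shown later in this article; the mono/16-bit/16 kHz parameters are assumptions that must match the client:

import wave
from flask import Flask, request
from recognize import recognize

app = Flask(__name__)
model = recognize()  # loaded once at service startup
CACHE_WAV = "../cache/temp.wav"

@app.route("/", methods=["POST"])
def run_recognize():
    # Wrap the raw PCM bytes from the request body in a wav container
    wf = wave.open(CACHE_WAV, 'wb')
    wf.setnchannels(1)       # mono (assumption: must match the client)
    wf.setsampwidth(2)       # 16-bit samples
    wf.setframerate(16000)   # 16 kHz, what the WeNet model expects
    wf.writeframes(request.data)
    wf.close()
    return model.get_recognize(CACHE_WAV)

if __name__ == "__main__":
    app.run()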
WeNet's recognition needs this data-list file; it reads the entries from it internally, and each line must have the following format:
{"key":"temp","wav":"/home/sunao/data/StreamAIzimu/cache/temp.wav","txt":""}
#!/usr/bin/env bash
# Run this from the decoder directory; the paths are relative to it
root=..
data=${root}/cache/temp.wav
echo "{\"key\":\"temp\",\"wav\":\"${data}\",\"txt\":\"\"}" > online_data.list
Modify WeNet's recognition script recognize.py so that the model is loaded only once, and drop the default mechanism of passing arguments in from a bash script.
recognize.py
from __future__ import print_function
import argparse
import copy
import logging
import os
import sys
import time
import torch
import yaml
from torch.utils.data import DataLoader
from wenet.dataset.dataset import Dataset
from wenet.transformer.asr_model import init_asr_model
from wenet.utils.checkpoint import load_checkpoint
from wenet.utils.file_utils import read_symbol_table, read_non_lang_symbols
from wenet.utils.config import override_config
class recognize():
    def __init__(self):
self.root_path = os.pardir
self.batch_size = 1
self.beam_size = 10
self.bpe_model = None
self.checkpoint = '../model/20210815_unified_conformer_exp/final.pt'
self.config = '../model/20210815_unified_conformer_exp/train.yaml'
self.ctc_weight = 0.5
self.data_type = 'raw'
self.decoding_chunk_size = -1
self.dict = '../model/20210815_unified_conformer_exp/words.txt'
self.gpu = -1
self.mode = 'attention_rescoring'
self.non_lang_syms = None
self.num_decoding_left_chunks = -1
self.override_config = []
self.penalty = 0.0
self.result_file = 'online_text'
self.reverse_weight = 0.0
        self.simulate_streaming = False
self.test_data = 'online_data.list'
self.use_cuda = self.gpu >= 0 and torch.cuda.is_available()
        self.device = torch.device('cuda' if self.use_cuda else 'cpu')
self.load_configs()
self.test_data_conf()
self.loadmodel()
def load_configs(self):
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(levelname)s %(message)s')
os.environ['CUDA_VISIBLE_DEVICES'] = str(self.gpu)
if self.mode in ['ctc_prefix_beam_search', 'attention_rescoring'
] and self.batch_size > 1:
logging.fatal(
'decoding mode {} must be running with batch_size == 1'.format(
self.mode))
sys.exit(1)
with open(self.config, 'r') as fin:
self.configs = yaml.load(fin, Loader=yaml.FullLoader)
if len(self.override_config) > 0:
self.configs = override_config(self.configs, self.override_config)
self.symbol_table = read_symbol_table(self.dict)
def loadmodel(self):
model = init_asr_model(self.configs)
self.char_dict = {v: k for k, v in self.symbol_table.items()}
self.eos = len(self.char_dict) - 1
load_checkpoint(model, self.checkpoint)
self.model = model.to(self.device)
self.model.eval()
    def test_data_conf(self):
        '''
        Test data configuration
        '''
self.test_conf = copy.deepcopy(self.configs['dataset_conf'])
self.test_conf['filter_conf']['max_length'] = 102400
self.test_conf['filter_conf']['min_length'] = 0
self.test_conf['filter_conf']['token_max_length'] = 102400
self.test_conf['filter_conf']['token_min_length'] = 0
self.test_conf['filter_conf']['max_output_input_ratio'] = 102400
self.test_conf['filter_conf']['min_output_input_ratio'] = 0
self.test_conf['speed_perturb'] = False
self.test_conf['spec_aug'] = False
self.test_conf['shuffle'] = False
self.test_conf['sort'] = False
if 'fbank_conf' in self.test_conf:
self.test_conf['fbank_conf']['dither'] = 0.0
elif 'mfcc_conf' in self.test_conf:
self.test_conf['mfcc_conf']['dither'] = 0.0
self.test_conf['batch_conf']['batch_type'] = "static"
self.test_conf['batch_conf']['batch_size'] = self.batch_size
self.non_lang_syms = read_non_lang_symbols(self.non_lang_syms)
def get_test_data_loader(self):
test_dataset = Dataset(self.data_type,
self.test_data,
self.symbol_table,
self.test_conf,
self.bpe_model,
self.non_lang_syms,
partition=False)
return DataLoader(test_dataset, batch_size=None, num_workers=0)
def get_recognize(self):
test_data_loader = self.get_test_data_loader()
with torch.no_grad():
for batch_idx, batch in enumerate(test_data_loader):
keys, feats, target, feats_lengths, target_lengths = batch
feats = feats.to(self.device)
feats_lengths = feats_lengths.to(self.device)
assert (feats.size(0) == 1)
if self.mode == 'attention':
hyps, _ = self.model.recognize(
feats,
feats_lengths,
beam_size=self.beam_size,
decoding_chunk_size=self.decoding_chunk_size,
num_decoding_left_chunks=self.num_decoding_left_chunks,
simulate_streaming=self.simulate_streaming)
hyps = [hyp.tolist() for hyp in hyps]
elif self.mode == 'ctc_greedy_search':
hyps, _ = self.model.ctc_greedy_search(
feats,
feats_lengths,
decoding_chunk_size=self.decoding_chunk_size,
num_decoding_left_chunks=self.num_decoding_left_chunks,
simulate_streaming=self.simulate_streaming)
elif self.mode == 'ctc_prefix_beam_search':
assert (feats.size(0) == 1)
hyp, _ = self.model.ctc_prefix_beam_search(
feats,
feats_lengths,
self.beam_size,
decoding_chunk_size=self.decoding_chunk_size,
num_decoding_left_chunks=self.num_decoding_left_chunks,
simulate_streaming=self.simulate_streaming)
hyps = [hyp]
elif self.mode == 'attention_rescoring':
assert (feats.size(0) == 1)
hyp, _ = self.model.attention_rescoring(
feats,
feats_lengths,
self.beam_size,
decoding_chunk_size=self.decoding_chunk_size,
num_decoding_left_chunks=self.num_decoding_left_chunks,
ctc_weight=self.ctc_weight,
simulate_streaming=self.simulate_streaming,
reverse_weight=self.reverse_weight)
hyps = [hyp]
content = ''
for w in hyps[0]:
if w == self.eos:
break
content += self.char_dict[w]
return content
if __name__ == '__main__':
recog = recognize()
result = recog.get_recognize()
print(result)
In this way, multiple recognition channels (multiple audio files) can be supported:
from __future__ import print_function
import argparse
import copy
import logging
import os
import sys
import time
import torch
import yaml
from torch.utils.data import DataLoader
from wenet.dataset.dataset import Dataset
from wenet.transformer.asr_model import init_asr_model
from wenet.utils.checkpoint import load_checkpoint
from wenet.utils.file_utils import read_symbol_table, read_non_lang_symbols
from wenet.utils.config import override_config
class recognize():
    def __init__(self):
self.root_path = os.pardir
self.batch_size = 1
self.beam_size = 10
self.bpe_model = None
self.checkpoint = '../model/20210815_unified_conformer_exp/final.pt'
self.config = '../model/20210815_unified_conformer_exp/train.yaml'
self.ctc_weight = 0.5
self.data_type = 'raw'
self.decoding_chunk_size = -1
self.dict = '../model/20210815_unified_conformer_exp/words.txt'
self.gpu = -1
self.mode = 'attention_rescoring'
self.non_lang_syms = None
self.num_decoding_left_chunks = -1
self.override_config = []
self.penalty = 0.0
self.result_file = 'online_text'
self.reverse_weight = 0.0
        self.simulate_streaming = False
self.test_data = 'online_data.list'
self.use_cuda = self.gpu >= 0 and torch.cuda.is_available()
        self.device = torch.device('cuda' if self.use_cuda else 'cpu')
self.load_configs()
self.test_data_conf()
self.loadmodel()
def load_configs(self):
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(levelname)s %(message)s')
os.environ['CUDA_VISIBLE_DEVICES'] = str(self.gpu)
if self.mode in ['ctc_prefix_beam_search', 'attention_rescoring'
] and self.batch_size > 1:
logging.fatal(
'decoding mode {} must be running with batch_size == 1'.format(
self.mode))
sys.exit(1)
with open(self.config, 'r') as fin:
self.configs = yaml.load(fin, Loader=yaml.FullLoader)
if len(self.override_config) > 0:
self.configs = override_config(self.configs, self.override_config)
self.symbol_table = read_symbol_table(self.dict)
def loadmodel(self):
model = init_asr_model(self.configs)
self.char_dict = {v: k for k, v in self.symbol_table.items()}
self.eos = len(self.char_dict) - 1
load_checkpoint(model, self.checkpoint)
self.model = model.to(self.device)
self.model.eval()
    def test_data_conf(self):
        '''
        Test data configuration
        '''
self.test_conf = copy.deepcopy(self.configs['dataset_conf'])
self.test_conf['filter_conf']['max_length'] = 102400
self.test_conf['filter_conf']['min_length'] = 0
self.test_conf['filter_conf']['token_max_length'] = 102400
self.test_conf['filter_conf']['token_min_length'] = 0
self.test_conf['filter_conf']['max_output_input_ratio'] = 102400
self.test_conf['filter_conf']['min_output_input_ratio'] = 0
self.test_conf['speed_perturb'] = False
self.test_conf['spec_aug'] = False
self.test_conf['shuffle'] = False
self.test_conf['sort'] = False
if 'fbank_conf' in self.test_conf:
self.test_conf['fbank_conf']['dither'] = 0.0
elif 'mfcc_conf' in self.test_conf:
self.test_conf['mfcc_conf']['dither'] = 0.0
self.test_conf['batch_conf']['batch_type'] = "static"
self.test_conf['batch_conf']['batch_size'] = self.batch_size
self.non_lang_syms = read_non_lang_symbols(self.non_lang_syms)
def get_test_data_loader(self,path):
self.test_data=path
test_dataset = Dataset(self.data_type,
self.test_data,
self.symbol_table,
self.test_conf,
self.bpe_model,
self.non_lang_syms,
partition=False)
return DataLoader(test_dataset, batch_size=None, num_workers=0)
    def create_data_list(self, path):
        # Derive the key from the wav file name, e.g. ../cache/temp1.wav -> temp1
        file_name = path.split("/")[-1].split(".")[0]
        filepath = "./datalist/" + file_name
        # Note: the list file is written only once per key, and the wav path is hardcoded
        if not os.path.exists(filepath):
            with open(filepath, 'w', encoding="utf-8") as file:
                file.write('{"key":"%s","wav":"/home/sunao/data/StreamAIzimu/cache/%s.wav","txt":""}' % (file_name, file_name))
        return filepath
    def get_recognize(self, path):
path = self.create_data_list(path)
test_data_loader = self.get_test_data_loader(path)
with torch.no_grad():
for batch_idx, batch in enumerate(test_data_loader):
keys, feats, target, feats_lengths, target_lengths = batch
feats = feats.to(self.device)
feats_lengths = feats_lengths.to(self.device)
assert (feats.size(0) == 1)
if self.mode == 'attention':
hyps, _ = self.model.recognize(
feats,
feats_lengths,
beam_size=self.beam_size,
decoding_chunk_size=self.decoding_chunk_size,
num_decoding_left_chunks=self.num_decoding_left_chunks,
simulate_streaming=self.simulate_streaming)
hyps = [hyp.tolist() for hyp in hyps]
elif self.mode == 'ctc_greedy_search':
hyps, _ = self.model.ctc_greedy_search(
feats,
feats_lengths,
decoding_chunk_size=self.decoding_chunk_size,
num_decoding_left_chunks=self.num_decoding_left_chunks,
simulate_streaming=self.simulate_streaming)
elif self.mode == 'ctc_prefix_beam_search':
assert (feats.size(0) == 1)
hyp, _ = self.model.ctc_prefix_beam_search(
feats,
feats_lengths,
self.beam_size,
decoding_chunk_size=self.decoding_chunk_size,
num_decoding_left_chunks=self.num_decoding_left_chunks,
simulate_streaming=self.simulate_streaming)
hyps = [hyp]
elif self.mode == 'attention_rescoring':
assert (feats.size(0) == 1)
hyp, _ = self.model.attention_rescoring(
feats,
feats_lengths,
self.beam_size,
decoding_chunk_size=self.decoding_chunk_size,
num_decoding_left_chunks=self.num_decoding_left_chunks,
ctc_weight=self.ctc_weight,
simulate_streaming=self.simulate_streaming,
reverse_weight=self.reverse_weight)
hyps = [hyp]
content = ''
for w in hyps[0]:
if w == self.eos:
break
content += self.char_dict[w]
return content
if __name__ == '__main__':
recog = recognize()
result1 = recog.get_recognize("../cache/temp.wav")
result2 = recog.get_recognize("../cache/temp1.wav")
result3 = recog.get_recognize("../cache/temp2.wav")
print(result1)
print(result2)
print(result3)
Client
- First, determine whether anyone is speaking, i.e. whether there is any data
- Store the data in the buffer and feed it to the recognition module
- Recognition returns a result, which is stored as subtitles
  - Subtitle length
    - Decide whether this point is the end of a sentence
      - If it is, break the sentence there (VAD)
      - If not, force a break once the subtitle exceeds 20 characters (see the sketch after this list)
- Track the silence duration
  - If the silence lasts too long, end the current segment and flush the result
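As a sketch of the forced-break rule above (the 20-character limit comes from the list; the function name is illustrative, and VAD-based sentence endings are handled separately by the silence counter):

def segment_subtitle(text, max_len=20):
    # Force a break whenever a partial result grows past max_len characters
    segments = []
    while len(text) > max_len:
        segments.append(text[:max_len])
        text = text[max_len:]
    if text:
        segments.append(text)
    return segments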
Actually, there is no need to save the audio here; it is only to verify that recording works properly.
import pyaudio
import wave

CHUNK = 1024
FORMAT = pyaudio.paInt16
CHANNELS = 1
SAMPLE_RATE = 44100
RECORD_SECONDS = 4
temp_save_path = "Audio/temp.wav"

def save_wav(frames, save_path):
    wf = wave.open(save_path, 'wb')
    wf.setnchannels(CHANNELS)
    wf.setsampwidth(2)  # 16-bit samples
    wf.setframerate(SAMPLE_RATE)
    wf.writeframes(b''.join(frames))
    wf.close()
    print('\033[93m' + "written to buffer" + '\033[0m')

def recording(save_path):
    p = pyaudio.PyAudio()
    stream = p.open(format=FORMAT,
                    channels=CHANNELS,
                    rate=SAMPLE_RATE,
                    input=True,
                    frames_per_buffer=CHUNK)
    print('\033[93m' + "recording" + '\033[0m')
    frames = []
    max_size = 16 * 4  # 64 chunks, roughly 1.5 s of audio at 44.1 kHz
    while True:
        data = stream.read(CHUNK)
        frames.append(data)
        if len(frames) == max_size:
            save_wav(frames, save_path)
            frames = []
            break

recording(temp_save_path)
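Since recognition quality depends heavily on the microphone, it helps to list the available input devices before choosing input_device_index (used in the client below). A small sketch using PyAudio's device-query API:

import pyaudio

p = pyaudio.PyAudio()
for i in range(p.get_device_count()):
    info = p.get_device_info_by_index(i)
    if info.get('maxInputChannels', 0) > 0:  # input-capable devices only
        print(i, info['name'])
p.terminate()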
Endpoint detection based on short-time energy and short-time zero-crossing-rate thresholds
vad.py
import numpy as np
import pyaudio

SUCCESS = 0
FAIL = 1

def ZCR(curFrame):
    # Zero-crossing rate: count sign changes between consecutive samples,
    # ignoring crossings whose amplitude difference is too small (noise)
    tmp1 = curFrame[:-1]
    tmp2 = curFrame[1:]
    signs = (tmp1 * tmp2) < 0
    diffs = (tmp1 - tmp2) > 0.02
    zcr = np.sum(signs * diffs)
    return zcr

def STE(curFrame):
    # Short-time energy: sum of absolute amplitudes of the frame
    amp = np.sum(np.abs(curFrame))
    return amp
class Vad(object):
def __init__(self,CHUNK=1024):
self.amp1 = 15
self.amp2 = 1
self.zcr1 = 2
self.zcr2 = 1
self.maxsilence = 45
self.minlen = 40
self.max_en = 20000
self.status = 0
self.count = 0
self.silence = 0
self.frame_len = CHUNK
self.frame_inc = CHUNK / 2
self.cur_status = 0
def check_ontime(self, cache_frame):
wave_data = np.frombuffer(cache_frame, dtype=np.int16)
wave_data = wave_data * 1.0 / self.max_en
data = wave_data[np.arange(0, self.frame_len)]
zcr = ZCR(data)
amp = STE(data) ** 2
status = self.speech_status(amp, zcr)
return status
    def speech_status(self, amp, zcr):
        status = 0
        # Note: self.cur_status is never updated after __init__, so as written
        # the cur_status == 2 branch below is never reached
        if self.cur_status in [0, 1]:
if amp > self.amp1 or zcr > self.zcr1:
status = 2
self.silence = 0
self.count += 1
elif amp > self.amp2 or zcr > self.zcr2:
status = 2
self.count += 1
            else:
                status = 0
                self.silence = 0
                self.count = 0
elif self.cur_status == 2:
if amp > self.amp2 or zcr > self.zcr2:
self.count += 1
status = 2
else:
self.silence += 1
if self.silence < self.maxsilence:
self.count += 1
status = 2
elif self.count < self.minlen:
status = 0
self.silence = 0
self.count = 0
else:
status = 3
self.silence = 0
self.count = 0
return status
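A quick way to sanity-check the Vad class is to feed it live chunks and print the detected status (0 = silence, 2 = speech, in the code as written), just as the client below does. A sketch assuming 16 kHz mono input:

import pyaudio
from vad import Vad

CHUNK = 1024
p = pyaudio.PyAudio()
stream = p.open(format=pyaudio.paInt16, channels=1, rate=16000,
                input=True, frames_per_buffer=CHUNK)
v = Vad(CHUNK)
for _ in range(100):  # roughly six seconds of audio
    data = stream.read(CHUNK, exception_on_overflow=False)
    print(v.check_ontime(data))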
For now the functionality has been simplified: it performs speech recognition on each utterance as it is spoken. Because there is no good way to control console output, the real-time subtitle display is still poor.

Cache-based speech recognition cannot always capture the tail of an utterance, so the tail has to be appended in order to recognize the whole utterance.
import pyaudio
import wave
from decoder.recognize import recognize
import numpy as np
from vad import Vad

class RecognizeService():
    def __init__(self):
        self.CHUNK = 1024
        self.FORMAT = pyaudio.paInt16
        self.CHANNELS = 1
        self.SAMPLE_RATE = 16000  # the WeNet model expects 16 kHz audio
        self.temp_save_path = "../cache/temp1.wav"
        self.p = pyaudio.PyAudio()
        self.model = recognize()  # load the model once
        self.stream = self.p.open(format=self.FORMAT,
                                  channels=self.CHANNELS,
                                  rate=self.SAMPLE_RATE,
                                  input=True,
                                  frames_per_buffer=self.CHUNK,
                                  input_device_index=0)
        self.v = Vad(self.CHUNK)

    def save_wav(self, frames, save_path):
        wf = wave.open(save_path, 'wb')
        wf.setnchannels(self.CHANNELS)
        wf.setsampwidth(self.p.get_sample_size(self.FORMAT))
        wf.setframerate(self.SAMPLE_RATE)
        wf.writeframes(b''.join(frames))
        wf.close()

    def recording(self, save_path):
        # save_path is kept for API compatibility; writes go to self.temp_save_path
        print('\033[93m' + "recording" + '\033[0m')
        frames = []        # chunks of the current partial segment
        max_size = 5
        long_frames = []   # rolling window over the current utterance
        last_result = ""
        num = 0            # count of consecutive silent chunks
        is_speak = False
        result = ""
        while True:
            stream_data = self.stream.read(self.CHUNK, exception_on_overflow=False)
            status = self.v.check_ontime(stream_data)
            if status == 2:  # speech detected
                is_speak = True
                wave_data = np.frombuffer(stream_data, dtype=np.int16)
                frames.append(wave_data)
                if len(frames) >= max_size:
                    long_frames.extend(frames)
                    # keep only the most recent max_size * 10 chunks
                    if len(long_frames) > max_size * 10:
                        long_frames = long_frames[-max_size * 10:]
                    self.save_wav(long_frames, self.temp_save_path)
                    result = self.model.get_recognize(self.temp_save_path)
                    frames = []
                    if last_result == result:
                        continue
                    last_result = result
            if status == 0:  # silence
                num += 1
                if num == 10:
                    if is_speak:
                        # append the tail so the whole utterance gets recognized
                        if len(frames) > 0 and len(long_frames) > 0:
                            long_frames.extend(frames)
                            self.save_wav(long_frames, self.temp_save_path)
                            result = self.model.get_recognize(self.temp_save_path)
                        if result != "":
                            print(result)
                    num = 0
                    long_frames = []
                    frames = []
                    is_speak = False
                    result = ""
if __name__ == '__main__':
service = RecognizeService()
service.recording(service.temp_save_path)
Original: https://blog.csdn.net/ALL_BYA/article/details/124039408
Author: 语音不识别
Title: Online real-time speech recognition implementation [in progress: local testing done, only the server functionality remains]