Online Real-Time Speech Recognition Implementation [Work in Progress: local testing complete, only the server functionality remains]

Basic workflow

  • Environment setup
  • Client environment
    • Recording module: pip install pyaudio
  • Server environment
    • flask
  • Client
  • Recording module
    • Hardware device
    • Recognition quality depends heavily on the microphone
    • Buffer
  • Sending data
    • Buffer data
    • Buffer optimization
  • Receiving data
    • Recognition result
  • Server side
  • Receive the buffer data
  • Call the recognition interface
    • Pass in the buffer data
    • Return the recognized text
  • Send the recognized text back to the client (a rough sketch of this client-to-server flow follows the list)
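
The client-to-server transport is the part that is still unfinished. As a placeholder, the following is only a minimal sketch of how one cached buffer could be posted to the Flask server described later; the use of the requests library, the raw-PCM body, and Flask's default address http://127.0.0.1:5000/ are assumptions, not the final design.

# Hypothetical client-side send: post the raw PCM frames of one cached buffer
# to the Flask server and print the recognized text it returns.
import wave
import requests

def send_buffer(wav_path, url="http://127.0.0.1:5000/"):
    # Send the raw frames (no WAV header); the server re-wraps them with save_wav().
    with wave.open(wav_path, "rb") as wf:
        frames = wf.readframes(wf.getnframes())
    resp = requests.post(url, data=frames,
                         headers={"Content-Type": "application/octet-stream"})
    resp.raise_for_status()
    return resp.text   # the recognized text

if __name__ == "__main__":
    print(send_buffer("../cache/temp.wav"))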

Environment preparation

Install the related packages:

pip install pygame SpeechRecognition playsound librosa

Server side

Initial directory structure

├── cache
│   └── temp.wav
├── client
│   ├── client.py
│   └── __init__.py
├── decoder
│   ├── create_data_list.sh
│   ├── datalist
│   ├── recognize.py
│   └── wenet -> /home/asr/data/wenet/wenet
├── model
│   ├── 20210815_unified_conformer_exp
│   │   ├── final.pt
│   │   ├── global_cmvn
│   │   ├── train.yaml
│   │   └── words.txt
└── server
    ├── __init__.py
    └── server.py

Directory structure after a recognition test

├── cache
│   ├── temp1.wav
│   ├── temp2.wav
│   └── temp.wav
├── client
│   ├── client.py
│   └── __init__.py
├── decoder
│   ├── create_data_list.sh
│   ├── datalist
│   │   ├── temp
│   │   ├── temp1
│   │   └── temp2
│   ├── recognize.py
│   └── wenet -> /home/asr/data/wenet/wenet
├── model
│   ├── 20210618_u2pp_conformer_exp.tar.gz
│   ├── 20210815_unified_conformer_exp
│   │   ├── final.pt
│   │   ├── global_cmvn
│   │   ├── train.yaml
│   │   └── words.txt
│   └── 20210815_unified_conformer_exp.tar.gz
└── server
    ├── __init__.py
    └── server.py

pip install flask

from flask import Flask, request
import wave

CHANNELS = 1
SAMPALE_RATE = 16000              # must match the client recording settings
save_path = "../cache/temp.wav"

app = Flask(__name__)

def save_wav(frames, save_path):
    # Write the buffered PCM frames into a WAV file (16-bit mono).
    wf = wave.open(save_path, 'wb')
    wf.setnchannels(CHANNELS)
    wf.setsampwidth(2)
    wf.setframerate(SAMPALE_RATE)
    wf.writeframes(b''.join(frames))
    wf.close()
    print('\033[93m' + "buffer cached" + '\033[0m')

@app.route("/", methods=["POST"])
def getdata():
    # Assumption: the client posts the raw PCM bytes of one buffer as the request body.
    data = request.get_data()
    save_wav([data], save_path)
    return "ok"

if __name__ == "__main__":
    app.run()

Load the model when the service starts, and call the recognition interface whenever data comes in:

from flask import Flask
from recognize import recognize

app = Flask(__name__)

model = recognize()
@app.route("/")
def run_recognize():

    result = model.get_recognize()
    return result

if __name__ =="__main__":
    app.run()
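
Putting the two routes together, the still-missing server functionality would roughly be a single endpoint that receives one buffer, caches it as a WAV file, runs recognition, and returns the text. The sketch below is only an assumption for illustration: the client is assumed to post raw PCM bytes in the request body, and the cache path and 16 kHz mono settings mirror the client code later in this post.

# Sketch of the combined server: receive a buffer, cache it, recognize, return the text.
import wave
from flask import Flask, request
from recognize import recognize

CHANNELS = 1
SAMPALE_RATE = 16000              # must match the client recording settings
CACHE_WAV = "../cache/temp.wav"   # assumed cache location

app = Flask(__name__)
model = recognize()               # load the model once, when the service starts

def save_wav(frames, save_path):
    wf = wave.open(save_path, 'wb')
    wf.setnchannels(CHANNELS)
    wf.setsampwidth(2)
    wf.setframerate(SAMPALE_RATE)
    wf.writeframes(b''.join(frames))
    wf.close()

@app.route("/", methods=["POST"])
def run_recognize():
    # Assumption: the client posts the raw PCM bytes of one buffer as the request body.
    save_wav([request.get_data()], CACHE_WAV)
    return model.get_recognize()

if __name__ == "__main__":
    app.run()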

WeNet's recognition needs a data-list file; it reads its input from this file, and each entry must have the following format (the create_data_list.sh script below generates it):

{"key":"temp","wav":"/home/sunao/data/StreamAIzimu/cache/temp.wav","txt":""}

#!/usr/bin/bash
root=..

data=${root}/cache/temp.wav

echo "{\"key\":\"temp\",\"wav\":\"${data}\",\"txt\":\"\"}" > online_data.list

Modify WeNet's recognition script recognize.py so that the model is loaded only once, and drop the original way of passing arguments in from a bash script.

recognize.py


from __future__ import print_function

import argparse
import copy
import logging
import os
import sys
import time
import torch
import yaml
from torch.utils.data import DataLoader

from wenet.dataset.dataset import Dataset
from wenet.transformer.asr_model import init_asr_model
from wenet.utils.checkpoint import load_checkpoint
from wenet.utils.file_utils import read_symbol_table, read_non_lang_symbols
from wenet.utils.config import override_config

class recognize():
    def __init__(self, ):

        self.root_path = os.pardir
        self.batch_size = 1
        self.beam_size = 10
        self.bpe_model = None
        self.checkpoint = '../model/20210815_unified_conformer_exp/final.pt'
        self.config = '../model/20210815_unified_conformer_exp/train.yaml'
        self.ctc_weight = 0.5
        self.data_type = 'raw'
        self.decoding_chunk_size = -1
        self.dict = '../model/20210815_unified_conformer_exp/words.txt'
        self.gpu = -1
        self.mode = 'attention_rescoring'
        self.non_lang_syms = None
        self.num_decoding_left_chunks = -1
        self.override_config = []
        self.penalty = 0.0
        self.result_file = 'online_text'
        self.reverse_weight = 0.0
        self.simulate_streaming = False
        self.test_data = 'online_data.list'

        self.use_cuda = self.gpu >= 0 and torch.cuda.is_available()

        self.device = torch.device('cuda' if self.use_cuda else 'cpu')
        self.load_configs()
        self.test_data_conf()
        self.loadmodel()

    def load_configs(self):
        logging.basicConfig(level=logging.DEBUG,
                            format='%(asctime)s %(levelname)s %(message)s')
        os.environ['CUDA_VISIBLE_DEVICES'] = str(self.gpu)

        if self.mode in ['ctc_prefix_beam_search', 'attention_rescoring'
                              ] and self.batch_size > 1:
            logging.fatal(
                'decoding mode {} must be running with batch_size == 1'.format(
                    self.mode))
            sys.exit(1)
        with open(self.config, 'r') as fin:
            self.configs = yaml.load(fin, Loader=yaml.FullLoader)
        if len(self.override_config) > 0:
            self.configs = override_config(self.configs, self.override_config)

        self.symbol_table = read_symbol_table(self.dict)

    def loadmodel(self):

        model = init_asr_model(self.configs)

        self.char_dict = {v: k for k, v in self.symbol_table.items()}
        self.eos = len(self.char_dict) - 1

        load_checkpoint(model, self.checkpoint)
        self.model = model.to(self.device)
        self.model.eval()

    def test_data_conf(self):
        '''
        Test data configuration
        '''
        self.test_conf = copy.deepcopy(self.configs['dataset_conf'])
        self.test_conf['filter_conf']['max_length'] = 102400
        self.test_conf['filter_conf']['min_length'] = 0
        self.test_conf['filter_conf']['token_max_length'] = 102400
        self.test_conf['filter_conf']['token_min_length'] = 0
        self.test_conf['filter_conf']['max_output_input_ratio'] = 102400
        self.test_conf['filter_conf']['min_output_input_ratio'] = 0
        self.test_conf['speed_perturb'] = False
        self.test_conf['spec_aug'] = False
        self.test_conf['shuffle'] = False
        self.test_conf['sort'] = False
        if 'fbank_conf' in self.test_conf:
            self.test_conf['fbank_conf']['dither'] = 0.0
        elif 'mfcc_conf' in self.test_conf:
            self.test_conf['mfcc_conf']['dither'] = 0.0
        self.test_conf['batch_conf']['batch_type'] = "static"
        self.test_conf['batch_conf']['batch_size'] = self.batch_size
        self.non_lang_syms = read_non_lang_symbols(self.non_lang_syms)

    def get_test_data_loader(self):
        test_dataset = Dataset(self.data_type,
                               self.test_data,
                               self.symbol_table,
                               self.test_conf,
                               self.bpe_model,
                               self.non_lang_syms,
                               partition=False)
        return DataLoader(test_dataset, batch_size=None, num_workers=0)

    def get_recognize(self):
        test_data_loader = self.get_test_data_loader()
        with torch.no_grad():
            for batch_idx, batch in enumerate(test_data_loader):
                keys, feats, target, feats_lengths, target_lengths = batch
                feats = feats.to(self.device)
                feats_lengths = feats_lengths.to(self.device)
                assert (feats.size(0) == 1)
                if self.mode == 'attention':
                    hyps, _ = self.model.recognize(
                        feats,
                        feats_lengths,
                        beam_size=self.beam_size,
                        decoding_chunk_size=self.decoding_chunk_size,
                        num_decoding_left_chunks=self.num_decoding_left_chunks,
                        simulate_streaming=self.simulate_streaming)
                    hyps = [hyp.tolist() for hyp in hyps]
                elif self.mode == 'ctc_greedy_search':
                    hyps, _ = self.model.ctc_greedy_search(
                        feats,
                        feats_lengths,
                        decoding_chunk_size=self.decoding_chunk_size,
                        num_decoding_left_chunks=self.num_decoding_left_chunks,
                        simulate_streaming=self.simulate_streaming)

                elif self.mode == 'ctc_prefix_beam_search':
                    assert (feats.size(0) == 1)
                    hyp, _ = self.model.ctc_prefix_beam_search(
                        feats,
                        feats_lengths,
                        self.beam_size,
                        decoding_chunk_size=self.decoding_chunk_size,
                        num_decoding_left_chunks=self.num_decoding_left_chunks,
                        simulate_streaming=self.simulate_streaming)
                    hyps = [hyp]
                elif self.mode == 'attention_rescoring':
                    assert (feats.size(0) == 1)
                    hyp, _ = self.model.attention_rescoring(
                        feats,
                        feats_lengths,
                        self.beam_size,
                        decoding_chunk_size=self.decoding_chunk_size,
                        num_decoding_left_chunks=self.num_decoding_left_chunks,
                        ctc_weight=self.ctc_weight,
                        simulate_streaming=self.simulate_streaming,
                        reverse_weight=self.reverse_weight)
                    hyps = [hyp]

                content = ''
                for w in hyps[0]:
                    if w == self.eos:
                        break
                    content += self.char_dict[w]
                return content

if __name__ == '__main__':

    recog = recognize()

    result = recog.get_recognize()
    print(result)

With the changes below, recognize.py accepts a path per call, so multiple recognition streams (different audio files) can be handled with a single loaded model:


from __future__ import print_function

import argparse
import copy
import logging
import os
import sys
import time
import torch
import yaml
from torch.utils.data import DataLoader

from wenet.dataset.dataset import Dataset
from wenet.transformer.asr_model import init_asr_model
from wenet.utils.checkpoint import load_checkpoint
from wenet.utils.file_utils import read_symbol_table, read_non_lang_symbols
from wenet.utils.config import override_config

class recognize():
    def __init__(self, ):

        self.root_path = os.pardir
        self.batch_size = 1
        self.beam_size = 10
        self.bpe_model = None
        self.checkpoint = '../model/20210815_unified_conformer_exp/final.pt'
        self.config = '../model/20210815_unified_conformer_exp/train.yaml'
        self.ctc_weight = 0.5
        self.data_type = 'raw'
        self.decoding_chunk_size = -1
        self.dict = '../model/20210815_unified_conformer_exp/words.txt'
        self.gpu = -1
        self.mode = 'attention_rescoring'
        self.non_lang_syms = None
        self.num_decoding_left_chunks = -1
        self.override_config = []
        self.penalty = 0.0
        self.result_file = 'online_text'
        self.reverse_weight = 0.0
        self.simulate_streaming = False
        self.test_data = 'online_data.list'

        self.use_cuda = self.gpu >= 0 and torch.cuda.is_available()

        self.device = torch.device('cuda' if self.use_cuda else 'cpu')
        self.load_configs()
        self.test_data_conf()
        self.loadmodel()

    def load_configs(self):
        logging.basicConfig(level=logging.DEBUG,
                            format='%(asctime)s %(levelname)s %(message)s')
        os.environ['CUDA_VISIBLE_DEVICES'] = str(self.gpu)

        if self.mode in ['ctc_prefix_beam_search', 'attention_rescoring'
                              ] and self.batch_size > 1:
            logging.fatal(
                'decoding mode {} must be running with batch_size == 1'.format(
                    self.mode))
            sys.exit(1)
        with open(self.config, 'r') as fin:
            self.configs = yaml.load(fin, Loader=yaml.FullLoader)
        if len(self.override_config) > 0:
            self.configs = override_config(self.configs, self.override_config)

        self.symbol_table = read_symbol_table(self.dict)

    def loadmodel(self):

        model = init_asr_model(self.configs)

        self.char_dict = {v: k for k, v in self.symbol_table.items()}
        self.eos = len(self.char_dict) - 1

        load_checkpoint(model, self.checkpoint)
        self.model = model.to(self.device)
        self.model.eval()

    def test_data_conf(self):
        '''
        Test data configuration
        '''
        self.test_conf = copy.deepcopy(self.configs['dataset_conf'])
        self.test_conf['filter_conf']['max_length'] = 102400
        self.test_conf['filter_conf']['min_length'] = 0
        self.test_conf['filter_conf']['token_max_length'] = 102400
        self.test_conf['filter_conf']['token_min_length'] = 0
        self.test_conf['filter_conf']['max_output_input_ratio'] = 102400
        self.test_conf['filter_conf']['min_output_input_ratio'] = 0
        self.test_conf['speed_perturb'] = False
        self.test_conf['spec_aug'] = False
        self.test_conf['shuffle'] = False
        self.test_conf['sort'] = False
        if 'fbank_conf' in self.test_conf:
            self.test_conf['fbank_conf']['dither'] = 0.0
        elif 'mfcc_conf' in self.test_conf:
            self.test_conf['mfcc_conf']['dither'] = 0.0
        self.test_conf['batch_conf']['batch_type'] = "static"
        self.test_conf['batch_conf']['batch_size'] = self.batch_size
        self.non_lang_syms = read_non_lang_symbols(self.non_lang_syms)

    def get_test_data_loader(self,path):
        self.test_data=path
        test_dataset = Dataset(self.data_type,
                               self.test_data,
                               self.symbol_table,
                               self.test_conf,
                               self.bpe_model,
                               self.non_lang_syms,
                               partition=False)
        return DataLoader(test_dataset, batch_size=None, num_workers=0)

    def create_data_list(self,path):
        file_name = path.split("/")[-1].split(".")[0]
        filepath = "./datalist/"+file_name
        if not os.path.exists(filepath):
            with open(filepath,'w',encoding="utf-8") as file:
                file.write('{"key":"%s","wav":"/home/sunao/data/StreamAIzimu/cache/%s.wav","txt":""}'%(file_name,file_name))
        return filepath

    def get_recognize(self , path):
        path = self.create_data_list(path)
        test_data_loader = self.get_test_data_loader(path)
        with torch.no_grad():
            for batch_idx, batch in enumerate(test_data_loader):
                keys, feats, target, feats_lengths, target_lengths = batch
                feats = feats.to(self.device)
                feats_lengths = feats_lengths.to(self.device)
                assert (feats.size(0) == 1)
                if self.mode == 'attention':
                    hyps, _ = self.model.recognize(
                        feats,
                        feats_lengths,
                        beam_size=self.beam_size,
                        decoding_chunk_size=self.decoding_chunk_size,
                        num_decoding_left_chunks=self.num_decoding_left_chunks,
                        simulate_streaming=self.simulate_streaming)
                    hyps = [hyp.tolist() for hyp in hyps]
                elif self.mode == 'ctc_greedy_search':
                    hyps, _ = self.model.ctc_greedy_search(
                        feats,
                        feats_lengths,
                        decoding_chunk_size=self.decoding_chunk_size,
                        num_decoding_left_chunks=self.num_decoding_left_chunks,
                        simulate_streaming=self.simulate_streaming)

                elif self.mode == 'ctc_prefix_beam_search':
                    assert (feats.size(0) == 1)
                    hyp, _ = self.model.ctc_prefix_beam_search(
                        feats,
                        feats_lengths,
                        self.beam_size,
                        decoding_chunk_size=self.decoding_chunk_size,
                        num_decoding_left_chunks=self.num_decoding_left_chunks,
                        simulate_streaming=self.simulate_streaming)
                    hyps = [hyp]
                elif self.mode == 'attention_rescoring':
                    assert (feats.size(0) == 1)
                    hyp, _ = self.model.attention_rescoring(
                        feats,
                        feats_lengths,
                        self.beam_size,
                        decoding_chunk_size=self.decoding_chunk_size,
                        num_decoding_left_chunks=self.num_decoding_left_chunks,
                        ctc_weight=self.ctc_weight,
                        simulate_streaming=self.simulate_streaming,
                        reverse_weight=self.reverse_weight)
                    hyps = [hyp]

                content = ''
                for w in hyps[0]:
                    if w == self.eos:
                        break
                    content += self.char_dict[w]
                return content

if __name__ == '__main__':

    recog = recognize()

    result1 = recog.get_recognize("../cache/temp.wav")
    result2 = recog.get_recognize("../cache/temp1.wav")
    result3 = recog.get_recognize("../cache/temp2.wav")
    print(result1)
    print(result2)
    print(result3)

Client

  • First decide whether anyone is speaking, i.e. whether there is data
  • Store it in the buffer and feed it to the recognition module
  • Recognition returns a result, which is stored as a subtitle
  • Subtitle length (a rough sketch of this segmentation rule follows the list)
    • Decide whether this point is the end of a sentence
    • If it is, break the sentence there (VAD)
    • If it is not, force a break after more than 20 characters
  • Track the silence duration
  • If the silence lasts too long, then …
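
The segmentation rule in the list above is not implemented in the code that follows, so here is only a minimal sketch of it; the function name split_subtitle, the sentence_ended flag, and the hard limit of 20 characters are illustrative assumptions taken from the list.

# Minimal sketch of the subtitle segmentation rule described above.
# sentence_ended would come from VAD (the speaker stopped) or from punctuation.
MAX_SUBTITLE_LEN = 20

def split_subtitle(text, sentence_ended):
    """Return (finished_subtitle, remaining_text)."""
    if sentence_ended:
        # The sentence ended: flush everything as one subtitle line.
        return text, ""
    if len(text) >= MAX_SUBTITLE_LEN:
        # No sentence end yet but the line is too long: force a break.
        return text[:MAX_SUBTITLE_LEN], text[MAX_SUBTITLE_LEN:]
    # Otherwise keep accumulating.
    return "", text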

In fact the audio does not need to be saved here at all; it is saved only to check that the recording works properly.

import pyaudio
import wave

CHUNK = 1024
FORMAT = pyaudio.paInt16
CHANNELS = 1
SAMPALE_RATE = 44100
RECORD_SECONDS = 4
temp_save_path = "Audio/temp.wav"
p = pyaudio.PyAudio()

def save_wav(frames, save_path):

    wf = wave.open(save_path, 'wb')
    wf.setnchannels(CHANNELS)
    wf.setsampwidth(2)
    wf.setframerate(SAMPALE_RATE)
    wf.writeframes(b''.join(frames))
    wf.close()
    print('\033[93m' + "buffer cached" + '\033[0m')

def recording(save_path):
    p = pyaudio.PyAudio()
    stream = p.open(format=FORMAT,
                    channels=CHANNELS,
                    rate=SAMPALE_RATE,
                    input=True,
                    frames_per_buffer=CHUNK)
    print('\033[93m' + "recording" + '\033[0m')

    frames = []
    max_size = 16 * 4   # number of CHUNK reads to buffer before writing the file
    while 1:
        data = stream.read(CHUNK)

        frames.append(data)
        if len(frames) == max_size:

            save_wav(frames, save_path)

            frames = []
            break

recording(temp_save_path)

Endpoint detection based on thresholds of short-time energy and short-time zero-crossing rate

vad.py


import numpy as np
import pyaudio

SUCCESS = 0
FAIL = 1

audio2 = ""
stream2 = ""

def ZCR(curFrame):
    # Short-time zero-crossing rate: count sign changes between adjacent samples,
    # ignoring very small amplitude steps (noise).
    tmp1 = curFrame[:-1]
    tmp2 = curFrame[1:]
    sings = (tmp1 * tmp2) < 0
    diffs = (tmp1 - tmp2) > 0.02
    zcr = np.sum(sings * diffs)
    return zcr

def STE(curFrame):
    # Short-time energy, approximated here by the sum of absolute amplitudes.
    amp = np.sum(np.abs(curFrame))
    return amp

class Vad(object):
    def __init__(self, CHUNK=1024):

        self.amp1 = 15        # high energy threshold: almost certainly speech
        self.amp2 = 1         # low energy threshold: possibly speech
        self.zcr1 = 2         # high zero-crossing-rate threshold
        self.zcr2 = 1         # low zero-crossing-rate threshold
        self.maxsilence = 45  # silent frames tolerated inside one utterance
        self.minlen = 40      # minimum frame count for a valid speech segment
        self.max_en = 20000   # normalization factor for the int16 samples
        self.status = 0
        self.count = 0        # frames accumulated in the current speech segment
        self.silence = 0      # consecutive silent frames seen so far
        self.frame_len = CHUNK
        self.frame_inc = CHUNK / 2
        self.cur_status = 0

    def check_ontime(self, cache_frame):

        wave_data = np.frombuffer(cache_frame, dtype=np.int16)
        wave_data = wave_data * 1.0 / self.max_en
        data = wave_data[np.arange(0, self.frame_len)]

        zcr = ZCR(data)

        amp = STE(data) ** 2

        status = self.speech_status(amp, zcr)
        return status

    def speech_status(self, amp, zcr):
        status = 0

        if self.cur_status in [0, 1]:

            if amp > self.amp1 or zcr > self.zcr1:
                status = 2
                self.silence = 0
                self.count += 1

            elif amp > self.amp2 or zcr > self.zcr2:
                status = 2
                self.count += 1

            else:
                status = 0
                self.count = 0

        elif self.cur_status == 2:

            if amp > self.amp2 or zcr > self.zcr2:
                self.count += 1
                status = 2

            else:

                self.silence += 1
                if self.silence < self.maxsilence:
                    self.count += 1
                    status = 2

                elif self.count < self.minlen:
                    status = 0
                    self.silence = 0
                    self.count = 0

                else:
                    status = 3
                    self.silence = 0
                    self.count = 0
        return status
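
Below is a minimal usage sketch for the Vad class, assuming 16 kHz mono input as in the client code that follows; it only prints the per-frame status so the thresholds can be sanity-checked against a live microphone.

# Minimal usage sketch for the Vad class above (assumes 16 kHz mono input).
import pyaudio
from vad import Vad

CHUNK = 1024
p = pyaudio.PyAudio()
stream = p.open(format=pyaudio.paInt16, channels=1, rate=16000,
                input=True, frames_per_buffer=CHUNK)
v = Vad(CHUNK)

for _ in range(100):                        # roughly 6 seconds of audio
    frame = stream.read(CHUNK, exception_on_overflow=False)
    status = v.check_ontime(frame)          # 0: silence, 2: speech
    print(status, end=" ", flush=True)

stream.stop_stream()
stream.close()
p.terminate()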

For now the function is simplified: the recognition result is only printed once the speaker pauses. Because there is no good way to control the console output yet, the real-time subtitle display is still poor.

Buffer-based recognition does not always capture the tail of an utterance, so the remaining tail frames have to be appended before decoding in order to recognize the whole utterance.

import time

import pyaudio
import wave
from decoder.recognize import recognize
import numpy as np
from vad import Vad

class RecognizeService():
    def __init__(self):
        self.CHUNK = 1024
        self.FORMAT = pyaudio.paInt16
        self.CHANNELS = 1
        self.SAMPALE_RATE = 16000
        self.temp_save_path = "../cache/temp1.wav"
        self.p = pyaudio.PyAudio()
        self.model = recognize()   # the recognize class defined in decoder/recognize.py
        self.stream = self.p.open(format=self.FORMAT,
                        channels=self.CHANNELS,
                        rate=self.SAMPALE_RATE,
                        input=True,
                        frames_per_buffer=self.CHUNK,
                        input_device_index=0)
        self.v = Vad(self.CHUNK)

    def save_wav(self,frames, save_path):
        wf = wave.open(save_path, 'wb')
        wf.setnchannels(self.CHANNELS)
        wf.setsampwidth(self.p.get_sample_size(self.FORMAT))
        wf.setframerate(self.SAMPALE_RATE)
        wf.writeframes(b''.join(frames))
        wf.close()

    def recording(self,save_path):

        print('\033[93m' + "recording" + '\033[0m')

        frames = []          # short buffer: decode after every max_size chunks
        max_size = 5
        long_frames = []     # long buffer: all audio of the current utterance
        next = ""            # previous recognition result, used to skip unchanged output
        num = 0              # counts consecutive silent chunks
        is_speak = False
        result = ""
        while True:
            stream_data = self.stream.read(self.CHUNK,exception_on_overflow=False)
            status = self.v.check_ontime(stream_data)
            if status==2:
                is_speak=True

                wave_data = np.frombuffer(stream_data, dtype=np.int16)
                frames.append(wave_data)
                if len(frames) >= max_size:
                    long_frames.extend(frames)
                    if len(long_frames) > max_size * 10:
                        long_frames = long_frames[-max_size * 10:]

                    self.save_wav(long_frames, self.temp_save_path)
                    result = self.model.get_recognize(self.temp_save_path)

                    frames = []
                    if next == result:
                        continue
                    next = result

            if status == 0:
                num += 1
                if num == 10:   # about 10 consecutive silent chunks: the utterance has ended
                    if is_speak:
                        if len(frames) > 0 and len(long_frames) > 0:
                            # Append the remaining tail frames so the whole utterance is decoded.
                            long_frames.extend(frames)
                            self.save_wav(long_frames, self.temp_save_path)
                            result = self.model.get_recognize(self.temp_save_path)
                        if result != "":
                            print(result)
                    num = 0

                    long_frames = []

                    frames = []
                    is_speak=False
                    result=""

if __name__ == '__main__':
    service = RecognizeService()
    service.recording(service.temp_save_path)

Original: https://blog.csdn.net/ALL_BYA/article/details/124039408
Author: 语音不识别