今天我分享一种通过深度学习将不同说话人从音频文件中分离的方法,这个方法的契机是我想从《哆啦A梦》的动画片里面分离主角们的说的台词。我个人是音频处理的门外汉,对很多音频处理的事情知之甚少,很多都是通过直接调用别人训练好的模型或者网上找教程现学现用的,所以如果有错误或者更好的方法,恳请读者批评指正。那接下来进入正题。

方法概述

从音频里面分离不同的说话人是音频领域的研究热点,这个任务的英文名称是说话人分类(Speaker Diarization),已经有很多研究者提出了各种解决方案。但如果按切分依据主要可以分为两种:第一种是单纯依赖音频信息的单模态说话人识别;第二种是结合语音识别(Auto Speaker Recognition, ASR)技术的多模态说话人识别。

在有较多可用数据的情况下,单模态说话人识别就能达到比较好的效果了,比如开源库pyannote就提供了在英文数据集上预训练过的模型,在很多英文场景下使用作者提供的speaker-diarazation管道应该就能达到不错的说话人分类效果。对于pyannote是通过这样的流程完成整个说话人分类的:

pyannote进行说话人分类

可以看到它通过说话活动检测(Voice Activity Detection, VAD)先将音频种有人说话的部分提取出来,之后再通过说话人转换点检测(Speaker Change Detection, SCD)进一步预测不同说话人切换的位置,从而对不同说话人在一块的音频进行分离,这样能够得到只有单个说话人的说话片段(实际这个库还考虑了多个说话人同时说话的情况,因为考虑这个问题就比较复杂了,所以略去这一部分)。在得到只有单人说话人片段之后再利用模型提取该片段的说话人嵌入(Speaker Embedding),之后通过聚类算法将音色相同的说话人聚合在一起,从而完成说话人分类。

但这个方法需要带说话人转换点的注释的多说话人的数据集或者在对应语言上预训练过的说话人转换点检测模型,不幸地是,目前pyannote并没有提供中文的说话人检测模型,这可能是在我的数据集上pyannote并没有表现出良好的分割性能的原因之一。

如果我们想提升在自己数据集上的分割性能,作者也提供了去微调这些模型以适应新数据集的方法,但人工标记说话人转换点非常地消耗时间,而且需要一定的数据量才能生效,我并没有相应的时间和金钱这样精细地标注我的数据集。

于是我们将目光放在了第二种方法上,即结合ASR的说话人分类。自从Whisper这样性能优越的语音识别模型被提出,结合ASR进行各种音频任务也成为了研究的热点,arXiv上有不少结合Whisper等ASR模型进行说话人分类。这类方法的优点是可以结合转录出来的语义信息,比如转录出","或者"."可以标识一段话的暂停或结尾。更进一步地,对中文来说,一些符号如”,“、”。“、”!“、”……“、”?“等能够标识一些短句,如果结合预测字级时间戳模型就可以获取每个短句的切分了。幸运地是,相比于说话人分类,语音识别ASR是一个更加基础也更加成熟的领域。国内一些大厂如阿里、百度等都开发了适配中文的语音识别模型并且能达到不错的准确率。因此笔者认为在中文环境下,通过ASR结合标点符号将音频切割为短句,并假定每个短句里只有一个说话人是一种低成本而且效果不错的方法。因此这篇文章将使用ASR进行分割,然后再进行说话人识别。

在正式进入方法之前,我需要说明这个方法的优点与缺点,以确认这个方法是否真的是你需要的。

优点:

  • 不需要训练或者微调分割模型,而是通过ASR转录出的文本决定切分位置。
  • 可以渐进式地训练说话人分类模型,如果你有很多类似于动画片一集一集的音频数据,你可以先注释一个基础数据集子集,然后训练一个基础的说话人模型,之后用对另外的集进行分类,之后你手动纠正错误分类的片段到正确的位置从而构成一个新的数据集,将这个数据集合入之前的训练集后我们可以训练一个性能更好的说话人分类模型,可以重复这个过程直到你满意分类器的性能为止。

缺点:

  • 通过文本模态进行分割依赖于标点注释模型的准确度和字级时间戳预测模型的准确度,因此它不一定能产生正确的分割。
  • 本文的方法不是无监督的方法,对于说话人分类本文使用监督数据训练分类器而不是聚类,因此你需要注释一定量的数据。即使注释了数据后,训练的分类器也会产生分类误差,如果对说话人分类的精度要求很高的话请使用人工标注。
  • 对于有很多人一起说话,即重叠语音的情况无能为力。本文的方法不能处理这些重叠语音的部分。

如果你对它的优点感到满意而且能够容忍它的缺点,那我们正式进入方法具体流程部分。

具体流程及供参考的实现代码

从视频中分离音频(可选)

我要将音频从视频里面分割出来所以有这个步骤,如果你已经有音频数据了就可以跳过这步。实现的方式主要是通过MoviePy这个库获取视频中的音频文件,裁剪片头片尾后保存为音频文件。这适用于片头和片尾一定在开始和结束部分,而且时长是固定的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# pre_clip.py
# 预先裁剪视频来跳过片头片尾,防止干扰
from moviepy.editor import VideoFileClip
import glob
import os


# 配置mp4路径
mp4_path = "mp4路径"
# 导出的音频文件路径
audio_path = "输出源文件音频路径"


# 利用glob获取所有mp4文件
mp4_list = glob.glob(os.path.join(mp4_path, "*.mp4"))
num = len(mp4_list)

for i, mp4 in enumerate(mp4_list):
# 生成音频文件名
basename = os.path.basename(mp4)
basename, ext_name = os.path.splitext(basename)
basename = basename.replace(" ", "_") # 将空格替换为_号

with VideoFileClip(mp4) as clip:
audio = clip.audio
# 剪切片头和片尾
# 对于我的情况,截掉前1分12秒和最后6秒
processed_audio = audio.subclip(72, -6)

# 保存片段到新文件夹中
processed_audio.write_audiofile(f"{os.path.join(audio_path, basename)}.wav")

# 指示进度
print(f"已处理: {i} / {num} \r", end="")

这样应该在audio_path对应的目录里有所有视频对应的音频文件了。

将人声与背景音乐分离(可选)

这步我们会使用深度学习模型,没错就是UVR5这个项目训练的声乐分离模型来把人声从背景音乐中分离出来。我是在Linux机器上运行的,我会更喜欢命令行或者Python库的版本,这样更便于自动化,因此我选择了audio-separator这个项目来运行UVR5训练的声乐分离模型。这一步同样是可选的,如果你认为你的音频数据集中并没有背景音乐的话这步也可以跳过。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# vocal_remove.py
# 分离人声
from audio_separator import Separator
import os
import glob
import logging


# 加载所有音频文件
audio_files = glob.glob(os.path.join("你的音频路径", "*.wav"))
output_dir = "分离人声处理后的音频的保存文件夹"

# 初始化分割器
for i, audio_file in enumerate(audio_files):
separator = Separator(audio_file, model_name='UVR-MDX-NET-Voc_FT', output_single_stem="vocals",
use_cuda=True, output_dir=output_dir, log_level=logging.INFO)
separator.separate()
print(f"已完成: {i}, 文件: {audio_file}")

这样应该能在output_dir里面找到只保留人声的音频了,这个分离人声的方法应该是还有提升空间的,如果你追求更好的分离人声表现,可以换用更好的人声分离方法。

ASR预切分

当音频文件就位之后,我们开始预切分一些视频来供说话人识别打标。这里使用阿里的FUNASR项目来处理音频文件的语音识别,后续的说话人识别模型里的嵌入部分也将使用该项目的预训练嵌入模型。预切分就是不管说话人,直接按短句(也就是碰到”,“、”。“、”!“、”?“等都会被切分)分割整个音频。可以随机选择一些文件进行切分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# ASR_pre_clip.py
# 通过ASR进行预切分
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks
from moviepy.editor import AudioFileClip
import os

# 最大句子间隔
max_sentence_interval = 800

# 最小识别块,默认只识别不小于1000ms的块
min_chunk_length = 1000

# 需要预切分的文件路径
audio_file = "需要预切分的文件路径"

# 输出路径
chunks_dir = "切分结果输出文件夹路径"


inference_pipeline = pipeline(
task=Tasks.auto_speech_recognition,
model='damo/speech_paraformer-large-vad-punc_asr_nat-zh-cn-16k-common-vocab8404-pytorch'
)

# 转录音频
rec_result = inference_pipeline(audio_in=audio_file)

# 将太远的词分割开,单独作为一个句子推理
new_sentences = []
for sentence in rec_result["sentences"]:
# 准备左右对齐
ts_list = sentence["ts_list"]
word_seg_list = sentence["text_seg"].split(" ")[:-1]
text = sentence["text"]
start = 0

for i in range(1, len(ts_list)):
# 检查前一个词的结束时间和后一个词的开始时间是否超出阈值
# 如果超过阈值就手动把它们分隔开
if ts_list[i][0] - ts_list[i-1][1] > max_sentence_interval:
# 寻找第i个值并把start到i-1的字符加入到新文本中
# 注意忽略掉新句子中的符号
# 其实我感觉有点多余,因为这边的转录结果是不用的,但是还是这么做一下吧
rest_of_char = i - start
new_text = ""
for j in range(start, len(text)):
if rest_of_char > 0:
new_text += text[j]

if text[j] not in [",", "。", "!"]:
rest_of_char -= 1
else:
break
# 合成新句子
new_sentence_data = {
"text": new_text + "。",
"start": ts_list[start][0],
"end": ts_list[i-1][1],
"ts_list": [ts_list[k] for k in range(start, i)]
}
new_sentences.append(new_sentence_data)

# 处理善后
# print(j)
start = j

# 如果剩下的都正常就把关键信息提取并加进去
new_sentence_data = {
"text": text[start: ],
"start": ts_list[start][0],
"end": ts_list[-1][1],
"ts_list": [ts_list[k] for k in range(start, len(ts_list))]
}
new_sentences.append(new_sentence_data)

# 切分音频并保存
basename = os.path.basename(audio_file)
basename, ext_name = os.path.splitext(basename)
# 清除掉多余的后缀,如果需要的话...
basename = basename.replace("_(Vocals)_UVR-MDX-NET-Voc_FT", "")
# 保存音频的代码还是用AudioFileClip吧,用别的有点不习惯
audio = AudioFileClip(audio_file)

j = 0

for sentence in new_sentences:
# 跳过太小的块,这部分识别也比较难识别准,还会多出很多碎片
if sentence["end"] - sentence["start"] < min_chunk_length:
continue

# 注意毫秒与秒的转换
start = sentence["start"] / 1000
end = sentence["end"] / 1000

j += 1
audio_chunk = audio.subclip(start, end)
# 创建文件夹
os.makedirs(os.path.join(chunks_dir), exist_ok=True)
audio_chunk.write_audiofile(f"{os.path.join(chunks_dir, basename)}_{j}{ext_name}")

audio.close()

我建议如果一个音频就能包含所有你关注的说话人角色的话,那就只切分一个。原则就是在保证所有说话人至少能找到一条样本数据的情况下尽量少切,毕竟这里纯体力活动很难受。如果一条音频不足以覆盖所有你感兴趣的说话人,那就更换几个音频重新运行脚本直到满足原则。这样在chunks_dir下就会有很多没有标注过,但是都是短句的音频了。

说话人分类基础模型训练

在训练之前我们先说一下这个所谓的说话人分类模型能做什么,它的作用是对它输入一段音频,它会将这段音频映射到一个说话人上,这也就完成了说话人分类。之后说一下这个分类器的结构,我们在这里要训练的说话人模型主要由两部分组成,一部分是在语音数据上预训练的说话人验证模型用于对音频提取嵌入(Embedding),这部分不会参与训练,它只负责特征提取;另一部分是在嵌入(Embedding)之上构建的LDA模型,我使用的是scikit learn的LDA实现,该模型相关的文档可以在这个地方找到。

语音嵌入使用阿里FUNASR预训练的damo/speech_eres2net_large_sv_zh-cn_3dspeaker_16k模型提取嵌入,然后将嵌入输入LDA完成分类。关于说话人设置上,如果你确信音频库里只有简单的几个人,那么你可以将说话人设置为那几个人。另一种情况是除了你感兴趣的几个主角团外每集还可能出现一些额外NPC,比如《哆啦A梦》里面出木衫可能会客串几集。我建议是新增两个额外的说话人用来标识男声和女声NPC,比如我可能会命名为qitanan(其他·男声)和qitanv(其他·女声)。

了解了这些之后,应该就可以对之前ASR预切分的数据进行打标了!你要做的就是用耳朵听这些短句的语音,然后辨别它们是哪些说话人。一个小tips就是合理使用vscode右键的”复制路径“功能可以事半功倍。好的,现在直接来看训练脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# feature_voice.py
# 通过收集的声纹音频和damo/speech_eres2net_large_sv_zh-cn_3dspeaker_16k说话人识别模型的嵌入
# 借助FLDA,训练一个说话人识别的模型
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks
import numpy as np
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
import os
import glob
import shutil
import joblib
import json
import librosa
import torch

# 额外监督数据,如果生成的片段进行了校准,可以加入数据集中训练更好的标注模型
extra_data_dirs = []

sv_pipline = pipeline(
task='speaker-verification',
model='damo/speech_eres2net_large_sv_zh-cn_3dspeaker_16k',
model_revision='v1.0.0'
)

# 通过data训练一个FLDA判别器用于判别声音
data = {
"daxiong": [
"daxiong的语音路径,有多个就在下面写多个,多多益善",
"daxiong的第二条语音路径",
],
,
"duola": [
"duola的语音路径",
],
"panghu": [
"panghu的语音路径",
],
"xiaofu": [
"xiaofu的",
],
"qitanan": [
"其他·男声的语音路径",
]
"qitanv": [
"其他·女声的语音路径",
],
}

# 备份特征声音到指定文件夹,防止删除后丢失,也方便检查
backups_dir = "备份文件夹"
for speaker in data.keys():
os.makedirs(os.path.join(backups_dir, speaker), exist_ok=True)
for i, audio_file in enumerate(data[speaker]):
# 备份文件
basename = os.path.basename(audio_file)
shutil.copy(audio_file, os.path.join(backups_dir, speaker, basename))

# 后面清洗出来的也可以通过文件夹的形式加入进去,获得更高准确度
for extra_data_dir in extra_data_dirs:
speakers = os.listdir(extra_data_dir)
for speaker in speakers:
if speaker not in data:
raise Exception(f"不存在的角色: {speaker},请检查额外数据集: {extra_data_dir}是否有不存在的角色。")

speaker_wavs_path = glob.glob(os.path.join(extra_data_dir, speaker, "*.wav"))
for speaker_wav_path in speaker_wavs_path:
data[speaker].append(speaker_wav_path)


# 获取嵌入
def get_embedding_by_file(audio_path: str):
# 把模型移到cuda上
if next(sv_pipline.model.parameters()).device.type != "cuda":
sv_pipline.model.to("cuda")

data, fs = librosa.load(audio_path, sr=sv_pipline.model_config['sample_rate'])
output = torch.from_numpy(data).unsqueeze(0)
output = output.to("cuda")
with torch.no_grad():
embedding = sv_pipline.model(output)
return embedding


# 构造数据集
X = []
Y = []
id2speaker = {}

print("构造数据集中...")

speakers_num = len(data)
for i, speaker in enumerate(data.keys()):
id2speaker[i] = speaker
# 对于每个音频逐个获取嵌入
for audio_file in data[speaker]:
e = get_embedding_by_file(audio_file)
X.append(e.cpu().numpy())
Y.append(i)
print(f"说话人ID: {i}, 说话人: {speaker} 的特征编码完毕 {i+1} / {speakers_num}")

print(f"数据集构建完毕,现有标注数据条数: {len(X)}")

# 切换为numpy数组
X = np.array(X)
X = X.reshape(X.shape[0], -1)
Y = np.array(Y)

# 训练FLDA模型拟合
print("训练LDA模型中...")
clf = LinearDiscriminantAnalysis()
clf.fit(X, Y)

print("训练成功!准备保存模型...")

# 保存训练好的模型
model_path = "模型保存文件夹/clf.joblib"
id2speaker_path = "模型保存文件夹/id2speaker.json"
joblib.dump(clf, model_path)
json.dump(id2speaker, open(id2speaker_path, "w"), ensure_ascii=False, indent=2)
print("保存成功!")

每次你听到某段声音是对应某个角色的,就右键复制该音频路径,并在data字典里对应角色的列表中添加该音频路径。如此往复,直到所有人的音频列表里都有至少一个(当然样本多的话是好事,后面分得准的话人工的操作就少,但至少有一个样本是原则)。现在你运行这个脚本它会训练一个说话人分类的基础模型并保存。

撒花,如果你已经成功训练了一个基础的说话人分类模型,那已经取得了一些的阶段性胜利啦。但是现在训练出来的说话人基础分类模型的分类能力还是比较弱的,我们需要再给它增添一些新样本学习更稳定、更有效的分类特征。

渐进地提高说话人分类模型的性能(循环多次)

前面说到基础模型说话人分类的性能还比较薄弱,我们需要收集更多的样本来训练更好的分类器。幸运地是,现在分类器已经有一定能力将音频正确分类了,因此可以让这个基础模型先尝试去对切分的音频片段分类,然后我们来验证它分类的结果是否正确。如果我们对这个模型分类的结果进行验证,并把它分错的音频放置到正确的说话人角色下,我们就通过模型和人类配合的方式打标好了一个新的干净的数据集!这个干净的数据集又可以加入到原来的训练集里用于训练新的说话人分类模型,如此循环往复,随着标注数据越来越多,我们最终可以得到一个准确度令人满意的说话人分类模型。

现在我们把之前训练的说话人分类模型集成到ASR切分中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# enhance_clf.py
# 渐进式增强说话人分类模型性能
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks
import joblib
import librosa
from moviepy.editor import AudioFileClip
import numpy as np
import json
import os
import torch

# 最大句子间隔
max_sentence_interval = 800
# 语气词合入极限间隔
max_combine_interval = 100
# 最小识别块,默认只识别不小于1000ms的块
min_chunk_length = 1000
# 余裕量,似乎funasr在词对齐上做得比较好,几乎不需要余裕了
margin = 0
# 说话人识别模型路径
speaker_clf_path = "说话人分类模型文件夹/clf.joblib"
id2speaker_path = "说话人分类模型文件夹/id2speaker.json"
# 嵌入模型的目标采样率
target_sr = 16000
# 输入路径
audio_file = "想切分的音频文件路径 xx/yy.wav"
# 输出路径
chunks_dir = "输出chunks的文件夹路径"


inference_pipeline = pipeline(
task=Tasks.auto_speech_recognition,
model='damo/speech_paraformer-large-vad-punc_asr_nat-zh-cn-16k-common-vocab8404-pytorch'
)

sv_pipline = pipeline(
task='speaker-verification',
model='damo/speech_eres2net_large_sv_zh-cn_3dspeaker_16k',
model_revision='v1.0.0'
)

# 加载说话人分类模型
speaker_clf = joblib.load(speaker_clf_path)
id2speaker = json.load(open(id2speaker_path))

# 转录音频
rec_result = inference_pipeline(audio_in=audio_file)

# 将太远的词分割开,单独作为一个句子推理
new_sentences = []
for sentence in rec_result["sentences"]:
# 准备左右对齐
ts_list = sentence["ts_list"]
word_seg_list = sentence["text_seg"].split(" ")[:-1]
text = sentence["text"]
start = 0

for i in range(1, len(ts_list)):
# 检查前一个词的结束时间和后一个词的开始时间是否超出阈值
# 如果超过阈值就手动把它们分隔开
if ts_list[i][0] - ts_list[i-1][1] > max_sentence_interval:
# 寻找第i个值并把start到i-1的字符加入到新文本中
# 注意忽略掉新句子中的符号
# 其实我感觉有点多余,因为这边的转录结果是不用的,但是还是这么做一下吧
rest_of_char = i - start
new_text = ""
for j in range(start, len(text)):
if rest_of_char > 0:
new_text += text[j]

if text[j] not in [",", "。", "!"]:
rest_of_char -= 1
else:
break
# 合成新句子
new_sentence_data = {
"text": new_text + "。",
"start": ts_list[start][0],
"end": ts_list[i-1][1],
"ts_list": [ts_list[k] for k in range(start, i)]
}
new_sentences.append(new_sentence_data)

# 处理善后
# print(j)
start = j

# 如果剩下的都正常就把关键信息提取并加进去
new_sentence_data = {
"text": text[start: ],
"start": ts_list[start][0],
"end": ts_list[-1][1],
"ts_list": [ts_list[k] for k in range(start, len(ts_list))]
}
new_sentences.append(new_sentence_data)

# 加载audio文件并按目标频率采样为numpy对象
audio_data, fs = librosa.load(audio_file, sr=target_sr)

# 获取嵌入
def get_embedding(audio_data: np.ndarray):
# 把模型移到cuda上
if next(sv_pipline.model.parameters()).device.type != "cuda":
sv_pipline.model.to("cuda")

output = torch.from_numpy(audio_data).unsqueeze(0)
output = output.to("cuda")
with torch.no_grad():
embedding = sv_pipline.model(output)
return embedding

# 计算每个片段的持续时间,超过1s的会送进去分类
# 短于1s的视为语气词,暂时不做分类(实际上分类的精度也很低)
for sentence in new_sentences:
# 获取说话人id
# 先裁剪片段
sentence["duration"] = sentence["end"] - sentence["start"]
if sentence["duration"] < min_chunk_length:
# 如果小于1s的片段就不做分类
sentence["speaker_id"] = -1
continue

# 大于1s的我们截取该段
start_sample = int((sentence["start"] / 1000) * fs)
end_sample = int((sentence["end"] / 1000) * fs)
target_audio_data = audio_data[start_sample: end_sample]

# 获取说话人id
embedding = get_embedding(target_audio_data).cpu().numpy()
result = speaker_clf.predict_proba(embedding)[0]
speaker_id, proba = np.argmax(result), max(result)

# 记录说话人id、概率和说话人名字
sentence["speaker_id"], sentence["speaker_proba"] = speaker_id, proba
sentence["speaker"] = id2speaker[str(speaker_id)]


processed_sentences = []

sentence_num_before_processing = len(new_sentences)
for i in range(sentence_num_before_processing):
# 按句子类型进行操作
if new_sentences[i]["speaker_id"] == -1:
# 语气词的处理
# 增强阶段不考虑语气词处理
continue
else:
processed_sentences.append(new_sentences[i])

# 试着切一下看看效果
# 切分音频并保存
basename = os.path.basename(audio_file)
basename, ext_name = os.path.splitext(basename)
# 清除掉多余的后缀,如果需要的话...
basename = basename.replace("_(Vocals)_UVR-MDX-NET-Voc_FT", "")
# 保存音频的代码还是用AudioFileClip吧,用别的有点不习惯
audio = AudioFileClip(audio_file)

j = 0
all_ = len(processed_sentences)

for sentence in processed_sentences:
start = sentence["start"] / 1000
end = sentence["end"] / 1000
speaker = sentence["speaker"]
j += 1
audio_chunk = audio.subclip(start, end)
# 创建文件夹
os.makedirs(os.path.join(chunks_dir, speaker), exist_ok=True)
audio_chunk.write_audiofile(f"{os.path.join(chunks_dir, speaker, basename)}_{j}{ext_name}")

audio.close()

运行enhance_clf.py之后应该能得到一系列长于1s的音频,而且按基础说话人分类的结果将这些音频都归类到不同的说话人文件夹下面了。此时我们需要逐个检查这些文件,将不对应的音频文件(即分类器判断错误的)放入正确的说话人文件夹中。这里有一个需要注意的,如果你发现某个音频是有多个说话人或者是没处理干净的BGM的话,删除该音频避免它在后面混入训练数据中干扰模型。

最后,我们能够将所有音频检查完,得到一个干净的新数据集。重命名该文件夹,比如如果一开始放在/root/data/processed/chunks下的,在完成后可以重命名chunks文件夹为该集首字母的组合(这个随意,只是怕误删掉),比如切分的这一集是“大雄的黑洞”就重命名为dxdhd_chunks

接下来要将这个新数据集加入到模型中训练一个更好的分类模型,在feature_voice.py中将新数据文件夹加入extra_data_dirs这个列表里即可:

1
2
3
# feature_voice.py
# 额外监督数据,如果生成的片段进行了校准,可以加入数据集中训练更好的标注模型
extra_data_dirs = ["/root/data/processed/dxdhd_chunks"]

再次执行feature_voice.py可以训练一个新的更好的说话人分类模型并进行保存。

做完这章的步骤后,继续切另外的音频文件看看效果,如果效果不满意那就重复上述流程,随着标记数据量的增多,模型的分类错误率应该逐步下降。当我们觉得分类器的分类性能令人满意了之后,我们可以进入最后的一个步骤——组合ASR和说话人分类模型从音频中分离不同说话人。

组合ASR和说话人分类模型从音频中分离不同说话人

最后一步了,我们将上一个步骤训练好的说话人分类模型和ASR模型组合起来,在整个待处理音频文件夹上应用ASR切分+说话人分类。但也有一些和之前不同的地方:

  1. 将启发式地处理这些小于1s的音频块,因为这些音频一般是语气词,将其统称为语气片段。如果这个语气片段的左边或右边存在被标记为说话人的音频,而且间隔小于语气词合入极限间隔,那就会就近合入一个说话人中。
  2. 会将间隔小于预定义的最大句子间隔而且说话人相同的短句合为一个长句,这样切分的片段听起来更自然。

下面是最后切分脚本的参考实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
# splitter_by_asr.py
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks
import joblib
import librosa
from moviepy.editor import AudioFileClip
import numpy as np
import json
import os
import glob
import torch

# 最大句子间隔
max_sentence_interval = 800
# 语气词合入极限间隔
max_combine_interval = 100
# 最小识别块,默认只识别不小于1000ms的块
min_chunk_length = 1000
# 余裕量,似乎funasr在词对齐上做得比较好,几乎不需要余裕了
margin = 0
# 说话人识别模型路径
speaker_clf_path = "说话人识别模型文件夹/clf.joblib"
id2speaker_path = "说话人识别模型文件夹/id2speaker.json"
# 嵌入模型的目标采样率
target_sr = 16000
# 存放需要切分和说话人识别的音频文件夹路径
audio_path = "需要切分和说话人识别的音频文件夹路径"
# 输出路径
chunks_dir = "输出的路径 xxx/raw类似的"
# token批量大小
batch_size_token = 4000
# 起始位置,如果爆显存了调整该值跳过已经转录的样本
start_pos = 0


inference_pipeline = pipeline(
task=Tasks.auto_speech_recognition,
model='damo/speech_paraformer-large-vad-punc_asr_nat-zh-cn-16k-common-vocab8404-pytorch'
)

sv_pipline = pipeline(
task='speaker-verification',
model='damo/speech_eres2net_large_sv_zh-cn_3dspeaker_16k',
model_revision='v1.0.0'
)

# 加载说话人分类模型
speaker_clf = joblib.load(speaker_clf_path)
id2speaker = json.load(open(id2speaker_path))

# 获取嵌入
def get_embedding(audio_data: np.ndarray):
# 把模型移到cuda上
if next(sv_pipline.model.parameters()).device.type != "cuda":
sv_pipline.model.to("cuda")

output = torch.from_numpy(audio_data).unsqueeze(0)
output = output.to("cuda")
with torch.no_grad():
embedding = sv_pipline.model(output)
return embedding

audio_files = glob.glob(os.path.join(audio_path, "*.wav"))
# 跳过已经转录的音频
audio_files = audio_files[start_pos: ]
all_num = len(audio_files)

# 逐个处理每个音频
for a_i, audio_file in enumerate(audio_files):
# 转录音频
rec_result = inference_pipeline(audio_in=audio_file, batch_size_token=batch_size_token)

# 将太远的词分割开,单独作为一个句子推理
new_sentences = []
for sentence in rec_result["sentences"]:
# 准备左右对齐
ts_list = sentence["ts_list"]
word_seg_list = sentence["text_seg"].split(" ")[:-1]
text = sentence["text"]
start = 0

for i in range(1, len(ts_list)):
# 检查前一个词的结束时间和后一个词的开始时间是否超出阈值
# 如果超过阈值就手动把它们分隔开
if ts_list[i][0] - ts_list[i-1][1] > max_sentence_interval:
# 寻找第i个值并把start到i-1的字符加入到新文本中
# 注意忽略掉新句子中的符号
# 其实我感觉有点多余,因为这边的转录结果是不用的,但是还是这么做一下吧
rest_of_char = i - start
new_text = ""
for j in range(start, len(text)):
if rest_of_char > 0:
new_text += text[j]

if text[j] not in [",", "。", "!"]:
rest_of_char -= 1
else:
break
# 合成新句子
new_sentence_data = {
"text": new_text + "。",
"start": ts_list[start][0],
"end": ts_list[i-1][1],
"ts_list": [ts_list[k] for k in range(start, i)]
}
new_sentences.append(new_sentence_data)

# 处理善后
# print(j)
start = j

# 如果剩下的都正常就把关键信息提取并加进去
new_sentence_data = {
"text": text[start: ],
"start": ts_list[start][0],
"end": ts_list[-1][1],
"ts_list": [ts_list[k] for k in range(start, len(ts_list))]
}
new_sentences.append(new_sentence_data)

# 加载audio文件并按目标频率采样为numpy对象
audio_data, fs = librosa.load(audio_file, sr=target_sr)

# 计算每个片段的持续时间,超过1s的会送进去分类
# 短于1s的视为语气词,暂时不做分类(实际上分类的精度也很低)
for sentence in new_sentences:
# 获取说话人id
# 先裁剪片段
sentence["duration"] = sentence["end"] - sentence["start"]
if sentence["duration"] < min_chunk_length:
# 如果小于1s的片段就先不做分类
sentence["speaker_id"] = -1
continue

# 大于1s的我们截取该段
start_sample = int((sentence["start"] / 1000) * fs)
end_sample = int((sentence["end"] / 1000) * fs)
target_audio_data = audio_data[start_sample: end_sample]

# 获取说话人id
embedding = get_embedding(target_audio_data).cpu().numpy()
result = speaker_clf.predict_proba(embedding)[0]
speaker_id, proba = np.argmax(result), max(result)

# 记录说话人id、概率和说话人名字
sentence["speaker_id"], sentence["speaker_proba"] = speaker_id, proba
sentence["speaker"] = id2speaker[str(speaker_id)]


# 合并相同说话人片段和语气片段
# 请注意这些语气片段应该有一个合成时间,大于该时间的不应该合入
processed_sentences = []
# 由后到前合并这些句子,注意对说话人相同的句子按句子间隔合入,对语气词按极限语气词合入间隔合入
sentence_num_before_processing = len(new_sentences)
for i in range(sentence_num_before_processing-1, -1, -1):
# 按句子类型进行操作
if new_sentences[i]["speaker_id"] == -1:
# 语气词的处理
# 先考虑特殊情况
if i == sentence_num_before_processing - 1:
if new_sentences[i]["start"] - new_sentences[i-1]["end"] < max_combine_interval and \
new_sentences[i-1]["speaker_id"] != -1:
# 满足条件的尾句合入前一个实句中
new_sentences[i-1]["end"] = new_sentences[i]["end"]
new_sentences[i-1]["text"] = new_sentences[i-1]["text"] + new_sentences[i]["text"]
elif i == 0:
if len(processed_sentences) > 0 and len(new_sentences) > 1 and \
processed_sentences[0]["start"] - new_sentences[i]["end"] < max_combine_interval:
# 满足条件的首句合入后一个实句中,这一般不太可能发生
# 因为第一句一般是报幕,就算低于1s也不可能只有300ms间隔
processed_sentences[0]["start"] = new_sentences[i]["start"]
processed_sentences[0]["text"] = new_sentences[i]["text"] + processed_sentences[0]["text"]
else:
# 中间情况,一般是左右看然后选最优的加入
min_duration = max_sentence_interval + 1
min_pos = None
# 先往左看
if new_sentences[i]["start"] - new_sentences[i-1]["end"] < max_combine_interval and \
new_sentences[i-1]["speaker_id"] != -1:
# 更新最小间隔
min_duration = new_sentences[i]["start"] - new_sentences[i-1]["end"]
min_pos = "left"
# 再往右看
if len(processed_sentences) > 0 and len(new_sentences) > 1 and \
processed_sentences[0]["start"] - new_sentences[i]["end"] < max_combine_interval:
if processed_sentences[0]["start"] - new_sentences[i]["end"] < min_duration:
min_duration = processed_sentences[0]["start"] - new_sentences[i]["end"]
min_pos = "right"

# 合入句子中
if min_pos is not None:
if min_pos == "left":
new_sentences[i-1]["end"] = new_sentences[i]["end"]
new_sentences[i-1]["text"] = new_sentences[i-1]["text"] + new_sentences[i]["text"]
else:
processed_sentences[0]["start"] = new_sentences[i]["start"]
processed_sentences[0]["text"] = new_sentences[i]["text"] + processed_sentences[0]["text"]
else:
# 在这个分支的是实句,只考虑向后进行合并
if len(processed_sentences) == 0:
# 没有句子的话直接放进去
processed_sentences.append(new_sentences[i])
else:
# 先看看能不能合入
if processed_sentences[0]["speaker_id"] == new_sentences[i]["speaker_id"] and \
processed_sentences[0]["start"] - new_sentences[i]["end"] < max_sentence_interval:
processed_sentences[0]["text"] = new_sentences[i]["text"] + processed_sentences[0]["text"]
processed_sentences[0]["start"] = new_sentences[i]["start"]
else:
# 不能合入就作为独立句子插入列表首
processed_sentences.insert(0, new_sentences[i])

# 切分音频并保存
basename = os.path.basename(audio_file)
basename, ext_name = os.path.splitext(basename)
# 清除掉多余的后缀,如果需要的话...
basename = basename.replace("_(Vocals)_UVR-MDX-NET-Voc_FT", "")
# 保存音频的代码还是用AudioFileClip吧,用别的有点不习惯
audio = AudioFileClip(audio_file)

j = 0
all_ = len(processed_sentences)

for sentence in processed_sentences:
start = sentence["start"] / 1000
end = sentence["end"] / 1000
speaker = sentence["speaker"]
j += 1
audio_chunk = audio.subclip(start, end)
# 创建文件夹
os.makedirs(os.path.join(chunks_dir, speaker), exist_ok=True)
audio_chunk.write_audiofile(f"{os.path.join(chunks_dir, speaker, basename)}_{j}{ext_name}")

audio.close()
print(f"已处理: {a_i + 1} / {all_num}")

print("处理完成!")

等待处理完成后,应该在chunks变量对应的目录下看到最终的结果了,到这里就已经把音频成功切分并按说话人分好类了,完结撒花。

总结与思考

因为基于聚类的说话人分类比较吃embedding的余弦相似度判别效果,而主要在英文数据集上训练的pyannote对中文音频的判别效果不佳,而且要收集数据微调一个这样的纯音频说话人识别模型难度比较大,所以放弃了基于纯音频方案的分割。

作为替代,使用了ASR转录出来的文本信息中的语义停顿符号作为分割依据,这在一定程度上缓解了多说话人音频分割不准确的问题。然而基于语义信息的切分依然受到转录文本质量、标点符号生成质量和字级时间戳模型对齐精度的影响,尤其是时间戳对齐不准确的问题会导致音频提前开始或提早结束,这影响了切分后音频的质量。因此,这个方法目前来说的累计误差还是比较大的,还存在性能提升空间。并且它并不能处理多说话人说话重叠的问题,这似乎是这个方案的硬伤。

分割好了音频片段后,使用渐进式训练的方式通过人类反馈提升说话人分类模型的性能。在人类的监督下,有监督的说话人分离模型的表现应该远远好于基于聚类算法的说话人分类效果。同时,因为是渐进式地提升模型分类效果,所以理论上越到渐进式训练的后期需要人类调整的错误样本就越少,这有助于减少人类标注的时间成本。

这个流程仍然有一些值得思考的地方:

  1. 在最开始训练说话人分类基础模型时,纯粹依赖于手工挑选样本,这样非常的枯燥而且消耗人力。在这个步骤里面如果先聚类,通过聚类算法把相似的音频合在一堆然后再让人来挑选会不会能节省工作量。
  2. 在embedding后面接LDA模型(线性判别模型)完成说话人分类是因为我觉得它优化的原理来自“最大化类间距离,最小化类内距离”,这样的效果就好像是一个引入人类监督的“聚类算法”。通过LDA找到某个线性子空间,然后将样本投影到这个空间后就能发现不同说话人的音频数据点形成不同的团簇。这样降维的做法比较符合直觉,假设提取的音频embedding里包含了说话人详细的音色信息,但如果我们只需要分类男声和女声,那么其实很多音色信息都是冗余的而且可能带来误判,如果通过LDA裁剪这些高维的信息丰富的embedding到我们任务相关的低维空间,这样应该能在利用预训练的embedding和收集的任务相关的训练样本中得到一个比较好的平衡。但随着可用训练样本的增多,会不会使用更复杂的分类器,如随机森林、XGBoost能得到更稳定更好的效果呢?

以上就是全部内容了,非常感谢这些音频领域的研究者们开放了这么多有价值的预训练模型!