今回は音声認識のデータセット「ESC-50」をCNNで分類した。
特にこだわったのが、GPUでも普通にやったらOOMエラーが出るくらいの大容量のデータセットを、kerasのfit_generatorメソッドを使ってCPU上でもできるようにしたこと。
あとは音声認識は触れたことなかったので、前処理から学習するまでの作業ログ。
目次
1.音声データセット(ESC-50)
2.音声データの水増し(Augmentation)
3.水増した音声データの保存と読み込み
4.データ前処理とCPU上で学習(CNN)
1.音声データセット(ESC-50)
ESC-50の音声は環境音・自然音からなる声を含まない音。
動物の鳴き声、雨の音、人間の咳、時計のアラーム、エンジン音など50クラス。それをCNNで分類してみる。
ファイル形式は拡張子が.wavの音声。サイトで動画が音源のmp3やmp4もwavに変換することができる類のやつで、macで普通に再生できる。
自分のPCスペックだと普通にダウンロードできなかったので、linuxコマンドで一晩かけてダウンロード。
$ nohup wget [URL]
nohupコマンドを使えば、PCがスリープモードになっても動き続けてくれる。
import librosa import librosa.display import matplotlib.pyplot as plt import seaborn as sn from sklearn import model_selection from sklearn import preprocessing import IPython.display as ipd # データのロード esc_dir = os.path.join(base_dir, "ESC-50-master") meta_file = os.path.join(esc_dir, "meta/esc50.csv") audio_dir = os.path.join(esc_dir, "audio/") # ラベルとその名前のDataFrame class_dict = {} for i in range(data_size[0]): if meta_data.loc[i,"target"] not in class_dict.keys(): class_dict[meta_data.loc[i,"target"]] = meta_data.loc[i,"category"] class_pd = pd.DataFrame(list(class_dict.items()), columns=["labels","classes"]) # ダウンロードした音声データの読み込み用関数 # load a wave data def load_wave_data(audio_dir, file_name): file_path = os.path.join(audio_dir, file_name) x, fs = librosa.load(file_path, sr=44100) return x,fs # change wave data to mel-stft def calculate_melsp(x, n_fft=1024, hop_length=128): stft = np.abs(librosa.stft(x, n_fft=n_fft, hop_length=hop_length))**2 log_stft = librosa.power_to_db(stft) melsp = librosa.feature.melspectrogram(S=log_stft,n_mels=128) return melsp # display wave in plots def show_wave(x): plt.plot(x) plt.show() # display wave in heatmap def show_melsp(melsp, fs): librosa.display.specshow(melsp, sr=fs) plt.colorbar() plt.show()
2.音声データの水増し(Augmentation)
1stステップでデータを音声認識用に変換し、2ndステップで水増しする。
1stステップ:変換
まずただの音声をメル周波数に変換。
メル周波数は人間の聴覚でも感知できるように考慮された尺度のこと。メル周波数に変換すれば、人間の耳でもなんの音かわかるようになる。
そのあとは分類しやすくするためフーリエ変換で次元圧縮。フーリエ変換での次元圧縮は画像でもよく使われてて、音声でも同じことができる。
フーリエ変換すると、音声を表す2次元グラフの横軸が時間だったのを、周波数のものに変換する。
直感的には、N次元の音声を人間にもわかるよう(メル周波数)に変換し、学習しやすいようにフーリエ変換で次元圧縮するイメージ。
# サンプル音声データをロード x, fs = load_wave_data(audio_dir, meta_data.loc[0,"filename"]) # 音声をメル周波数に変換 melsp = calculate_melsp(x) print("wave size:{0}\nmelsp size:{1}\nsamping rate:{2}".format(x.shape, melsp.shape, fs)) show_wave(x) show_melsp(melsp, fs) # 実際にjupyter上で音声が聞ける ipd.Audio(x, rate=fs) >>>output wave size:(220500,) melsp size:(128, 1723) samping rate:44100
音声データ図
2nd:水増し
今回は元の音声をホワイトノイズ、シフトサウンド、ストレッチサウンドに変換し、水増し。
【ホワイトノイズ】
ホワイトノイズは広い音域に同程度の強度の「シャー」と聞こえるノイズ音を加える。
# add white noise def add_white_noise(x, rate=0.002): return x + rate*np.random.randn(len(x)) x_wn = add_white_noise(x) melsp = calculate_melsp(x_wn) print("wave size:{0}\nmelsp size:{1}\nsamping rate:{2}".format(x_wn.shape, melsp.shape, fs)) show_wave(x_wn) show_melsp(melsp, fs) >>>output # shapeは同じ wave size:(220500,) melsp size:(128, 1723)
【シフトサウンド】
音声の周波数を変える。
def shift_sound(x, rate=2): return np.roll(x, int(len(x)//rate)) x_ss = shift_sound(x) melsp = calculate_melsp(x_ss)
【ストレッチサウンド】
周波数はそのままで、テンポ(音声のなる時間)を変える。
例えば、10秒間の音声を8秒間に変換するとか。
def stretch_sound(x, rate=1.1): input_length = len(x) x = librosa.effects.time_stretch(x, rate) if len(x)>input_length: return x[:input_length] else: return np.pad(x, (0, max(0, input_length - len(x))), "constant") x_st = stretch_sound(x) melsp = calculate_melsp(x_st)
3.水増した音声データの保存と読み込
まず、普通のtrainとtestデータを保存。(ファイル名:"esc_melsp_train_raw.npz")
freq = 128 time = 1723 # save wave data in npz, with augmentation def save_np_data(filename, x, y, aug=None, rates=None): np_data = np.zeros(freq*time*len(x)).reshape(len(x), freq, time) np_targets = np.zeros(len(y)) for i in range(len(y)): _x, fs = load_wave_data(audio_dir, x[i]) if aug is not None: _x = aug(x=_x, rate=rates[i]) _x = calculate_melsp(_x) np_data[i] = _x np_targets[i] = y[i] np.savez(filename, x=np_data, y=np_targets) # get training dataset and target dataset x = list(meta_data.loc[:,"filename"]) y = list(meta_data.loc[:, "target"]) x_train, x_test, y_train, y_test = model_selection.train_test_split(x, y, test_size=0.25, stratify=y) print("x train:{0}\ny train:{1}\nx test:{2}\ny test:{3}".format(len(x_train), len(y_train), len(x_test), len(y_test))) >>>output x train:1500 y train:1500 x test:500 y test:500
次の3つはtrainデータをホワイト・ストレッチ・シフトで水増し保存。(ファイル名:"esc_melsp_train_ss.npz","esc_melsp_train_st.npz", "esc_melsp_train_wn.npz")
# save training dataset with white noise if not os.path.exists("esc_melsp_train_wn.npz"): rates = np.random.randint(1,50,len(x_train))/10000 save_np_data("esc_melsp_train_wn.npz", x_train, y_train, aug=add_white_noise, rates=rates) # save training dataset with sound shift if not os.path.exists("esc_melsp_train_ss.npz"): rates = np.random.choice(np.arange(2,6),len(y_train)) save_np_data("esc_melsp_train_ss.npz", x_train, y_train, aug=shift_sound, rates=rates) # save training dataset with stretch if not os.path.exists("esc_melsp_train_st.npz"): rates = np.random.choice(np.arange(80,120),len(y_train))/100 save_np_data("esc_melsp_train_st.npz", x_train, y_train, aug=stretch_sound, rates=rates)
最後はホワイトノイズした後、ストレッチかシフトを組み合わせた水増しデータを保存(”esc_melsp_train_com.npz")。
# save training dataset with combination of white noise and shift or stretch if not os.path.exists("esc_melsp_train_com.npz"): np_data = np.zeros(freq*time*len(x_train)).reshape(len(x_train), freq, time) np_targets = np.zeros(len(y_train)) for i in range(len(y_train)): x, fs = load_wave_data(audio_dir, x_train[i]) x = add_white_noise(x=x, rate=np.random.randint(1,50)/1000) if np.random.choice((True,False)): x = shift_sound(x=x, rate=np.random.choice(np.arange(2,6))) else: x = stretch_sound(x=x, rate=np.random.choice(np.arange(80,120))/100) x = calculate_melsp(x) np_data[i] = x np_targets[i] = y_train[i] np.savez("esc_melsp_train_com.npz", x=np_data, y=np_targets)
5つのファイルをまとめてtrain_filesにまとめる。ファイル名“esc_melsp_train_com.npz”に保存
テストデータのファイル名は"esc_melsp_test.npz"。
# dataset files train_files = ["esc_melsp_train_raw.npz", "esc_melsp_train_ss.npz", "esc_melsp_train_st.npz", "esc_melsp_train_wn.npz", "esc_melsp_train_com.npz"] test_file = "esc_melsp_test.npz"
4.データ前処理とCPU上で学習(CNN)
前処理
振動数、テンポの設定をして、データ読み込み
振動数(または周波数):freq = 128
テンポ(音のなる時間):time =1723
CNNに読み込ませるデータshapeは
x_train:(7500, 128, 1723, 1):(batch_size, freq, time, 1)
y_train:(7500, 50):(batch_size, classes)
freq = 128 time = 1723 train_num = 1500 test_num = 500 # データセット用placeholderの定義 x_train = np.zeros(freq*time*train_num*len(train_files)).reshape(train_num*len(train_files), freq, time) y_train = np.zeros(train_num*len(train_files)) # ファイルからtrain/testデータ読み込み for i in range(len(train_files)): data = np.load(train_files[i]) x_train[i*train_num:(i+1)*train_num] = data["x"] y_train[i*train_num:(i+1)*train_num] = data["y"] test_data = np.load(test_file) x_test = test_data["x"] y_test = test_data["y"] # ラベルをone-hotに変換 classes = 50 y_train = keras.utils.to_categorical(y_train, classes) y_test = keras.utils.to_categorical(y_test, classes) # CNN用にデータを(batch_size, freq, time, 1)にreshape x_train = x_train.reshape(train_num*5, freq, time, 1) x_test = x_test.reshape(test_num, freq, time, 1) print("x train:{0}\ny train:{1}\nx test:{2}\ny test:{3}".format(x_train.shape, y_train.shape, x_test.shape, y_test.shape)) >>>output x train:(7500, 128, 1723, 1) y train:(7500, 50) x test:(500, 128, 1723, 1) y test:(500, 50)
CNNで学習
割と音声認識用でよく使われてるCNNを使用。
def cba(inputs, filters, kernel_size, strides): x = Conv2D(filters, kernel_size=kernel_size, strides=strides, padding='same')(inputs) x = BatchNormalization()(x) x = Activation("relu")(x) return x # Callbackとか model_dir = "~/ESC-50-master/dir" if not os.path.exists(model_dir): os.mkdir(model_dir) es_cb = EarlyStopping(monitor='val_loss', patience=10, verbose=1, mode='auto') chkpt = os.path.join(model_dir, 'esc50_.{epoch:02d}_{val_loss:.4f}_{val_acc:.4f}.hdf5') cp_cb = ModelCheckpoint(filepath = chkpt, monitor='val_loss', verbose=1, save_best_only=True, mode='auto') # CNN inputs = Input(shape=(x_train.shape[1:])) x_1 = cba(inputs, filters=32, kernel_size=(1,8), strides=(1,2)) x_1 = cba(x_1, filters=32, kernel_size=(8,1), strides=(2,1)) x_1 = cba(x_1, filters=64, kernel_size=(1,8), strides=(1,2)) x_1 = cba(x_1, filters=64, kernel_size=(8,1), strides=(2,1)) x_2 = cba(inputs, filters=32, kernel_size=(1,16), strides=(1,2)) x_2 = cba(x_2, filters=32, kernel_size=(16,1), strides=(2,1)) x_2 = cba(x_2, filters=64, kernel_size=(1,16), strides=(1,2)) x_2 = cba(x_2, filters=64, kernel_size=(16,1), strides=(2,1)) x_3 = cba(inputs, filters=32, kernel_size=(1,32), strides=(1,2)) x_3 = cba(x_3, filters=32, kernel_size=(32,1), strides=(2,1)) x_3 = cba(x_3, filters=64, kernel_size=(1,32), strides=(1,2)) x_3 = cba(x_3, filters=64, kernel_size=(32,1), strides=(2,1)) x_4 = cba(inputs, filters=32, kernel_size=(1,64), strides=(1,2)) x_4 = cba(x_4, filters=32, kernel_size=(64,1), strides=(2,1)) x_4 = cba(x_4, filters=64, kernel_size=(1,64), strides=(1,2)) x_4 = cba(x_4, filters=64, kernel_size=(64,1), strides=(2,1)) x = Add()([x_1, x_2, x_3, x_4]) x = cba(x, filters=128, kernel_size=(1,16), strides=(1,2)) x = cba(x, filters=128, kernel_size=(16,1), strides=(2,1)) x = GlobalAveragePooling2D()(x) x = Dense(classes)(x) x = Activation("softmax")(x) model = Model(inputs, x) # initiate Adam optimizer with amsgrad opt = keras.optimizers.adam(lr=0.00001, decay=1e-6, amsgrad=True) model.compile(loss='categorical_crossentropy', optimizer=opt, metrics=['accuracy']) model.summary() >>>output input_2 (InputLayer) (None, 128, 1723, 1) 0 ~~~~~ activation_38 (Activation) (None, 50)
model.fit_generator()で、大容量でCPUでも訓練できるようにした。
# class data generator class MixupGenerator(): def __init__(self, x_train, y_train, batch_size=16, alpha=0.2, shuffle=True): self.x_train = x_train self.y_train = y_train self.batch_size = batch_size self.alpha = alpha self.shuffle = shuffle self.sample_num = len(x_train) def __call__(self): while True: indexes = self.__get_exploration_order() itr_num = int(len(indexes) // (self.batch_size * 2)) for i in range(itr_num): batch_ids = indexes[i * self.batch_size * 2:(i + 1) * self.batch_size * 2] x, y = self.__data_generation(batch_ids) yield x, y def __get_exploration_order(self): indexes = np.arange(self.sample_num) if self.shuffle: np.random.shuffle(indexes) return indexes def __data_generation(self, batch_ids): _, h, w, c = self.x_train.shape _, class_num = self.y_train.shape x1 = self.x_train[batch_ids[:self.batch_size]] x2 = self.x_train[batch_ids[self.batch_size:]] y1 = self.y_train[batch_ids[:self.batch_size]] y2 = self.y_train[batch_ids[self.batch_size:]] l = np.random.beta(self.alpha, self.alpha, self.batch_size) x_l = l.reshape(self.batch_size, 1, 1, 1) y_l = l.reshape(self.batch_size, 1) x = x1 * x_l + x2 * (1 - x_l) y = y1 * y_l + y2 * (1 - y_l) return x, y # 訓練 batch_size = 16 epochs = 10 training_generator = MixupGenerator(x_train, y_train)() model.fit_generator(generator=training_generator, steps_per_epoch=x_train.shape[0] // batch_size, validation_data=(x_test, y_test), epochs=epochs, verbose=1, shuffle=True, callbacks=[es_cb, cp_cb])
今回は訓練を終えて、精度出すところまで行かなかったけど、これと同じやり方なら80%を超えて、人間の精度を超えるらしい。
音声認識は触れたことなかったので、学習までのデータの加工とか、水増し、CPU上で大容量データの訓練方法とかのいいアウトプットになった。