Source code for alex.utils.htk

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#

import numpy
import re
import glob
import wave

from struct import unpack, pack

from alex.utils.cache import lru_cache
from alex.utils.mfcc import MFCCFrontEnd

"""
The htk module implements classes for manipulation with the MLF files.
"""

LPC = 1
LPCREFC = 2
LPCEPSTRA = 3
LPCDELCEP = 4
IREFC = 5
MFCC = 6
FBANK = 7
MELSPEC = 8
USER = 9
DISCRETE = 10
PLP = 11

_E = 0000100  # has energy
_N = 0000200  # absolute energy supressed
_D = 0000400  # has delta coefficients
_A = 0001000  # has acceleration (delta-delta) coefficients
_C = 0002000  # is compressed
_Z = 0004000  # has zero mean static coefficients
_K = 0010000  # has CRC checksum
_O = 0020000  # has 0th cepstral coefficient
_V = 0040000  # has VQ data
_T = 0100000  # has third differential coefficients


[docs]class Features: "Read HTK format feature files" def __init__(self, file_name=None): self.swap = (unpack('=i', pack('>i', 42))[0] != 42) self.frames = [] self.file_name = file_name if self.file_name: self.open(file_name) def __len__(self): return len(self.frames) def __iter__(self): for i in self.frames: yield i def __getitem__(self, i): return self.frames[i]
[docs] def open(self, file_name): # pylint: disable-msg=E1103 f = open(file_name, "rb") # read header spam = f.read(12) self.nSamples, self.sampPeriod, self.sampSize, self.parmKind = unpack( ">IIHH", spam) # get coefficients for compressed data if self.parmKind & _C: self.dtype = 'h' self.veclen = self.sampSize / 2 if self.parmKind & 0x3f == IREFC: self.A = 32767 self.B = 0 else: self.A = numpy.fromfile(f, 'f', self.veclen) self.B = numpy.fromfile(f, 'f', self.veclen) if self.swap: self.A = self.A.byteswap() self.B = self.B.byteswap() else: self.dtype = 'f' self.veclen = self.sampSize / 4 self.hdrlen = f.tell() data = numpy.fromfile(f, self.dtype) if self.parmKind & _K: # remove and ignore check-sum data = data[:-1] data = data.reshape(len(data) / self.veclen, self.veclen) if self.swap: data = data.byteswap() # un-compress data to floats if required if self.parmKind & _C: data = (data.astype('f') + self.B) / self.A self.frames = data f.close()
[docs]class MLF: """Read HTK MLF files. Def: segment is a sequence of frames with the same label. """ def __init__(self, file_name=None, max_files=None): self.mlf = {} self.max_files = max_files self.file_name = file_name if self.file_name: self.open(file_name) def __len__(self): return len(self.mlf) def __iter__(self): for i in self.mlf: yield i def __getitem__(self, i): return self.mlf[i]
[docs] def open(self, file_name): f = open(file_name, 'r') n_files = 0 for l in f: if self.max_files and n_files > self.max_files: break l = l.strip() if l.startswith('"'): param_file_name = l[1:-1].replace("*/", '') param_file_name = re.sub(r"\.rec$", "", param_file_name) param_file_name = re.sub(r"\.lab$", "", param_file_name) transcription = [] continue if l.startswith('.'): self.mlf[param_file_name] = transcription n_files += 1 continue c = l.split() if len(c) == 3: # I get aligned mlf s, e, label = c s, e = int(s), int(e) m = label.find('-') p = label.rfind('+') if m != -1 and p != -1: label = label[m + 1:p] transcription.append([s, e, label]) elif len(c) == 1: # non aligned data pass f.close()
[docs] def filter_zero_segments(self): """Remove aligned segments which have zero length.""" for f in self.mlf: transcription = [] for s, e, l in self.mlf[f]: if s == e: # skip continue else: transcription.append([s, e, l]) self.mlf[f] = transcription
[docs] def sub(self, pattern, repl, pos=True): for f in self.mlf: for i, [s, e, l] in enumerate(self.mlf[f]): if pos and l == pattern: self.mlf[f][i][2] = repl if not pos and l != pattern: self.mlf[f][i][2] = repl
[docs] def merge(self): """Merge the consecutive segments with the same label into one segment.""" for f in self.mlf: transcription = [] prev_w = None prev_start = 0 prev_end = 0 # print f for s, e, l in self.mlf[f]: # print "O", s, e, l if l == prev_w: # merge prev_end = e else: if prev_w: transcription.append([prev_start, prev_end, prev_w]) # print "N", prev_start, prev_end, prev_w prev_start, prev_end, prev_w = s, e, l if prev_w: transcription.append([prev_start, prev_end, prev_w]) # print "N", prev_start, prev_end, prev_w self.mlf[f] = transcription
[docs] def times_to_seconds(self): for f in self.mlf: for i in range(len(self.mlf[f])): self.mlf[f][i][0] /= 10000000 self.mlf[f][i][1] /= 10000000
[docs] def times_to_frames(self, frame_length=0.010): for f in self.mlf: for i in range(len(self.mlf[f])): self.mlf[f][i][0] = int(self.mlf[f][i][0] / frame_length / 10000000) self.mlf[f][i][1] = int(self.mlf[f][i][1] / frame_length / 10000000) # shorten the last segment by 10 frames as there may not be enough data for a final frame self.mlf[f][i][1] -= 10 # remove the zero or negative length segments that could be created by the previous step if self.mlf[f][i][0] >= self.mlf[f][i][1]: del self.mlf[f][i]
[docs] def trim_segments(self, n=3): """Remove n-frames from the beginning and the end of a segment.""" if n: for f in self.mlf: transcription = [] for s, e, l in self.mlf[f]: if s + n < e - n: # trim transcription.append([s + n, e - n, l]) else: # skip this segment as it is too short to be accuratelly aligned pass self.mlf[f] = transcription
[docs] def shorten_segments(self, n=100): """Shorten segments to n-frames.""" if n: for f in self.mlf: # print f transcription = [] for s, e, l in self.mlf[f]: if e - s > 2*n+2: # clip the middle part of the segment transcription.append([s, s + n, l]) transcription.append([e-n-1, e-1, l]) # print transcription[-2] # print transcription[-1] # print '.' else: # it is short enough transcription.append([s, e, l]) # print transcription[-1] # print '.' self.mlf[f] = transcription
[docs] def count_length(self, pattern): """Count length of all segments matching the pattern""" length = 0 for f in self.mlf: for s, e, l in self.mlf[f]: if l == pattern: length += e - s return length
[docs]class MLFFeaturesAlignedArray: """Creates array like object from multiple mlf files and corresponding audio data. For each aligned frame it returns a feature vector and its label. If a filter is set to a particular value, then only frames with the label equal to the filer will be returned. In this case, the label is not returned when iterating through the array. """ def __init__(self, filter=None): self.filter = filter self.mlfs = [] self.trns = [] self.last_file_name = None self.last_param_file_features = None def __iter__(self): """Allows to iterate over all frames in the the appended mlf and param files. The required data are loaded as necessary. This is a memory efficient solution """ for mlf in self.mlfs: for f in mlf: for s, e, l in mlf[f]: for i in range(s, e): # print f, s, e, l, i if self.filter: if l == self.filter: yield self.get_frame(f, i) else: # skip a frame not matching the filter continue else: yield [self.get_frame(f, i), l]
[docs] def append_mlf(self, mlf): """Add a mlf file with aligned transcriptions.""" self.mlfs.append(mlf)
[docs] def append_trn(self, trn): """Adds files with audio data (param files) based on the provided pattern.""" trn_files = glob.glob(trn) # print "TF", trn_files self.trns.extend(trn_files)
@lru_cache(maxsize=100000)
[docs] def get_param_file_name(self, file_name): """Returns the matching param file name.""" for trn in self.trns: if file_name in trn: return trn
[docs] def get_frame(self, file_name, frame_id): """Returns a frame from a specific param file.""" if self.last_file_name != file_name: # find matching param file param_file_name = self.get_param_file_name(file_name) # open the param file self.last_param_file_features = Features(param_file_name) self.last_file_name = file_name return self.last_param_file_features[frame_id]
[docs]class MLFMFCCOnlineAlignedArray(MLFFeaturesAlignedArray): """This is an extension of MLFFeaturesAlignedArray which computes the features on the fly from the input wav files. It uses our own implementation of the MFCC computation. As a result it does not give the same results as the HTK HCopy. The experience suggests that our MFFC features are worse than the features generated by HCopy. """ def __init__(self, windowsize=250000, targetrate=100000, filter=None, usec0=False, usedelta=True, useacc=True, n_last_frames=0, mel_banks_only = False): """Initialise the MFCC front-end. windowsize - defines the length of the window (frame) in the HTK's 100ns units targetrate - defines the period with which new coefficients should be generated (again in 100ns units) """ MLFFeaturesAlignedArray.__init__(self, filter) self.windowsize = windowsize self.targetrate = targetrate self.usec0 = usec0 self.usedelta = usedelta self.useacc = useacc self.n_last_frames = n_last_frames self.mel_banks_only = mel_banks_only self.mfcc_front_end = None
[docs] def get_frame(self, file_name, frame_id): """Returns a frame from a specific param file.""" if self.last_file_name != file_name: self.last_file_name = file_name # print "FN", file_name # find matching param file param_file_name = self.get_param_file_name(file_name) if param_file_name == None: raise Exception("MLFMFCCOnlineAlignedArray: param_file_name cannot be None, file_name: " + file_name) # print "PFN", param_file_name # open the param file try: self.last_param_file_features = wave.open(param_file_name, 'r') except AttributeError: print "Error opening file:", param_file_name if self.last_param_file_features.getnchannels() != 1: raise Exception('Input wave is not in mono') if self.last_param_file_features.getsampwidth() != 2: raise Exception('Input wave is not in 16bit') sample_rate = self.last_param_file_features.getframerate() self.frame_size = int(sample_rate * self.windowsize / 10000000) if self.frame_size > 1024: self.frame_size = 2048 elif self.frame_size > 512: self.frame_size = 1024 elif self.frame_size > 256: self.frame_size = 512 elif self.frame_size > 128: self.frame_size = 256 elif self.frame_size > 64: self.frame_size = 128 self.frame_shift = int(sample_rate * self.targetrate / 10000000) self.mfcc_front_end = MFCCFrontEnd(sample_rate, self.frame_size, usec0=self.usec0, usedelta=self.usedelta, useacc=self.useacc, n_last_frames=self.n_last_frames, mel_banks_only = self.mel_banks_only) # print "FS", self.frame_size self.last_param_file_features.setpos(max(frame_id * self.frame_shift - int(self.frame_size / 2), 0)) frame = self.last_param_file_features.readframes(self.frame_size) # print "LN", len(frame) frame = numpy.frombuffer(frame, dtype=numpy.int16) try: mfcc_params = self.mfcc_front_end.param(frame) except ValueError: print file_name, frame_id, len(frame) raise return mfcc_params