Module audian.bufferedspectrogram

Compute spectrograms of source data on the fly.

Classes

class BufferedSpectrogram (name='spectrogram', source='filtered', panel='spectrogram', nfft=256, hop_frac=0.5)

Random access to time-series data of which only a part is held in memory.

This is a base class for accessing large audio recordings either from a file (class AudioLoader) or by computing its contents on the fly (e.g. filtered data, envelopes or spectrograms). The BufferedArray behaves like a single big ndarray with first dimension indexing the frames and second dimension indexing the channels of the data. Higher dimensions are also supported. For example, a third dimension for frequencies needed for spectrograms. Internally the class holds only a part of the data in memory. The size of this buffer is set to bufferframes frames. If more data are requested, the buffer is enlarged accordingly.

Classes inheriting BufferedArray just need to implement

self.load_buffer(offset, nsamples, pbuffer)

This function needs to fill the supplied pbuffer with nsamples frames of data starting at frame offset.
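The idea of holding only a window of the data in memory and reloading it on demand can be sketched in pure Python. This is a toy illustration only: the class ToyBufferedSeries, its loads counter, and the squared-index "data" are hypothetical and are not part of the BufferedArray API, which additionally handles channels, slices, backframes, and buffer recycling.

```python
class ToyBufferedSeries:
    """Hypothetical stand-in for a buffered array (not the real API)."""

    def __init__(self, frames, bufferframes):
        self.frames = frames              # total number of frames
        self.bufferframes = bufferframes  # frames held in memory
        self.offset = 0                   # index of first buffered frame
        self.buffer = []                  # currently held frames
        self.loads = 0                    # counts load_buffer() calls

    def load_buffer(self, offset, nsamples, pbuffer):
        # Fill pbuffer with nsamples frames starting at frame offset.
        # The "data" here are simply the squares of the frame indices.
        for k in range(nsamples):
            pbuffer[k] = float((offset + k)**2)
        self.loads += 1

    def __len__(self):
        return self.frames

    def __getitem__(self, index):
        if index < 0 or index >= self.frames:
            raise IndexError(index)
        if not (self.offset <= index < self.offset + len(self.buffer)):
            # Requested frame is outside the buffer: move it and reload.
            self.offset = index
            n = min(self.bufferframes, self.frames - index)
            self.buffer = [0.0]*n
            self.load_buffer(self.offset, n, self.buffer)
        return self.buffer[index - self.offset]


data = ToyBufferedSeries(frames=1000, bufferframes=100)
first = data[10]     # outside the (empty) buffer: loads frames 10..109
second = data[50]    # served from the buffer, no new load
third = data[500]    # outside the buffer: triggers a second load
```

Only two of the three accesses call load_buffer(); this is the behavior a subclass relies on when it implements just that one function.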

In the constructor or some kind of opening function, you need to set the following member variables, followed by a call to init_buffer():

self.rate            # number of frames per second
self.channels        # number of channels per frame
self.frames          # total number of frames
self.shape = (self.frames, self.channels, ...)        
self.bufferframes    # number of frames the buffer should hold
self.backframes      # number of frames kept for moving back
self.init_buffer()

or provide all this information via the constructor:

Parameters

rate : float
The sampling rate of the data in Hertz.
channels : int
The number of channels.
frames : int
The number of frames.
bufferframes : int
Number of frames the current data buffer holds.
backframes : int
Number of frames the current data buffer should keep before requested data ranges.
verbose : int
If larger than zero show detailed error/warning messages.

Attributes

rate : float
The sampling rate of the data in Hertz.
channels : int
The number of channels.
frames : int
The number of frames. Same as len().
shape : tuple
Frames and channels of the data. Optional higher dimensions.
ndim : int
Number of dimensions: 2 (frames and channels) or higher.
size : int
Total number of samples: frames times channels.
offset : int
Index of first frame in the current buffer.
buffer : ndarray of floats
The currently available data. First dimension is time, second is channels. Optional higher dimensions according to ndim and shape.
bufferframes : int
Number of frames the current data buffer holds.
backframes : int
Number of frames the current data buffer should keep before requested data ranges.
follow : int
If zero (default), move the buffer position only for requests outside the current buffer. If larger than zero, the buffer position follows requested data ranges whenever the buffer can be moved by more than follow frames. This results in more frequent but smaller buffer updates. Set it after calling the constructor or init_buffer().
buffer_changed : ndarray of bool
For each channel a flag, whether the buffer content has been changed. Set to True, whenever load_buffer() was called.

Methods

  • len(): Number of frames.
  • __getitem__: Access data.
  • blocks(): Generator for blockwise processing of AudioLoader data.
  • update_buffer(): Make sure that the buffer contains data of a range of indices.
  • update_time(): Make sure that the buffer contains data of a given time range.
  • reload_buffer(): Reload the current buffer.
  • load_buffer(): Load a range of samples into a buffer.
  • move_buffer(): Move and resize buffer.
  • buffer_position(): Compute position and size of buffer.
  • recycle_buffer(): Move buffer to new position and recycle content if possible.

Notes

Access via __getitem__ or __next__ is slow! Even worse, using numpy functions on this class first converts it to a numpy array, which is something we actually do not want! We should subclass directly from numpy.ndarray. See http://docs.scipy.org/doc/numpy/user/basics.subclassing.html for details. When subclassing, there is an offset argument that might help to speed up __getitem__.

Constructor for initializing 2D arrays (times x channels).

class BufferedSpectrogram(BufferedData):

    def __init__(self, name='spectrogram', source='filtered',
                 panel='spectrogram', nfft=256,
                 hop_frac=0.5):
        super().__init__(name, source, tafter=10, panel=panel,
                         panel_type='spectrogram')
        self.nfft = nfft
        self.hop_frac = hop_frac
        self.hop = 0
        self.set_hop()
        self.frequencies = np.zeros(0)
        self.fresolution = 1
        self.tresolution = 1
        self.spec_rect = []
        self.use_spec = True
        self.init = True

        
    def open(self, source):
        # round and clamp the hop size consistently with update():
        self.set_hop()
        self.fresolution = source.rate/self.nfft
        self.frequencies = np.arange(0, source.rate/2 + self.fresolution/2,
                                     self.fresolution)
        self.tresolution = self.hop/source.rate
        self.spec_rect = []
        self.use_spec = True
        super().open(source, self.hop, more_shape=(self.nfft//2 + 1,))
        self.unit = f'{self.unit}^2/Hz'
        self.ampl_min = 0
        self.ampl_max = self.source.rate/2

        
    def process(self, source, dest, nbefore):
        nsource = (len(dest) - 1)*self.hop + self.nfft
        if nsource > len(source):
            nsource = len(source)
        if nsource >= self.nfft:
            with np.errstate(under='ignore'):
                freq, time, Sxx = spectrogram(source[:nsource],
                                              self.source.rate,
                                              nperseg=self.nfft,
                                              noverlap=self.nfft - self.hop,
                                              axis=0)
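            # Sxx has shape (frequencies, channels, times);
            # reorder it to (times, channels, frequencies) as in dest:
            n = Sxx.shape[2]
            dest[:n] = Sxx.transpose((2, 1, 0))
            # zero-fill frames for which no full segment was available:
            dest[n:] = 0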
            self.frequencies = freq
        else:
            dest[:] = 0
        # extent of the full buffer:
        self.spec_rect = [self.offset/self.rate, 0,
                          len(self.buffer)/self.rate,
                          self.source.rate/2 + self.fresolution]


    def set_hop(self):
        hop = int(np.round(self.hop_frac*self.nfft))
        if hop < 1:
            hop = 1
        if hop > self.nfft:
            hop = self.nfft
        if self.hop != hop:
            self.hop = hop
            self.hop_frac = self.hop/self.nfft
            return True
        else:
            return False

        
    def update(self, nfft=None, hop_frac=None):
        spec_update = False
        if nfft is not None:
            if nfft < 8:
                nfft = 8
            max_nfft = min(len(self.source)//2, 2**30)
            if nfft > max_nfft:
                nfft = max_nfft
            if self.nfft != nfft:
                self.nfft = nfft
                spec_update = True
        if hop_frac is not None:
            if hop_frac > 1.0:
                hop_frac = 1.0
            self.hop_frac = hop_frac
        if self.set_hop():
            spec_update = True
        if spec_update:
            self.tresolution = self.hop/self.source.rate
            self.fresolution = self.source.rate/self.nfft
            self.update_step(self.hop, more_shape=(self.nfft//2 + 1,))
            self.recompute_all()

            
    def estimate_noiselevels(self, channel):
        if not self.init or len(self.buffer) == 0 or len(self.buffer.shape) < 3:
            return None, None
        nf = self.buffer.shape[2]//16
        if nf < 1:
            nf = 1
        with np.errstate(all='ignore'):  # check what is going on!!!
            zmin = np.percentile(decibel(self.buffer[:, channel, -nf:]), 95)
        zmax = np.max(decibel(self.buffer[:, channel, :]))
        if not np.isfinite(zmin) or not np.isfinite(zmax):
            return None, None
        self.init = False
        zmax = zmin + 0.95*(zmax - zmin)
        if zmax - zmin < 20:
            zmax = zmin + 20
        if zmax - zmin > 80:
            zmin = zmax - 80
        return zmin, zmax

Ancestors

Methods

def open(self, source)
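Judging from the source listing, open() derives the frequency and time resolution of the spectrogram from the sampling rate of the source, nfft, and the hop size. A minimal pure-Python sketch of that arithmetic follows; the function name spectrogram_resolution and the example rate of 48000 Hz are illustrative assumptions, not part of audian.

```python
def spectrogram_resolution(rate, nfft, hop):
    """Return (fresolution, tresolution, nfreqs) as computed in open()."""
    fresolution = rate/nfft   # spacing of frequency bins in Hertz
    tresolution = hop/rate    # spacing of spectrogram frames in seconds
    nfreqs = nfft//2 + 1      # number of bins from 0 Hz up to rate/2
    return fresolution, tresolution, nfreqs


# assumed example values: 48 kHz recording, default nfft and hop_frac
fres, tres, nfreqs = spectrogram_resolution(rate=48000.0, nfft=256, hop=128)
print(fres, tres, nfreqs)
```

The value nfreqs corresponds to the extra dimension passed as more_shape, so each spectrogram frame holds nfft//2 + 1 power values per channel.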
def process(self, source, dest, nbefore)
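According to the source listing, process() requests (len(dest) - 1)*hop + nfft source frames, limits this to the available source data, and zero-fills the spectrogram frames that could not be computed. The sketch below reproduces this bookkeeping in pure Python. The helper names are hypothetical, and the segment count assumes that only full, unpadded segments are computed.

```python
def source_frames_needed(nspec, nfft, hop, navailable):
    """Source frames used for nspec spectrogram frames, capped at navailable."""
    nsource = (nspec - 1)*hop + nfft
    return min(nsource, navailable)


def spectrogram_frames(nsource, nfft, hop):
    """Number of full segments of length nfft that fit into nsource frames."""
    if nsource < nfft:
        return 0
    return (nsource - nfft)//hop + 1


# enough source data: all 100 spectrogram frames can be computed
n_full = source_frames_needed(100, nfft=256, hop=128, navailable=50000)
frames_full = spectrogram_frames(n_full, nfft=256, hop=128)

# source data run out: only some frames are computed, the rest stay zero
n_short = source_frames_needed(100, nfft=256, hop=128, navailable=10000)
frames_short = spectrogram_frames(n_short, nfft=256, hop=128)
zero_filled = 100 - frames_short
```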
def set_hop(self)
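As the source listing shows, set_hop() rounds hop_frac*nfft to an integer hop size, clamps it to the range from 1 to nfft, and writes the effective fraction back to hop_frac. A stand-alone sketch of this rule, using the hypothetical helper name clamp_hop and Python's built-in round() in place of np.round():

```python
def clamp_hop(hop_frac, nfft):
    """Return (hop, effective hop_frac) following the rule in set_hop()."""
    hop = int(round(hop_frac*nfft))
    hop = max(1, min(hop, nfft))   # keep 1 <= hop <= nfft
    return hop, hop/nfft


default = clamp_hop(0.5, 256)      # exact half of the window
rounded = clamp_hop(0.3, 256)      # 76.8 is rounded up to 77
too_small = clamp_hop(0.0, 256)    # clamped to a hop of one frame
too_large = clamp_hop(2.0, 256)    # clamped to non-overlapping windows
```

Because the effective fraction is stored back, hop_frac may differ slightly from the requested value after the call.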
def update(self, nfft=None, hop_frac=None)
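In the source listing, update() restricts a requested nfft to at least 8 and at most half the number of source frames (and never more than 2**30) before recomputing the spectrogram. The following sketch isolates that clamping; clamp_nfft is a hypothetical helper name and the frame counts are made-up example values.

```python
def clamp_nfft(nfft, nsource_frames):
    """Limit nfft in the same order as update() does."""
    nfft = max(nfft, 8)                        # lower limit first
    max_nfft = min(nsource_frames//2, 2**30)   # then the upper limit
    return min(nfft, max_nfft)


small = clamp_nfft(4, 100000)        # raised to the minimum of 8
normal = clamp_nfft(1024, 100000)    # within limits, unchanged
large = clamp_nfft(2**20, 100000)    # limited to half the source length
short = clamp_nfft(256, 10)          # very short source: upper limit wins
```

Since the upper limit is applied last, a very short source can yield an nfft below 8.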
def estimate_noiselevels(self, channel)
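After estimating zmin and zmax in decibels, the source listing shrinks the range to 95% of its size and then forces it to span between 20 and 80 dB. A pure-Python sketch of this final step, with the hypothetical helper name clamp_range and made-up decibel values as input:

```python
def clamp_range(zmin, zmax):
    """Apply the range limits from the end of estimate_noiselevels()."""
    zmax = zmin + 0.95*(zmax - zmin)   # cut the top 5% of the range
    if zmax - zmin < 20:
        zmax = zmin + 20               # at least 20 dB: raise the upper limit
    if zmax - zmin > 80:
        zmin = zmax - 80               # at most 80 dB: raise the lower limit
    return zmin, zmax


wide = clamp_range(-100.0, 0.0)      # 95 dB after shrinking, cut to 80 dB
narrow = clamp_range(-60.0, -50.0)   # 9.5 dB after shrinking, widened to 20 dB
```

A range that is too narrow is widened upwards from zmin, whereas a range that is too wide is cut from below, so the upper limit is preserved in that case.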