Module audian.bufferedspectrogram
Spectrogram of source data on the fly.
class BufferedSpectrogram (name='spectrogram', source='filtered', panel='spectrogram', nfft=256, hop_frac=0.5)
Expand source code
class BufferedSpectrogram(BufferedData):
    """Spectrogram of source data, computed on the fly.

    Each buffer frame holds one spectrum per channel, i.e. the buffer
    gets an additional third dimension of ``nfft//2 + 1`` frequencies.

    Parameters
    ----------
    name, source, panel:
        Passed on to the `BufferedData` constructor.
    nfft: int
        Number of source frames per spectrogram segment (``nperseg``).
    hop_frac: float
        Step between successive segments as a fraction of `nfft`.
    """

    def __init__(self, name='spectrogram', source='filtered',
                 panel='spectrogram', nfft=256, hop_frac=0.5):
        super().__init__(name, source, tafter=10, panel=panel,
                         panel_type='spectrogram')
        self.nfft = nfft
        self.hop_frac = hop_frac
        self.hop = 0
        self.set_hop()          # derive self.hop from nfft and hop_frac
        self.frequencies = np.zeros(0)
        self.fresolution = 1
        self.tresolution = 1
        self.spec_rect = []
        self.use_spec = True
        self.init = True        # no noise levels estimated yet

    def open(self, source):
        """Set resolutions and frequency axis from `source` and open the buffer.

        Parameters
        ----------
        source:
            Data the spectrogram is computed from. Only its `rate`
            attribute is used here; the object itself is passed on to
            the base class `open()`.
        """
        # Use the same rounding and clamping as everywhere else.  A plain
        # int(nfft*hop_frac) truncates (int(100*0.29) == 28) and can
        # result in a hop of zero.
        self.set_hop()
        self.fresolution = source.rate/self.nfft
        self.frequencies = np.arange(0, source.rate/2 + self.fresolution/2,
                                     self.fresolution)
        self.tresolution = self.hop/source.rate
        self.spec_rect = []
        self.use_spec = True
        super().open(source, self.hop, more_shape=(self.nfft//2 + 1,))
        self.unit = f'{self.unit}^2/Hz'
        self.ampl_min = 0
        self.ampl_max = self.source.rate/2

    def process(self, source, dest, nbefore):
        """Compute the spectrogram of `source` and store it in `dest`.

        Parameters
        ----------
        source:
            Source data, segmented along its first axis.
        dest:
            Buffer to be filled. Rows that cannot be computed from the
            available source data are set to zero.
        nbefore:
            Not used by this implementation.
        """
        nsource = (len(dest) - 1)*self.hop + self.nfft
        if nsource > len(source):
            nsource = len(source)
        if nsource >= self.nfft:
            with np.errstate(under='ignore'):
                freq, _, Sxx = spectrogram(source[:nsource],
                                           self.source.rate,
                                           nperseg=self.nfft,
                                           noverlap=self.nfft - self.hop,
                                           axis=0)
                n = Sxx.shape[2]
                # bring the segment axis to the front:
                dest[:n] = Sxx.transpose((2, 1, 0))
                dest[n:] = 0
                self.frequencies = freq
        else:
            # less than a single segment of source data available
            dest[:] = 0
        # extent of the full buffer:
        self.spec_rect = [self.offset/self.rate, 0,
                          len(self.buffer)/self.rate,
                          self.source.rate/2 + self.fresolution]

    def set_hop(self):
        """Derive `hop` from `hop_frac` and `nfft`.

        The hop is rounded and clamped to the range 1 to `nfft`. If it
        changed, `hop_frac` is set to the fraction actually realized.

        Returns
        -------
        changed: bool
            True if `hop` was modified.
        """
        hop = int(np.round(self.hop_frac*self.nfft))
        if hop < 1:
            hop = 1
        if hop > self.nfft:
            hop = self.nfft
        if self.hop != hop:
            self.hop = hop
            self.hop_frac = self.hop/self.nfft
            return True
        else:
            return False

    def update(self, nfft=None, hop_frac=None):
        """Change segment length and/or hop and recompute if necessary.

        Parameters
        ----------
        nfft: int or None
            New segment length. Raised to at least 8 and then limited
            to half the length of the source (at most 2**30).
        hop_frac: float or None
            New hop as a fraction of `nfft`, limited to at most 1.
        """
        spec_update = False
        if nfft is not None:
            if nfft < 8:
                nfft = 8
            # Never allow zero, even for sources shorter than two
            # frames: nfft is used as a divisor below and in set_hop().
            max_nfft = max(1, min(len(self.source)//2, 2**30))
            if nfft > max_nfft:
                nfft = max_nfft
            if self.nfft != nfft:
                self.nfft = nfft
                spec_update = True
        if hop_frac is not None:
            if hop_frac > 1.0:
                hop_frac = 1.0
            self.hop_frac = hop_frac
        # Always re-derive the hop, so that it also follows a changed nfft.
        if self.set_hop():
            spec_update = True
        if spec_update:
            self.tresolution = self.hop/self.source.rate
            self.fresolution = self.source.rate/self.nfft
            self.update_step(self.hop, more_shape=(self.nfft//2 + 1,))
            self.recompute_all()

    def estimate_noiselevels(self, channel):
        """Estimate a decibel range for displaying the spectrogram.

        An estimate is returned only until the first one succeeded
        (`self.init` is cleared then).

        Parameters
        ----------
        channel: int
            Index into the second dimension of the buffer.

        Returns
        -------
        zmin, zmax: float or None
            Lower and upper decibel level, at least 20 and at most 80
            apart. Both None if no estimate is available.
        """
        if not self.init or len(self.buffer) == 0 or \
           len(self.buffer.shape) < 3:
            return None, None
        # number of frequency bins at the upper end of the spectrum:
        nf = self.buffer.shape[2]//16
        if nf < 1:
            nf = 1
        with np.errstate(all='ignore'):   # check what is going on!!!
            zmin = np.percentile(decibel(self.buffer[:, channel, -nf:]), 95)
            zmax = np.max(decibel(self.buffer[:, channel, :]))
        if not np.isfinite(zmin) or not np.isfinite(zmax):
            return None, None
        self.init = False
        zmax = zmin + 0.95*(zmax - zmin)
        if zmax - zmin < 20:
            zmax = zmin + 20
        if zmax - zmin > 80:
            zmin = zmax - 80
        return zmin, zmax
Random access to time-series data of which only a part is held in memory.
This is a base class for accessing large audio recordings either from a file (class
) or by computing its contents on the fly (e.g. filtered data, envelopes or spectrograms). The BufferedArray
behaves like a single big ndarray with first dimension indexing the frames and second dimension indexing the channels of the data. Higher dimensions are also supported. For example, a third dimension for frequencies needed for spectrograms. Internally the class holds only a part of the data in memory. The size of this buffer is set to bufferframes
frames. If more data are requested, the buffer is enlarged accordingly. Classes inheriting
just need to implement self.load_buffer(offset, nsamples, pbuffer)
This function needs to load the supplied
frames of data starting at frame offset.
In the constructor or some kind of opening function, you need to set the following member variables, followed by a call to init_buffer():
    self.rate          # number of frames per second
    self.channels      # number of channels per frame
    self.frames        # total number of frames
    self.shape = (self.frames, self.channels, ...)
    self.bufferframes  # number of frames the buffer should hold
    self.backframes    # number of frames kept for moving back
    self.init_buffer()
or provide all this information via the constructor:
- The sampling rate of the data in Hertz (frames per second).
- The number of channels.
- The number of frames.
- Number of frames the current data buffer holds.
- Number of frames the current data buffer should keep before requested data ranges.
- If larger than zero show detailed error/warning messages.
- The sampling rate of the data in Hertz (frames per second).
- The number of channels.
- The number of frames. Same as
. shape
- Frames and channels of the data. Optional higher dimensions.
- Number of dimensions: 2 (frames and channels) or higher.
- Total number of samples: frames times channels.
- Index of first frame in the current buffer.
- The currently available data. First dimension is time, second channels.
Optional higher dimensions according to
. bufferframes
- Number of samples the current data buffer holds.
- Number of samples the current data buffer should keep before requested data ranges.
- For each channel a flag, whether the buffer content has been changed.
Set to
, whenever load_buffer()
was called.
: Number of frames.
__getitem__: Access data.
blocks(): Generator for blockwise processing of the data.
update_buffer(): make sure that the buffer contains data of a range of indices.
update_time(): make sure that the buffer contains data of a given time range.
reload_buffer(): reload the current buffer.
move_buffer(): move and resize buffer (called by update_buffer()).
load_buffer(): load a range of samples into a buffer (called by reload_buffer() and move_buffer()).
_buffer_position(): compute position and size of buffer (used by update_buffer()).
_recycle_buffer(): move buffer to new position and recycle content if possible (called by move_buffer()).
allocate_buffer(): reallocate the buffer to have the right size (called by _recycle_buffer()).
Access via
is slow! Even worse, using numpy functions on this class first converts it to a numpy array - that is something we actually do not want! We should subclass directly from numpy.ndarray. For details see When subclassing, there is an offset argument, that might help to speed up __getitem__.
Constructor for initializing 2D arrays (times x channels).
- BufferedData
- audioio.bufferedarray.BufferedArray
def open(self, source)
Expand source code
def open(self, source):
    """Set resolutions and frequency axis from `source` and open the buffer.

    Parameters
    ----------
    source:
        Data the spectrogram is computed from. Only its `rate`
        attribute is used here; the object itself is passed on to
        the base class `open()`.
    """
    # Use the same rounding and clamping as everywhere else.  A plain
    # int(nfft*hop_frac) truncates (int(100*0.29) == 28) and can
    # result in a hop of zero.
    self.set_hop()
    self.fresolution = source.rate/self.nfft
    self.frequencies = np.arange(0, source.rate/2 + self.fresolution/2,
                                 self.fresolution)
    self.tresolution = self.hop/source.rate
    self.spec_rect = []
    self.use_spec = True
    super().open(source, self.hop, more_shape=(self.nfft//2 + 1,))
    self.unit = f'{self.unit}^2/Hz'
    self.ampl_min = 0
    self.ampl_max = self.source.rate/2
def process(self, source, dest, nbefore)
Expand source code
def process(self, source, dest, nbefore):
    """Fill `dest` with the spectrogram of `source`.

    Parameters
    ----------
    source:
        Source data, segmented along its first axis.
    dest:
        Buffer receiving one spectrum per row and channel. Rows for
        which no complete segment is available are zeroed.
    nbefore:
        Not used by this implementation.
    """
    # source frames needed for all rows of dest, limited to what is there:
    n_frames = min((len(dest) - 1)*self.hop + self.nfft, len(source))
    if n_frames < self.nfft:
        # not even a single segment fits
        dest[:] = 0
    else:
        overlap = self.nfft - self.hop
        with np.errstate(under='ignore'):
            freqs, _, power = spectrogram(source[:n_frames],
                                          self.source.rate,
                                          nperseg=self.nfft,
                                          noverlap=overlap, axis=0)
            n_segments = power.shape[2]
            # segment axis goes first, frequency axis last:
            dest[:n_segments] = power.transpose((2, 1, 0))
            dest[n_segments:] = 0
            self.frequencies = freqs
    # extent of the full buffer:
    t_start = self.offset/self.rate
    t_width = len(self.buffer)/self.rate
    f_height = self.source.rate/2 + self.fresolution
    self.spec_rect = [t_start, 0, t_width, f_height]
def set_hop(self)
Expand source code
def set_hop(self):
    """Recompute `hop` from `hop_frac` and `nfft`.

    The rounded hop is forced to be at least 1 and then at most
    `nfft`. When the hop changes, `hop_frac` is replaced by the
    fraction that the new hop actually realizes.

    Returns
    -------
    changed: bool
        True if `hop` was modified, False if it already had that value.
    """
    new_hop = int(np.round(self.hop_frac*self.nfft))
    # lower bound first, upper bound last:
    new_hop = min(max(new_hop, 1), self.nfft)
    if new_hop == self.hop:
        return False
    self.hop = new_hop
    self.hop_frac = self.hop/self.nfft
    return True
def update(self, nfft=None, hop_frac=None)
Expand source code
def update(self, nfft=None, hop_frac=None):
    """Change segment length and/or hop and recompute if necessary.

    Nothing is recomputed if neither `nfft` nor the resulting hop
    differ from the current settings.

    Parameters
    ----------
    nfft: int or None
        New segment length. Raised to at least 8 and then limited to
        half the length of the source (at most 2**30).
    hop_frac: float or None
        New hop as a fraction of `nfft`, limited to at most 1.
    """
    spec_update = False
    if nfft is not None:
        # Never allow zero, even for sources shorter than two frames:
        # nfft is used as a divisor below and in set_hop().
        max_nfft = max(1, min(len(self.source)//2, 2**30))
        nfft = min(max(nfft, 8), max_nfft)
        if self.nfft != nfft:
            self.nfft = nfft
            spec_update = True
    if hop_frac is not None:
        if hop_frac > 1.0:
            hop_frac = 1.0
        self.hop_frac = hop_frac
    # Always re-derive the hop, so that it also follows a changed nfft.
    if self.set_hop():
        spec_update = True
    if spec_update:
        self.tresolution = self.hop/self.source.rate
        self.fresolution = self.source.rate/self.nfft
        self.update_step(self.hop, more_shape=(self.nfft//2 + 1,))
        self.recompute_all()
def estimate_noiselevels(self, channel)
Expand source code
def estimate_noiselevels(self, channel):
    """Estimate a decibel range for displaying the spectrogram.

    An estimate is returned only until the first one succeeded
    (`self.init` is cleared then); afterwards, and whenever the
    buffer is empty or has less than three dimensions, both returned
    values are None.

    Parameters
    ----------
    channel: int
        Index into the second dimension of the buffer.

    Returns
    -------
    zmin, zmax: float or None
        Lower and upper decibel level, at least 20 and at most 80
        apart.
    """
    if not self.init:
        return None, None
    if len(self.buffer) == 0 or len(self.buffer.shape) < 3:
        return None, None
    # number of bins at the upper end of the frequency axis:
    n_top = max(self.buffer.shape[2]//16, 1)
    with np.errstate(all='ignore'):   # check what is going on!!!
        top_db = decibel(self.buffer[:, channel, -n_top:])
        zmin = np.percentile(top_db, 95)
        all_db = decibel(self.buffer[:, channel, :])
        zmax = np.max(all_db)
    if not (np.isfinite(zmin) and np.isfinite(zmax)):
        return None, None
    self.init = False
    # use only the lower 95% of the range:
    zmax = zmin + 0.95*(zmax - zmin)
    if zmax - zmin < 20:
        zmax = zmin + 20
    if zmax - zmin > 80:
        zmin = zmax - 80
    return zmin, zmax