Module audian.buffereddata
Base class for computed data.
Classes
class BufferedData (name, source_name, tbefore=0, tafter=0, panel='none', panel_type='trace', color='#00ee00', lw_thin=1.1, lw_thick=2)
-
Random access to time-series data of which only a part is held in memory.
This is a base class for accessing large audio recordings either from a file (class
AudioLoader
) or by computing its contents on the fly (e.g. filtered data, envelopes or spectrograms). The BufferedArray
behaves like a single big ndarray with first dimension indexing the frames and second dimension indexing the channels of the data. Higher dimensions are also supported. For example, a third dimension for frequencies needed for spectrograms. Internally the class holds only a part of the data in memory. The size of this buffer is set to bufferframes
frames. If more data are requested, the buffer is enlarged accordingly. Classes inheriting
BufferedArray
just need to implement self.load_buffer(offset, nframes, pbuffer)
This function needs to load the supplied
pbuffer
with nframes
frames of data starting at frame offset.
In the constructor or some kind of opening function, you need to set the following member variables, followed by a call to
init_buffer()
:
    self.rate           # number of frames per second
    self.channels       # number of channels per frame
    self.frames         # total number of frames
    self.shape = (self.frames, self.channels, ...)
    self.bufferframes   # number of frames the buffer should hold
    self.backframes     # number of frames kept for moving back
    self.init_buffer()
or provide all this information via the constructor:
Parameters
rate
:float
- The sampling rate of the data in Hertz (frames per second).
channels
:int
- The number of channels.
frames
:int
- The number of frames.
bufferframes
:int
- Number of frames the current data buffer holds.
backframes
:int
- Number of frames the current data buffer should keep before requested data ranges.
verbose
:int
- If larger than zero show detailed error/warning messages.
Attributes
rate
:float
- The sampling rate of the data in Hertz (frames per second).
channels
:int
- The number of channels.
frames
:int
- The number of frames. Same as
len()
. shape
:tuple
- Frames and channels of the data. Optional higher dimensions.
ndim
:int
- Number of dimensions: 2 (frames and channels) or higher.
size
:int
- Total number of samples: frames times channels.
offset
:int
- Index of first frame in the current buffer.
buffer
:ndarray
of floats
- The currently available data. First dimension is time, second channels.
Optional higher dimensions according to
ndim
and shape
. bufferframes
:int
- Number of samples the current data buffer holds.
backframes
:int
- Number of samples the current data buffer should keep before requested data ranges.
follow
:int
- If zero (default), move buffer position only for requests outside
the current buffer.
If larger than zero then buffer position follows requested data ranges
if buffer can be moved by more than
follow
frames. This results in more frequent but smaller buffer updates. Set it after calling the constructor or init_buffer()
. buffer_changed
:ndarray
of bool
- For each channel a flag, whether the buffer content has been changed.
Set to
True
, whenever load_buffer()
was called.
Methods
len()
: Number of frames.
__getitem__
: Access data.
blocks()
: Generator for blockwise processing of AudioLoader data.
update_buffer()
: make sure that the buffer contains data of a range of indices.
update_time()
: make sure that the buffer contains data of a given time range.
reload_buffer()
: reload the current buffer.
load_buffer()
: load a range of samples into a buffer.
move_buffer()
: move and resize buffer.
buffer_position()
: compute position and size of buffer.
recycle_buffer()
: move buffer to new position and recycle content if possible.
Notes
Access via
__getitem__
or __next__
is slow! Even worse, using numpy functions on this class first converts it to a numpy array - that is something we actually do not want! We should subclass directly from numpy.ndarray . For details see http://docs.scipy.org/doc/numpy/user/basics.subclassing.html When subclassing, there is an offset argument, that might help to speed up __getitem__
.
Constructor for initializing 2D arrays (times x channels).
Expand source code
class BufferedData(BufferedArray):
    """Base class for data that are computed from another buffered source.

    A `BufferedData` is linked to a `source` (set by `open()`) and
    keeps a list of `dests` that in turn use this object as their
    source.  Its buffer is filled in `load_buffer()` by handing the
    matching slice of the source buffer to `self.process()`.

    NOTE(review): `process()` is not defined in this class; presumably
    derived classes provide it.  `move_buffer()`, `allocate_buffer()`
    and `reload_buffer()` are likewise expected from `BufferedArray`.
    """

    def __init__(self, name, source_name, tbefore=0, tafter=0,
                 panel='none', panel_type='trace',
                 color='#00ee00', lw_thin=1.1, lw_thick=2):
        """Store names, plot settings and the required source context.

        Parameters
        ----------
        name: str
            Name of this data trace.
        source_name: str
            Name of the trace this one is computed from.
        tbefore: float
            Time in seconds of source data needed before a requested
            range.  Stored as `source_tbefore`.
        tafter: float
            Time in seconds of source data needed after a requested
            range.  Stored as `source_tafter`.
        panel, panel_type, color, lw_thin, lw_thick:
            Stored unchanged on the instance; presumably used by the
            plotting code.
        """
        super().__init__(verbose=0)
        self.name = name
        self.source_name = source_name
        # context requested by destinations, accumulated by expand_times():
        self.tbefore = 0
        self.tafter = 0
        self.panel = panel
        self.panel_type = panel_type
        self.plot_items = []
        self.color = color
        self.lw_thin = lw_thin
        self.lw_thick = lw_thick
        self.source = None
        # context this trace itself needs from its source:
        self.source_tbefore = tbefore
        self.source_tafter = tafter
        self.dests = []
        self.need_update = False

    def expand_times(self, tbefore, tafter):
        """Add requested context times and return the ones for the source.

        Parameters
        ----------
        tbefore: float
            Additional time needed before the data, added to `self.tbefore`.
        tafter: float
            Additional time needed after the data, added to `self.tafter`.

        Returns
        -------
        tbefore: float
            `source_tbefore` plus the supplied `tbefore`.
        tafter: float
            `source_tafter` plus the supplied `tafter`.
        """
        self.tbefore += tbefore
        self.tafter += tafter
        return self.source_tbefore + tbefore, self.source_tafter + tafter

    def update_step(self, step=1, more_shape=None):
        """Recompute rate, size and buffer layout for a downsampling step.

        Parameters
        ----------
        step: int
            Number of source frames per frame of this trace.
            Values smaller than one are set to one.
        more_shape: tuple or None
            Optional higher dimensions appended to
            `(frames, channels)` to form `shape`.
        """
        # duration of the buffer at the rate valid before the update:
        tbuffer = self.bufferframes/self.rate
        if step < 1:
            step = 1
        self.rate = self.source.rate/step
        # round up, so that a partial last step still yields a frame:
        self.frames = (self.source.frames + step - 1)//step
        base_shape = (self.frames, self.channels)
        self.shape = base_shape if more_shape is None \
            else base_shape + more_shape
        self.ndim = len(self.shape)
        self.size = self.frames * self.channels
        if self.source.bufferframes == self.source.frames:
            # source holds all its frames in the buffer, so do we:
            self.bufferframes = self.frames
        else:
            # keep the buffer duration at the new rate:
            self.bufferframes = int(tbuffer*self.rate)
        self.offset = (self.source.offset + step - 1)//step
        self.follow = 0

    def open(self, source, step=1, more_shape=None):
        """Connect to a source and initialize from its properties.

        Registers this object in `source.dests`, copies amplitude
        range, unit, channels and rate from the source, sets up empty
        buffers and calls `update_step()`.

        Parameters
        ----------
        source:
            The data this trace is computed from.  Needs to provide
            `dests`, `ampl_min`, `ampl_max`, `unit`, `channels`,
            `rate`, `frames`, `bufferframes` and `offset`.
        step: int
            Passed on to `update_step()`.
        more_shape: tuple or None
            Passed on to `update_step()`.
        """
        self.source = source
        source.dests.append(self)
        self.ampl_min = source.ampl_min
        self.ampl_max = source.ampl_max
        self.unit = source.unit
        self.bufferframes = 0
        self.backframes = 0
        self.channels = source.channels
        self.rate = source.rate
        self.buffer_changed = np.zeros(self.channels, dtype=bool)
        self.buffer = np.zeros((0, self.channels))
        self.plot_items = [None]*self.channels
        self.update_step(step, more_shape)

    def align_buffer(self):
        """Move the buffer to the range covered by the source buffer.

        The range of the source buffer is shrunk by the context
        frames needed before (unless the source buffer starts at
        frame zero) and after (unless the source buffer reaches the
        end of the source), converted to the rate of this trace and
        passed to `move_buffer()`.
        """
        src = self.source
        soffset = src.offset
        snframes = len(src.buffer)
        if soffset > 0:
            n = floor(self.source_tbefore*src.rate)
            soffset += n
            snframes -= n
        if src.offset + len(src.buffer) < src.frames:
            n = floor(self.source_tafter*src.rate)
            snframes -= n
        # round inwards, so that the range stays within the source range:
        offset = ceil(soffset*self.rate/src.rate)
        nframes = floor((soffset + snframes)*self.rate/src.rate) - offset
        self.move_buffer(offset, nframes)
        self.bufferframes = len(self.buffer)

    def load_buffer(self, offset, nframes, buffer):
        """Fill a buffer by processing the matching source data.

        Parameters
        ----------
        offset: int
            First frame of this trace to be loaded.
        nframes: int
            Number of frames to be loaded.
        buffer: ndarray
            Buffer to be filled; passed on to `self.process()`.
        """
        print(f'load {self.name} {offset/self.rate:.3f} - {(offset + nframes)/self.rate:.3f}')
        src = self.source
        # transform to rate of source buffer:
        soffset = floor(offset*src.rate/self.rate)
        snframes = ceil((offset + nframes)*src.rate/self.rate) - soffset
        # context times are in seconds: multiply by the rate to get
        # frames (as in align_buffer()); dividing by the rate always
        # resulted in essentially no context at all:
        nbefore = floor(self.source_tbefore*src.rate)
        soffset -= nbefore
        snframes += nbefore
        nafter = ceil(self.source_tafter*src.rate)
        snframes += nafter
        # make offset relative to the start of the source buffer:
        soffset -= src.offset
        if soffset < 0:
            # not enough data before: reduce context accordingly
            nbefore += soffset
            snframes += soffset
            soffset = 0
        if soffset + snframes > len(src.buffer):
            snframes = len(src.buffer) - soffset
        source = src.buffer[soffset:soffset + snframes]
        self.process(source, buffer, nbefore)

    def recompute(self):
        """Reallocate and reload the buffer if the source has data."""
        if len(self.source.buffer) > 0:
            self.allocate_buffer()
            self.reload_buffer()

    def is_visible(self):
        """Return True if at least one plot item exists and is visible."""
        return any(pi is not None and pi.isVisible()
                   for pi in self.plot_items)

    def set_visible(self, show):
        """Call `setVisible(show)` on all existing plot items.

        Parameters
        ----------
        show: bool
            Passed on to each plot item.
        """
        for pi in self.plot_items:
            if pi is not None:
                pi.setVisible(show)

    def set_need_update(self):
        """Mark this trace and its dependency chain for recomputation.

        `need_update` is set to whether any plot item of this trace
        is visible.  Then all `dests` are updated recursively.  At the
        end of a dependency chain (no `dests`) the flag is propagated
        back along the chain of sources, such that every source of a
        trace that needs an update is flagged as well.
        """
        self.need_update = any(pi is not None and pi.isVisible()
                               for pi in self.plot_items)
        for d in self.dests:
            d.set_need_update()
        # end of dependency chain:
        if not self.dests:
            # go to sources and propagate needed update.
            # Stop at objects without a source attribute as well as
            # at traces whose source is still None (not opened yet):
            trace = self
            while getattr(trace, 'source', None) is not None:
                s = trace.source
                s.need_update = trace.need_update or s.need_update
                trace = s

    def recompute_all(self):
        """Recompute this trace if needed, then all its destinations."""
        if self.need_update:
            self.recompute()
        for d in self.dests:
            d.recompute_all()
Ancestors
- audioio.bufferedarray.BufferedArray
Subclasses
Methods
def expand_times(self, tbefore, tafter)
def update_step(self, step=1, more_shape=None)
def open(self, source, step=1, more_shape=None)
def align_buffer(self)
def load_buffer(self, offset, nframes, buffer)
def recompute(self)
def is_visible(self)
def set_visible(self, show)
def set_need_update(self)
def recompute_all(self)