Module audian.buffereddata

Base class for computed data.

Classes

class BufferedData (name,
source_name,
tbefore=0,
tafter=0,
panel='none',
panel_type='trace',
color='#00ee00',
lw_thin=1.1,
lw_thick=2)
Expand source code
class BufferedData(BufferedArray):
    """Base class for data that are computed from a source buffer.

    An instance is attached to a `source` via `open()` and registers
    itself in the source's `dests` list, so sources and derived data
    form a dependency chain.  The buffer content is produced by
    `load_buffer()`, which cuts the matching range out of
    `source.buffer` and hands it to `self.process(source, buffer,
    nbefore)`.  `process()` is not defined in this class and has to be
    provided by subclasses.

    Parameters
    ----------
    name : str
        Stored as `self.name`; used in the message printed by
        `load_buffer()`.
    source_name : str
        Stored as `self.source_name`; not used by the code of this class.
    tbefore : float
        Stored as `self.source_tbefore`.  It is multiplied by
        `source.rate` to get a number of source frames, so it is
        presumably a time in seconds of source data needed before each
        computed frame.
    tafter : float
        Stored as `self.source_tafter`; like `tbefore`, but for source
        data after the computed range.
    panel, panel_type, color, lw_thin, lw_thick
        Plotting settings that are stored as attributes of the same
        name; not used by the code of this class.
    """

    def __init__(self, name, source_name, tbefore=0, tafter=0,
                 panel='none', panel_type='trace',
                 color='#00ee00', lw_thin=1.1, lw_thick=2):
        super().__init__(verbose=0)
        self.name = name
        self.source_name = source_name
        # accumulated by expand_times(), therefore they start at zero:
        self.tbefore = 0
        self.tafter = 0
        self.panel = panel
        self.panel_type = panel_type
        self.plot_items = []
        self.color = color
        self.lw_thin = lw_thin
        self.lw_thick = lw_thick
        self.source = None
        # the constructor arguments describe the needs on the source:
        self.source_tbefore = tbefore
        self.source_tafter = tafter
        self.dests = []
        self.need_update = False

    def expand_times(self, tbefore, tafter):
        """Add to the accumulated time margins.

        Parameters
        ----------
        tbefore, tafter : float
            Amounts added in place to `self.tbefore` and `self.tafter`.

        Returns
        -------
        tbefore, tafter : float
            The arguments increased by `self.source_tbefore` and
            `self.source_tafter`, respectively.
        """
        self.tbefore += tbefore
        self.tafter += tafter
        return self.source_tbefore + tbefore, self.source_tafter + tafter

    def update_step(self, step=1, more_shape=None):
        """Derive rate, frames, shape, and buffer size from the source.

        Parameters
        ----------
        step : int
            Number of source frames per frame of this data.
            Values smaller than one are replaced by one.
        more_shape : tuple or None
            Additional dimensions appended to `(frames, channels)`.
        """
        # duration of the buffer at the rate valid before this call:
        tbuffer = self.bufferframes/self.rate
        if step < 1:
            step = 1
        self.rate = self.source.rate/step
        # integer division rounding upwards:
        self.frames = (self.source.frames + step - 1)//step
        if more_shape is None:
            self.shape = (self.frames, self.channels)
        else:
            self.shape = (self.frames, self.channels) + more_shape
        self.ndim = len(self.shape)
        self.size = self.frames * self.channels
        if self.source.bufferframes == self.source.frames:
            # the source buffers all of its frames, so do we:
            self.bufferframes = self.frames
        else:
            # keep the duration of the buffer at the new rate:
            self.bufferframes = int(tbuffer*self.rate)
        self.offset = (self.source.offset + step - 1)//step
        self.follow = 0

    def open(self, source, step=1, more_shape=None):
        """Attach to a source and initialize an empty buffer.

        Appends `self` to `source.dests`, copies amplitude range, unit,
        channels, and rate from the source, and calls `update_step()`.

        Parameters
        ----------
        source : object
            Provides `dests`, `ampl_min`, `ampl_max`, `unit`,
            `channels`, and `rate`, plus whatever `update_step()` reads.
        step : int
            Passed on to `update_step()`.
        more_shape : tuple or None
            Passed on to `update_step()`.
        """
        self.source = source
        self.source.dests.append(self)
        self.ampl_min = source.ampl_min
        self.ampl_max = source.ampl_max
        self.unit = source.unit
        self.bufferframes = 0
        self.backframes = 0
        self.channels = self.source.channels
        self.rate = self.source.rate
        self.buffer_changed = np.zeros(self.channels, dtype=bool)
        self.buffer = np.zeros((0, self.channels))
        self.plot_items = [None]*self.channels
        self.update_step(step, more_shape)

    def align_buffer(self):
        """Move the buffer to the range covered by the source buffer.

        The range of the source buffer is shortened by the source
        frames corresponding to `source_tbefore` (unless the source
        buffer starts at frame zero) and to `source_tafter` (unless it
        extends to the last source frame).  The remaining range is
        converted to the rate of this data and passed to
        `move_buffer()`.
        """
        soffset = self.source.offset
        snframes = len(self.source.buffer)
        if soffset > 0:
            n = floor(self.source_tbefore*self.source.rate)
            soffset += n
            snframes -= n
        if self.source.offset + len(self.source.buffer) < self.source.frames:
            n = floor(self.source_tafter*self.source.rate)
            snframes -= n
        offset = ceil(soffset*self.rate/self.source.rate)
        nframes = floor((soffset + snframes)*self.rate/self.source.rate) - offset
        self.move_buffer(offset, nframes)
        self.bufferframes = len(self.buffer)

    def load_buffer(self, offset, nframes, buffer):
        """Compute a range of frames from the source buffer.

        Parameters
        ----------
        offset : int
            First frame to be computed, at the rate of this data.
        nframes : int
            Number of frames to be computed.
        buffer : ndarray
            Handed unchanged to `self.process()`.
        """
        print(f'load {self.name} {offset/self.rate:.3f} - {(offset + nframes)/self.rate:.3f}')
        # transform to rate of source buffer:
        soffset = floor(offset*self.source.rate/self.rate)
        snframes = ceil((offset + nframes)*self.source.rate/self.rate) - soffset
        # time times rate gives frames, in the same way as in
        # align_buffer(); dividing by the rate results in no margin:
        nbefore = floor(self.source_tbefore*self.source.rate)
        soffset -= nbefore
        snframes += nbefore
        nafter = ceil(self.source_tafter*self.source.rate)
        snframes += nafter
        # from now on soffset is an index into the source buffer:
        soffset -= self.source.offset
        if soffset < 0:
            # less source data available before the requested range:
            nbefore += soffset
            snframes += soffset
            soffset = 0
        if soffset + snframes > len(self.source.buffer):
            snframes = len(self.source.buffer) - soffset
        source = self.source.buffer[soffset:soffset + snframes]
        self.process(source, buffer, nbefore)

    def recompute(self):
        """Reload the buffer.

        `allocate_buffer()` is called first if the source buffer is
        not empty.
        """
        if len(self.source.buffer) > 0:
            self.allocate_buffer()
        self.reload_buffer()

    def is_visible(self):
        """Return True if at least one plot item reports to be visible."""
        return any(pi is not None and pi.isVisible()
                   for pi in self.plot_items)

    def set_visible(self, show):
        """Pass `show` to `setVisible()` of every plot item that is not None."""
        for pi in self.plot_items:
            if pi is not None:
                pi.setVisible(show)

    def set_need_update(self):
        """Set the `need_update` flags along the dependency chain.

        `need_update` of this object is set to whether one of its plot
        items is visible.  Then all `dests` do the same.  An object
        without `dests` finally walks up the chain of sources and
        turns on `need_update` of every source feeding data that need
        an update.
        """
        self.need_update = any(pi is not None and pi.isVisible()
                               for pi in self.plot_items)
        for d in self.dests:
            d.set_need_update()
        # end of dependency chain:
        if not self.dests:
            # go to sources and propagate needed update;
            # stop at objects without a source or
            # with a source that has not been opened yet:
            trace = self
            while getattr(trace, 'source', None) is not None:
                s = trace.source
                s.need_update = trace.need_update or s.need_update
                trace = s

    def recompute_all(self):
        """Recompute this data and all its `dests` if `need_update` is set.

        If `need_update` is not set, the `dests` are not visited.
        """
        if self.need_update:
            self.recompute()
            for d in self.dests:
                d.recompute_all()

Random access to time-series data of which only a part is held in memory.

This is a base class for accessing large audio recordings either from a file (class AudioLoader) or by computing its contents on the fly (e.g. filtered data, envelopes or spectrograms). The BufferedArray behaves like a single big ndarray with first dimension indexing the frames and second dimension indexing the channels of the data. Higher dimensions are also supported. For example, a third dimension for frequencies needed for spectrograms. Internally the class holds only a part of the data in memory. The size of this buffer is set to bufferframes frames. If more data are requested, the buffer is enlarged accordingly.

Classes inheriting BufferedArray just need to implement

self.load_buffer(offset, nframes, pbuffer)

This function needs to load the supplied pbuffer with nframes frames of data starting at frame offset.

In the constructor or some kind of opening function, you need to set the following member variables, followed by a call to init_buffer():

self.rate            # number of frames per second
self.channels        # number of channels per frame
self.frames          # total number of frames
self.shape = (self.frames, self.channels, ...)        
self.bufferframes    # number of frames the buffer should hold
self.backframes      # number of frames kept for moving back
self.init_buffer()

or provide all this information via the constructor:

Parameters

rate : float
The sampling rate of the data in Hertz.
channels : int
The number of channels.
frames : int
The number of frames.
bufferframes : int
Number of frames the current data buffer holds.
backframes : int
Number of frames the current data buffer should keep before requested data ranges.
verbose : int
If larger than zero show detailed error/warning messages.

Attributes

rate : float
The sampling rate of the data in Hertz.
channels : int
The number of channels.
frames : int
The number of frames. Same as len().
shape : tuple
Frames and channels of the data. Optional higher dimensions.
ndim : int
Number of dimensions: 2 (frames and channels) or higher.
size : int
Total number of samples: frames times channels.
offset : int
Index of first frame in the current buffer.
buffer : ndarray of floats
The currently available data. First dimension is time, second channels. Optional higher dimensions according to ndim and shape.
bufferframes : int
Number of samples the current data buffer holds.
backframes : int
Number of samples the current data buffer should keep before requested data ranges.
buffer_changed : ndarray of bool
For each channel a flag, whether the buffer content has been changed. Set to True, whenever load_buffer() was called.

Methods

  • len(): Number of frames.
  • __getitem__: Access data.
  • blocks(): Generator for blockwise processing of the data.
  • update_buffer(): make sure that the buffer contains data of a range of indices.
  • update_time(): make sure that the buffer contains data of a given time range.
  • reload_buffer(): reload the current buffer.
  • move_buffer(): move and resize buffer (called by update_buffer()).
  • load_buffer(): load a range of samples into a buffer (called by reload_buffer() and move_buffer()).
  • _buffer_position(): compute position and size of buffer (used by update_buffer()).
  • _recycle_buffer(): move buffer to new position and recycle content if possible (called by move_buffer()).
  • allocate_buffer(): reallocate the buffer to have the right size (called by _recycle_buffer()).

Notes

Access via __getitem__ or __next__ is slow! Even worse, using numpy functions on this class first converts it to a numpy array - that is something we actually do not want! We should subclass directly from numpy.ndarray . For details see http://docs.scipy.org/doc/numpy/user/basics.subclassing.html When subclassing, there is an offset argument, that might help to speed up __getitem__ .

Constructor for initializing 2D arrays (times x channels).

Ancestors

  • audioio.bufferedarray.BufferedArray

Subclasses

Methods

def expand_times(self, tbefore, tafter)
Expand source code
def expand_times(self, tbefore, tafter):
    """Add to the accumulated time margins.

    `tbefore` and `tafter` are added in place to `self.tbefore` and
    `self.tafter`.  Returned are the two arguments increased by
    `self.source_tbefore` and `self.source_tafter`, respectively.
    """
    # what is requested from the source on top of the own needs:
    needed_before = self.source_tbefore + tbefore
    needed_after = self.source_tafter + tafter
    self.tafter += tafter
    self.tbefore += tbefore
    return needed_before, needed_after
def update_step(self, step=1, more_shape=None)
Expand source code
def update_step(self, step=1, more_shape=None):
    """Derive rate, frames, shape, and buffer size from the source.

    `step` is the number of source frames per frame of this data;
    values smaller than one are replaced by one.  `more_shape` is an
    optional tuple of dimensions appended to `(frames, channels)`.
    """
    src = self.source
    # duration of the buffer at the rate valid before this call:
    buffer_duration = self.bufferframes/self.rate
    step = 1 if step < 1 else step
    self.rate = src.rate/step
    # integer division rounding upwards:
    self.frames = (src.frames + step - 1)//step
    base_shape = (self.frames, self.channels)
    self.shape = base_shape if more_shape is None else base_shape + more_shape
    self.ndim = len(self.shape)
    self.size = self.frames * self.channels
    source_holds_all = src.bufferframes == src.frames
    if not source_holds_all:
        # keep the duration of the buffer at the new rate:
        self.bufferframes = int(buffer_duration*self.rate)
    else:
        self.bufferframes = self.frames
    self.offset = (src.offset + step - 1)//step
    self.follow = 0
def open(self, source, step=1, more_shape=None)
Expand source code
def open(self, source, step=1, more_shape=None):
    """Attach to a source and initialize an empty buffer.

    Appends `self` to `source.dests`, copies amplitude range, unit,
    channels, and rate from the source, sets up an empty buffer and
    one empty plot item slot per channel, and finally calls
    `update_step(step, more_shape)`.
    """
    # link both directions of the dependency chain:
    source.dests.append(self)
    self.source = source
    # properties taken over from the source:
    self.unit = source.unit
    self.ampl_min = source.ampl_min
    self.ampl_max = source.ampl_max
    self.rate = source.rate
    nchannels = source.channels
    self.channels = nchannels
    # nothing is buffered yet:
    self.backframes = 0
    self.bufferframes = 0
    self.buffer = np.zeros((0, nchannels))
    self.buffer_changed = np.zeros(nchannels, dtype=bool)
    self.plot_items = [None for _ in range(nchannels)]
    self.update_step(step, more_shape)
def align_buffer(self)
Expand source code
def align_buffer(self):
    """Move the buffer to the range covered by the source buffer.

    The range of the source buffer is shortened by the source frames
    corresponding to `source_tbefore` (unless the source buffer starts
    at frame zero) and to `source_tafter` (unless it extends to the
    last source frame).  The remaining range is converted to the rate
    of this data and passed to `move_buffer()`.
    """
    src = self.source
    # usable range of source frames, as first and one-past-last index:
    first = src.offset
    stop = src.offset + len(src.buffer)
    if first > 0:
        first += floor(self.source_tbefore*src.rate)
    if stop < src.frames:
        stop -= floor(self.source_tafter*src.rate)
    # the same range at the rate of this data:
    offset = ceil(first*self.rate/src.rate)
    nframes = floor(stop*self.rate/src.rate) - offset
    self.move_buffer(offset, nframes)
    self.bufferframes = len(self.buffer)
def load_buffer(self, offset, nframes, buffer)
Expand source code
def load_buffer(self, offset, nframes, buffer):
    """Compute a range of frames from the source buffer.

    The requested range is converted to the rate of the source,
    extended by the source frames corresponding to `source_tbefore`
    and `source_tafter`, and clipped to what is available in
    `source.buffer`.  This slice is handed to `self.process()`
    together with `buffer` and the number of source frames that
    precede the requested range within the slice.

    Parameters
    ----------
    offset : int
        First frame to be computed, at the rate of this data.
    nframes : int
        Number of frames to be computed.
    buffer : ndarray
        Handed unchanged to `self.process()`.
    """
    print(f'load {self.name} {offset/self.rate:.3f} - {(offset + nframes)/self.rate:.3f}')
    # transform to rate of source buffer:
    soffset = floor(offset*self.source.rate/self.rate)
    snframes = ceil((offset + nframes)*self.source.rate/self.rate) - soffset
    # time times rate gives frames, in the same way as in align_buffer();
    # dividing by the rate results in no margin at all:
    nbefore = floor(self.source_tbefore*self.source.rate)
    soffset -= nbefore
    snframes += nbefore
    nafter = ceil(self.source_tafter*self.source.rate)
    snframes += nafter
    # from now on soffset is an index into the source buffer:
    soffset -= self.source.offset
    if soffset < 0:
        # less source data available before the requested range:
        nbefore += soffset
        snframes += soffset
        soffset = 0
    if soffset + snframes > len(self.source.buffer):
        snframes = len(self.source.buffer) - soffset
    source = self.source.buffer[soffset:soffset + snframes]
    self.process(source, buffer, nbefore)
def recompute(self)
Expand source code
def recompute(self):
    """Reload the buffer.

    `allocate_buffer()` is called first if the source buffer is not
    empty; `reload_buffer()` is called in any case.
    """
    source_has_data = len(self.source.buffer) > 0
    if source_has_data:
        self.allocate_buffer()
    self.reload_buffer()
def is_visible(self)
Expand source code
def is_visible(self):
    """Return True if at least one plot item reports to be visible.

    Entries of `self.plot_items` that are None are skipped.
    """
    return any(item is not None and item.isVisible()
               for item in self.plot_items)
def set_visible(self, show)
Expand source code
def set_visible(self, show):
    """Pass `show` to `setVisible()` of every plot item.

    Entries of `self.plot_items` that are None are skipped.
    """
    for item in self.plot_items:
        if item is None:
            continue
        item.setVisible(show)
def set_need_update(self)
Expand source code
def set_need_update(self):
    """Set the `need_update` flags along the dependency chain.

    `need_update` of this object is set to whether one of its plot
    items is visible.  Then `set_need_update()` is called on all
    `dests`.  An object without `dests` finally walks up the chain of
    sources and turns on `need_update` of every source feeding data
    that need an update.
    """
    self.need_update = any(pi is not None and pi.isVisible()
                           for pi in self.plot_items)
    for d in self.dests:
        d.set_need_update()
    # end of dependency chain:
    if not self.dests:
        # go to sources and propagate needed update;
        # stop at objects without a source or
        # with a source that is still None:
        trace = self
        while getattr(trace, 'source', None) is not None:
            s = trace.source
            s.need_update = trace.need_update or s.need_update
            trace = s
def recompute_all(self)
Expand source code
def recompute_all(self):
    """Recompute this data and all its `dests` if `need_update` is set.

    If `need_update` is not set, nothing is recomputed and the
    `dests` are not visited.
    """
    if not self.need_update:
        return
    self.recompute()
    for dest in self.dests:
        dest.recompute_all()