Module audian.data

Class managing all raw data, spectrograms, filtered and derived data and the time window shown.

Classes

class Data (file_path, **kwargs)
Expand source code
class Data(object):

    def __init__(self, file_path, **kwargs):
        self.buffer_time = 60
        self.back_time = 20
        self.follow_time = 0
        self.file_path = file_path
        self.load_kwargs = kwargs
        self.data = None
        self.rate = None
        self.channels = 0
        self.start_time = None
        self.meta_data = {}
        self.tbefore = 0
        self.tafter = 0
        self.traces = []
        self.sources = []


    def add_trace(self, trace):
        self.traces.append(trace)


    def remove_trace(self, name):
        t = self[name]
        if t is not None:
            i = self.traces.index(t)
            del self.traces[i]


    def clear_traces(self):
        self.traces = []

        
    def __del__(self):
        self.close()

            
    def __len__(self):
        return len(self.traces)

            
    def __getitem__(self, key):
        for trace in self.traces:
            if trace.name.lower() == key.lower():
                return trace
        return None

            
    def __contains__(self, key):
        for trace in self.traces:
            if trace.name.lower() == key.lower():
                return True
        return False


    def keys(self):
        return [trace.name for trace in self.traces]


    def get_trace_names(self, class_name):
        traces = []
        for trace in self.traces:
            if isinstance(trace, class_name):
                traces.append(trace.name)
        return traces


    def is_visible(self, name):
        if name in self:
            for pi in self[name].plot_items:
                if pi is not None and pi.isVisible():
                    return True
        return False


    def set_visible(self, name, show):
        changed = False
        if name in self:
            for pi in self[name].plot_items:
                if pi is not None:
                    if pi.isVisible() != show:
                        changed = True
                    pi.setVisible(show)
        return changed

    
    def get_region(self, t0, t1, channel):
        traces = {}
        for t in self.traces:
            i0 = int(t0*t.rate)
            if i0 < 0:
                i0 = 0
            i1 = int(t1*t.rate) + 1
            if i1 > len(t):
                i1 = len(t)
            time = np.arange(i0, i1)/t.rate
            data = t[i0:i1, channel]
            if isinstance(t, BufferedSpectrogram):
                freqs = t.frequencies
                traces[t.name] = (time, freqs, data)
            else:
                traces[t.name] = (time, data)
        return traces
    
        
    def setup_traces(self):
        """ order trace sequence.
        """
        traces = []
        self.sources = []
        i = -1
        while i < len(traces):
            sname = traces[i].name if i >= 0 else 'data'
            dtraces = []
            for k in range(len(self.traces)):
                if self.traces[k] is not None and \
                   self.traces[k].source_name is sname:
                    dtraces.append(self.traces[k])
                    self.traces[k] = None
            for t in reversed(dtraces):
                traces.insert(i + 1, t)
                self.sources.insert(i + 1, i)
            i += 1
        if len(traces) < len(self.traces):
            for trace in self.traces:
                if trace is not None:
                    print(f'! ERROR: source "{trace.source_name}" for trace "{trace.name}" not found!')
            print('! the following sources are available:')
            print('  data')
            for source in traces:
                print(f'  {source.name}')
        self.traces = traces

        
    def open(self, unwrap, unwrap_clip):
        if not self.data is None:
            self.data.close()
        # expand buffer times:
        self.tbefore = 0
        self.tafter = 0
        tbefore = [0] * len(self.traces)
        tafter = [0] * len(self.traces)
        for k in reversed(range(len(self.traces))):
            tb, ta = self.traces[k].expand_times(tbefore[k], tafter[k])
            i = self.sources[k]
            if i < 0:
                self.tbefore = max(self.tbefore, tb)
                self.tafter = max(self.tafter, ta)
            else:
                tbefore[i] = max(tbefore[i], tb)
                tafter[i] = max(tafter[i], ta)
        # raw data:        
        tbuffer = self.buffer_time + self.tbefore + self.tafter
        tback = self.back_time + self.tbefore
        try:
            self.data = DataLoader(self.file_path, tbuffer, tback,
                                   **self.load_kwargs)
        except IOError:
            self.data = None
            return
        self.data.set_unwrap(unwrap, unwrap_clip, False, self.data.unit)
        self.data.follow = int(self.follow_time*self.data.rate)
        self.data.name = 'data'
        self.data.panel = 'trace'
        self.data.panel_type =  'trace'
        self.data.plot_items = [None]*self.data.channels
        self.data.color = '#0000ee'
        self.data.lw_thin = 1.1
        self.data.lw_thick = 2
        self.data.dests = []
        self.data.need_update = False
        self.traces.insert(0, self.data)
        self.sources = [None] + [i + 1 for i in self.sources]
        self.file_path = self.data.filepath
        self.rate = self.data.rate
        self.channels = self.data.channels
        # metadata:
        self.meta_data = dict(Format=self.data.format_dict())
        self.meta_data.update(self.data.metadata())
        self.start_time = get_datetime(self.meta_data)
        # derived data:
        for trace, source in zip(self.traces[1:], self.sources[1:]):
            trace.open(self.traces[source])
        self.set_need_update()


    def close(self):
        if not self.data is None:
            self.data.close()
            self.data = None

            
    def set_need_update(self):
        if self.data is None:
            return
        self.data.need_update = False
        for pi in self.data.plot_items:
            if pi is not None and pi.isVisible():
                self.data.need_update = True
                break
        for d in self.data.dests:
            d.set_need_update()

            
    def update_times(self, t0, t1):
        if self.data.need_update:
            self.data.update_time(t0 - self.tbefore,
                                  t1 + self.tafter)
        for trace in self.traces[1:]:
            if trace.need_update:
                trace.align_buffer()

Methods

def add_trace(self, trace)
def remove_trace(self, name)
def clear_traces(self)
def keys(self)
def get_trace_names(self, class_name)
def is_visible(self, name)
def set_visible(self, name, show)
def get_region(self, t0, t1, channel)
def setup_traces(self)

order trace sequence.

def open(self, unwrap, unwrap_clip)
def close(self)
def set_need_update(self)
def update_times(self, t0, t1)