Module audian.fulltraceplot

FullTracePlot

TODO

  • secs_to_str to extra module or even thunderlab?
  • Have a class for a single channel that we could add to the toolbar.
  • Only use Data class

Functions

def secs_to_str(time, msec_level=10)
Expand source code
def secs_to_str(time, msec_level=10):
    days = time//(24*3600)
    time -= (24*3600)*days
    hours = time//3600
    time -= 3600*hours
    mins = time//60
    time -= 60*mins
    secs = int(np.floor(time))
    time -= secs
    msecs = f'{1000*time:03.0f}ms'
    if days > 0:
        if msec_level < 4:
            msecs = ''
        return f'{days:.0f}d{hours:.0f}h{mins:.0f}m{secs:.0f}s{msecs}'
    elif hours > 0:
        if msec_level < 3:
            msecs = ''
        return f'{hours:.0f}h{mins:.0f}m{secs:.0f}s{msecs}'
    elif mins > 0:
        if msec_level < 2:
            msecs = ''
        return f'{mins:.0f}m{secs:.0f}s{msecs}'
    elif secs > 0:
        if msec_level < 1:
            msecs = ''
        return f'{secs:.0f}s{msecs}'
    elif time >= 0.01:
        return msecs
    elif time >= 0.001:
        return f'{1000*time:.2f}ms'
    else:
        return f'{1e6*time:.0f}\u00b5s'
def down_sample(proc_idx,
num_proc,
nblock,
step,
array,
file_paths,
tbuffer,
rate,
channels,
unit,
amax,
end_indices,
unwrap_thresh,
unwrap_clips,
load_kwargs)
Expand source code
def down_sample(proc_idx, num_proc, nblock, step, array,
                file_paths, tbuffer, rate, channels, unit, amax, end_indices,
                unwrap_thresh, unwrap_clips, load_kwargs):
    """ Worker for prepare() """
    if end_indices is None:
        data = DataLoader(file_paths, tbuffer, 0,
                          verbose=0, **load_kwargs)
    else:
        data = DataLoader(file_paths, tbuffer, 0,
                          verbose=0, rate=rate, channels=channels,
                          unit=unit, amax=amax, end_indices=end_indices,
                          **load_kwargs)
    data.set_unwrap(unwrap_thresh, unwrap_clips, False, data.unit)
    datas = np.frombuffer(array.get_obj()).reshape((-1, data.channels))
    buffer = np.zeros((nblock, data.channels))
    segments = np.arange(0, len(buffer), step)
    for index in range(proc_idx*nblock, data.frames, num_proc*nblock):
        if data.frames - index < nblock:
            nblock = data.frames - index
            buffer = buffer[:nblock, :]
            segments = np.arange(0, len(buffer), step)
        data.load_buffer(index, nblock, buffer)
        i = 2*index//step
        with array.get_lock():
            np.minimum.reduceat(buffer, segments,
                                out=datas[i + 0:i + 0 + 2*len(segments):2])
            np.maximum.reduceat(buffer, segments,
                                out=datas[i + 1:i + 1 + 2*len(segments):2])
    return None

Worker for prepare()

Classes

class FullTracePlot (data, axtraces, *args, **kwargs)
Expand source code
class FullTracePlot(pg.GraphicsLayoutWidget):


    fulltraces_file = 'fulltraces.json'
    max_files = 100

    
    def __init__(self, data, axtraces, *args, **kwargs):
        pg.GraphicsLayoutWidget.__init__(self, *args, **kwargs)

        self.data = data
        self.tmax = self.data.data.frames/self.data.rate
        self.axtraces = axtraces
        self.no_signal = False
        self.procs = []

        self.setBackground(None)
        self.ci.layout.setContentsMargins(0, 0, 0, 0)
        self.ci.layout.setVerticalSpacing(-1.7)
        
        # for each channel prepare a plot panel:
        xwidth = self.fontMetrics().averageCharWidth()
        self.axs = []
        self.lines = []
        self.regions = []
        self.labels = []
        for c in range(self.data.channels):
            # setup plot panel:
            axt = pg.PlotItem()
            axt.showAxes(True, False)
            axt.getAxis('left').setWidth(8*xwidth)
            axt.getViewBox().setBackgroundColor(None)
            axt.getViewBox().setDefaultPadding(padding=0)
            axt.hideButtons()
            axt.setMenuEnabled(False)
            axt.setMouseEnabled(False, False)
            axt.enableAutoRange(False, False)
            axt.setLimits(xMin=0, xMax=self.tmax,
                          minXRange=self.tmax, maxXRange=self.tmax)
            axt.setXRange(0, self.tmax)

            # add region marker:
            region = pg.LinearRegionItem(pen=dict(color='#110353', width=2),
                                         brush=(34, 6, 167, 127),
                                         hoverPen=dict(color='#aa77ff', width=2),
                                         hoverBrush=(34, 6, 167, 255),
                                         movable=True,
                                         swapMode='block')
            region.setZValue(50)
            region.setBounds((0, self.tmax))
            region.setRegion((self.axtraces[c].viewRange()[0]))
            region.sigRegionChanged.connect(self.update_time_range)
            self.axtraces[c].sigXRangeChanged.connect(self.update_region)
            axt.addItem(region)
            self.regions.append(region)

            # add time label:
            label = QGraphicsSimpleTextItem(axt.getAxis('left'))
            label.setToolTip('Total duration of the recording')
            label.setText(secs_to_str(self.tmax, 1))
            label.setPos(int(xwidth), 0)
            self.labels.append(label)
            
            # add data:
            line = pg.PlotDataItem(antialias=True,
                                   pen=dict(color='#2206a7', width=1.1),
                                   skipFiniteCheck=True, autDownsample=False)
            line.setZValue(10)
            axt.addItem(line)
            self.lines.append(line)

            # add zero line:
            zero_line = axt.addLine(y=0, movable=False,
                                    pen=dict(color='grey', width=1))
            zero_line.setZValue(20)
            
            self.addItem(axt, row=c, col=0)
            self.axs.append(axt)

        self.shared_array = None
        self.times = None
        self.datas = None
        self.index = 0
        self.load_data()
            

    def __del__(self):
        self.close()

        
    def close(self):
        for proc in self.procs:
            proc.terminate()
            proc.join()
            proc.close()
        self.procs = []

        
    def polish(self):
        text_color = self.palette().color(QPalette.WindowText)
        for label in self.labels:
            label.setBrush(text_color)
        QTimer.singleShot(500, self.plot_data)


    def prepare(self):
        self.procs = []
        if self.times is not None and self.data is not None:
            return
        max_pixel = QApplication.desktop().screenGeometry().width()
        step = max(1, self.data.frames//max_pixel)
        nblock = max(step, int(30.0*self.data.rate//step)*step)
        end_indices = None
        if len(self.data.data.file_paths) > 1:
            end_indices = self.data.data.end_indices
        self.times = np.arange(0, self.data.data.frames + step - 1,
                               step/2)/self.data.rate
        self.shared_array = Array(c.c_double, len(self.times)*self.data.channels)
        self.datas = np.frombuffer(self.shared_array.get_obj())
        self.datas = self.datas.reshape((len(self.times), self.data.channels))
        nprocs = os.cpu_count() - 1
        for i in range(max(1, nprocs)):
            p = Process(target=down_sample,
                        args=(i, nprocs, nblock, step,
                              self.shared_array,
                              self.data.data.file_paths,
                              nblock/self.data.rate + 0.1,
                              self.data.rate, self.data.channels,
                              self.data.data.unit, self.data.data.ampl_max,
                              end_indices,
                              self.data.data.unwrap_thresh,
                              self.data.data.unwrap_clips,
                              self.data.load_kwargs))
            p.start()
            self.procs.append(p)
            

    def plot_data(self):

        def set_plot_ranges():
            for c in range(self.datas.shape[1]):
                ymin = np.min(self.datas[:, c])
                ymax = np.max(self.datas[:, c])
                y = max(abs(ymin), abs(ymax))
                self.axs[c].setYRange(-y, y)
                self.axs[c].setLimits(yMin=-y, yMax=y,
                                      minYRange=2*y, maxYRange=2*y)

        if len(self.procs) == 0:
            for c in range(self.datas.shape[1]):
                self.lines[c].setData(self.times, self.datas[:, c])
            set_plot_ranges()
        else:
            done = True
            for proc in self.procs:
                if proc.is_alive():
                    done = False
                    break
            lock = self.shared_array.get_lock()
            if lock.acquire(block=False):
                for c in range(self.datas.shape[1]):
                    self.lines[c].setData(self.times, self.datas[:, c])
                lock.release()
            else:
                done = False
            if done:
                for proc in self.procs:
                    proc.close()
                self.procs = []
                set_plot_ranges()
                self.save_data()
            else:
                QTimer.singleShot(500, self.plot_data)


    def save_data(self):
        audian_dirs.user_cache_path.mkdir(parents=True, exist_ok=True)
        files = {}
        ft_path = audian_dirs.user_cache_path / FullTracePlot.fulltraces_file
        if ft_path.exists():
            with ft_path.open() as sf:
                files = json.load(sf)
        # new filename:
        ft_name = f'{1:08X}-fulltrace.wav'
        for k in range(1, 1000):
            ft_name = f'{k:08X}-fulltrace.wav'
            if not ft_name in files.keys():
                break
        # add to dictionary:
        first_file = Path(self.data.data.file_paths[0]).absolute()
        last_file = Path(self.data.data.file_paths[-1]).absolute()
        timestamp = datetime.now().isoformat()
        rate = 1/(self.times[1] - self.times[0])
        ft_props = dict(first=str(first_file),
                        last=str(last_file),
                        rate=rate,
                        created=timestamp,
                        used=timestamp)
        files[ft_name] = ft_props
        # remove old files:
        if len(files) > FullTracePlot.max_files:
            ft_files = list(files)
            timestamps = [files[ftf]['used'] for ftf in ft_files]
            idx = np.argsort(timestamps)
            for i in idx[:len(ft_files) - FullTracePlot.max_files]:
                try:
                    (audian_dirs.user_cache_path / ft_files[i]).unlink()
                except Exception as e:
                    print(e)
                files.pop(ft_files[i])
        # save json file:
        with ft_path.open('w') as df:
            json.dump(files, df, indent=4)
        # save file:
        write_audio(str(audian_dirs.user_cache_path / ft_name),
                    self.datas, 1e6*rate, format='WAV', encoding='DOUBLE')
        

    def load_data(self):
        ft_path = audian_dirs.user_cache_path / FullTracePlot.fulltraces_file
        if audian_dirs.user_cache_path.exists() and ft_path.exists():
            # load json file:
            files = {}
            with ft_path.open() as sf:
                files = json.load(sf)
            # search for entry with matching source files:
            first_file = Path(self.data.data.file_paths[0]).absolute()
            last_file = Path(self.data.data.file_paths[-1]).absolute()
            for ft_file in files.keys():
                ft_props = files[ft_file]
                if ft_props['first'] == str(first_file) and \
                   ft_props['last'] == str(last_file):
                    # load full trace data:
                    self.datas, rate = load_audio(str(audian_dirs.user_cache_path / ft_file))
                    rate = ft_props['rate']
                    self.times = np.arange(len(self.datas))/rate
                    # update timestamp:
                    timestamp = datetime.now().isoformat()
                    ft_props['used'] = timestamp
                    # save json file:
                    with ft_path.open('w') as df:
                        json.dump(files, df)
                    break

                    
    def update_layout(self, channels, data_height):
        first = True
        for c in range(self.data.channels):
            self.axs[c].setVisible(c in channels)
            if c in channels:
                self.ci.layout.setRowFixedHeight(c, data_height)
                self.labels[c].setVisible(first)
                first = False
            else:
                self.ci.layout.setRowFixedHeight(c, 0)
                self.labels[c].setVisible(False)
        self.setFixedHeight(len(channels)*data_height)


    def update_time_range(self, region):
        if self.no_signal:
            return
        self.no_signal = True
        xmin, xmax = region.getRegion()
        for ax, reg in zip(self.axtraces, self.regions):
            if reg is region:
                ax.setXRange(xmin, xmax)
                break
        self.no_signal = False


    def update_region(self, vbox, x_range):
        for ax, region in zip(self.axtraces, self.regions):
            if ax.getViewBox() is vbox:
                region.setRegion(x_range)
                break

        
    def mousePressEvent(self, ev):
        if ev.button() == Qt.MouseButton.LeftButton:
            for ax, region in zip(self.axs, self.regions):
                pos = ax.getViewBox().mapSceneToView(ev.pos())
                [xmin, xmax], [ymin, ymax] = ax.viewRange()
                if xmin <= pos.x() <= xmax and ymin <= pos.y() <= ymax:
                    dx = (xmax - xmin)/self.width()
                    x = pos.x()
                    xmin, xmax = region.getRegion()
                    if x < xmin-2*dx or x > xmax + 2*dx:
                        dx = xmax - xmin
                        xmin = max(0, x - dx/2)
                        xmax = xmin + dx
                        if xmax > self.tmax:
                            xmin = max(0, xmax - dx)
                        region.setRegion((xmin, xmax))
                        ev.accept()
                        return
                    break
        ev.ignore()
        super().mousePressEvent(ev)

Convenience class consisting of a :class:GraphicsView <pyqtgraph.GraphicsView> with a single :class:GraphicsLayout <pyqtgraph.GraphicsLayout> as its central item.

This widget is an easy starting point for generating multi-panel figures. Example::

w = pg.GraphicsLayoutWidget()
p1 = w.addPlot(row=0, col=0)
p2 = w.addPlot(row=0, col=1)
v = w.addViewBox(row=1, col=0, colspan=2)

========= ================================================================= parent (QWidget or None) The parent widget. show (bool) If True, then immediately show the widget after it is created. If the widget has no parent, then it will be shown inside a new window. size (width, height) tuple. Optionally resize the widget. Note: if this widget is placed inside a layout, then this argument has no effect. title (str or None) If specified, then set the window title for this widget. kargs All extra arguments are passed to :meth:GraphicsLayout.__init__ <pyqtgraph.GraphicsLayout.__init__> ========= =================================================================

This class wraps several methods from its internal GraphicsLayout: :func:nextRow <pyqtgraph.GraphicsLayout.nextRow> :func:nextColumn <pyqtgraph.GraphicsLayout.nextColumn> :func:addPlot <pyqtgraph.GraphicsLayout.addPlot> :func:addViewBox <pyqtgraph.GraphicsLayout.addViewBox> :func:addItem <pyqtgraph.GraphicsLayout.addItem> :func:getItem <pyqtgraph.GraphicsLayout.getItem> :func:addLabel <pyqtgraph.GraphicsLayout.addLabel> :func:addLayout <pyqtgraph.GraphicsLayout.addLayout> :func:removeItem <pyqtgraph.GraphicsLayout.removeItem> :func:itemIndex <pyqtgraph.GraphicsLayout.itemIndex> :func:clear <pyqtgraph.GraphicsLayout.clear>

============== ============================================================ Arguments: parent Optional parent widget useOpenGL If True, the GraphicsView will use OpenGL to do all of its rendering. This can improve performance on some systems, but may also introduce bugs (the combination of QGraphicsView and QOpenGLWidget is still an 'experimental' feature of Qt) background Set the background color of the GraphicsView. Accepts any single argument accepted by :func:mkColor <pyqtgraph.mkColor>. By default, the background color is determined using the 'backgroundColor' configuration option (see :func:setConfigOptions <pyqtgraph.setConfigOptions>). ============== ============================================================

Ancestors

  • pyqtgraph.widgets.GraphicsLayoutWidget.GraphicsLayoutWidget
  • pyqtgraph.widgets.GraphicsView.GraphicsView
  • PyQt5.QtWidgets.QGraphicsView
  • PyQt5.QtWidgets.QAbstractScrollArea
  • PyQt5.QtWidgets.QFrame
  • PyQt5.QtWidgets.QWidget
  • PyQt5.QtCore.QObject
  • sip.wrapper
  • PyQt5.QtGui.QPaintDevice
  • sip.simplewrapper

Class variables

var fulltraces_file
var max_files

Methods

def close(self)
Expand source code
def close(self):
    for proc in self.procs:
        proc.terminate()
        proc.join()
        proc.close()
    self.procs = []

close(self) -> bool

def polish(self)
Expand source code
def polish(self):
    text_color = self.palette().color(QPalette.WindowText)
    for label in self.labels:
        label.setBrush(text_color)
    QTimer.singleShot(500, self.plot_data)
def prepare(self)
Expand source code
def prepare(self):
    self.procs = []
    if self.times is not None and self.data is not None:
        return
    max_pixel = QApplication.desktop().screenGeometry().width()
    step = max(1, self.data.frames//max_pixel)
    nblock = max(step, int(30.0*self.data.rate//step)*step)
    end_indices = None
    if len(self.data.data.file_paths) > 1:
        end_indices = self.data.data.end_indices
    self.times = np.arange(0, self.data.data.frames + step - 1,
                           step/2)/self.data.rate
    self.shared_array = Array(c.c_double, len(self.times)*self.data.channels)
    self.datas = np.frombuffer(self.shared_array.get_obj())
    self.datas = self.datas.reshape((len(self.times), self.data.channels))
    nprocs = os.cpu_count() - 1
    for i in range(max(1, nprocs)):
        p = Process(target=down_sample,
                    args=(i, nprocs, nblock, step,
                          self.shared_array,
                          self.data.data.file_paths,
                          nblock/self.data.rate + 0.1,
                          self.data.rate, self.data.channels,
                          self.data.data.unit, self.data.data.ampl_max,
                          end_indices,
                          self.data.data.unwrap_thresh,
                          self.data.data.unwrap_clips,
                          self.data.load_kwargs))
        p.start()
        self.procs.append(p)
def plot_data(self)
Expand source code
def plot_data(self):

    def set_plot_ranges():
        for c in range(self.datas.shape[1]):
            ymin = np.min(self.datas[:, c])
            ymax = np.max(self.datas[:, c])
            y = max(abs(ymin), abs(ymax))
            self.axs[c].setYRange(-y, y)
            self.axs[c].setLimits(yMin=-y, yMax=y,
                                  minYRange=2*y, maxYRange=2*y)

    if len(self.procs) == 0:
        for c in range(self.datas.shape[1]):
            self.lines[c].setData(self.times, self.datas[:, c])
        set_plot_ranges()
    else:
        done = True
        for proc in self.procs:
            if proc.is_alive():
                done = False
                break
        lock = self.shared_array.get_lock()
        if lock.acquire(block=False):
            for c in range(self.datas.shape[1]):
                self.lines[c].setData(self.times, self.datas[:, c])
            lock.release()
        else:
            done = False
        if done:
            for proc in self.procs:
                proc.close()
            self.procs = []
            set_plot_ranges()
            self.save_data()
        else:
            QTimer.singleShot(500, self.plot_data)
def save_data(self)
Expand source code
def save_data(self):
    audian_dirs.user_cache_path.mkdir(parents=True, exist_ok=True)
    files = {}
    ft_path = audian_dirs.user_cache_path / FullTracePlot.fulltraces_file
    if ft_path.exists():
        with ft_path.open() as sf:
            files = json.load(sf)
    # new filename:
    ft_name = f'{1:08X}-fulltrace.wav'
    for k in range(1, 1000):
        ft_name = f'{k:08X}-fulltrace.wav'
        if not ft_name in files.keys():
            break
    # add to dictionary:
    first_file = Path(self.data.data.file_paths[0]).absolute()
    last_file = Path(self.data.data.file_paths[-1]).absolute()
    timestamp = datetime.now().isoformat()
    rate = 1/(self.times[1] - self.times[0])
    ft_props = dict(first=str(first_file),
                    last=str(last_file),
                    rate=rate,
                    created=timestamp,
                    used=timestamp)
    files[ft_name] = ft_props
    # remove old files:
    if len(files) > FullTracePlot.max_files:
        ft_files = list(files)
        timestamps = [files[ftf]['used'] for ftf in ft_files]
        idx = np.argsort(timestamps)
        for i in idx[:len(ft_files) - FullTracePlot.max_files]:
            try:
                (audian_dirs.user_cache_path / ft_files[i]).unlink()
            except Exception as e:
                print(e)
            files.pop(ft_files[i])
    # save json file:
    with ft_path.open('w') as df:
        json.dump(files, df, indent=4)
    # save file:
    write_audio(str(audian_dirs.user_cache_path / ft_name),
                self.datas, 1e6*rate, format='WAV', encoding='DOUBLE')
def load_data(self)
Expand source code
def load_data(self):
    ft_path = audian_dirs.user_cache_path / FullTracePlot.fulltraces_file
    if audian_dirs.user_cache_path.exists() and ft_path.exists():
        # load json file:
        files = {}
        with ft_path.open() as sf:
            files = json.load(sf)
        # search for entry with matching source files:
        first_file = Path(self.data.data.file_paths[0]).absolute()
        last_file = Path(self.data.data.file_paths[-1]).absolute()
        for ft_file in files.keys():
            ft_props = files[ft_file]
            if ft_props['first'] == str(first_file) and \
               ft_props['last'] == str(last_file):
                # load full trace data:
                self.datas, rate = load_audio(str(audian_dirs.user_cache_path / ft_file))
                rate = ft_props['rate']
                self.times = np.arange(len(self.datas))/rate
                # update timestamp:
                timestamp = datetime.now().isoformat()
                ft_props['used'] = timestamp
                # save json file:
                with ft_path.open('w') as df:
                    json.dump(files, df)
                break
def update_layout(self, channels, data_height)
Expand source code
def update_layout(self, channels, data_height):
    first = True
    for c in range(self.data.channels):
        self.axs[c].setVisible(c in channels)
        if c in channels:
            self.ci.layout.setRowFixedHeight(c, data_height)
            self.labels[c].setVisible(first)
            first = False
        else:
            self.ci.layout.setRowFixedHeight(c, 0)
            self.labels[c].setVisible(False)
    self.setFixedHeight(len(channels)*data_height)
def update_time_range(self, region)
Expand source code
def update_time_range(self, region):
    if self.no_signal:
        return
    self.no_signal = True
    xmin, xmax = region.getRegion()
    for ax, reg in zip(self.axtraces, self.regions):
        if reg is region:
            ax.setXRange(xmin, xmax)
            break
    self.no_signal = False
def update_region(self, vbox, x_range)
Expand source code
def update_region(self, vbox, x_range):
    for ax, region in zip(self.axtraces, self.regions):
        if ax.getViewBox() is vbox:
            region.setRegion(x_range)
            break
def mousePressEvent(self, ev)
Expand source code
def mousePressEvent(self, ev):
    if ev.button() == Qt.MouseButton.LeftButton:
        for ax, region in zip(self.axs, self.regions):
            pos = ax.getViewBox().mapSceneToView(ev.pos())
            [xmin, xmax], [ymin, ymax] = ax.viewRange()
            if xmin <= pos.x() <= xmax and ymin <= pos.y() <= ymax:
                dx = (xmax - xmin)/self.width()
                x = pos.x()
                xmin, xmax = region.getRegion()
                if x < xmin-2*dx or x > xmax + 2*dx:
                    dx = xmax - xmin
                    xmin = max(0, x - dx/2)
                    xmax = xmin + dx
                    if xmax > self.tmax:
                        xmin = max(0, xmax - dx)
                    region.setRegion((xmin, xmax))
                    ev.accept()
                    return
                break
    ev.ignore()
    super().mousePressEvent(ev)

mousePressEvent(self, event: Optional[QMouseEvent])