Coverage for src / thunderfish / bestwindow.py: 83%
289 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-09 14:25 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-09 14:25 +0000
1"""
2Select the best region within a recording with the most stable signal of largest amplitude that is not clipped.
4## Main functions
5- `clip_amplitudes()`: estimated clipping amplitudes from the data.
6- `best_window_indices()`: select start- and end-indices of the best window
7- `best_window_times()`: select start end end-time of the best window
8- `best_window()`: return data of the best window
9- `analysis_window()`: set clipping amplitudes and find analysis window.
11## Configuration
12- `add_clip_config()`: add parameters for `clip_amplitudes()` to configuration.
13- `clip_args()`: retrieve parameters for `clip_amplitudes()` from configuration.
14- `add_best_window_config()`: add parameters for `best_window()` to configuration.
15- `best_window_args()`: retrieve parameters for `best_window*()` from configuration.
17## Visualization
18- `plot_clipping()`: visualization of the algorithm for detecting clipped amplitudes in `clip_amplitudes()`.
19- `plot_best_window()`: visualization of the algorithm used in `best_window_indices()`.
20- `plot_data_window()`: plot the data and the selected analysis window.
21"""
23import numpy as np
24try:
25 import matplotlib.pyplot as plt
26 import matplotlib.ticker as ticker
27except ImportError:
28 pass
30from thunderlab.eventdetection import percentile_threshold, detect_peaks, trim_to_peak
31from audioio import unwrap
34def clip_amplitudes(data, win_indices, min_fac=2.0, nbins=20,
35 min_ampl=None, max_ampl=1.0,
36 min_clip=None, max_clip=0,
37 plot_hist_func=None, **kwargs):
38 """Find the amplitudes where the signal clips.
40 Histograms in data segements of win_indices length are analyzed.
41 If the bins at the edges are more than min_fac times as large as
42 the neighboring bins, clipping at the bin's amplitude is assumed.
44 Parameters
45 ----------
46 data: 1-D array
47 The data.
48 win_indices: int
49 Size of the analysis window in indices.
50 min_fac: float
51 If the first or the second bin is at least `min_fac` times
52 as large as the third bin, their upper bin edge is set as min_clip.
53 Likewise for the last and next-to last bin.
54 nbins: int
55 Number of bins used for computing a histogram within `min_ampl` and `max_ampl`.
56 min_ampl: float or None
57 Minimum to be expected amplitude of the data.
58 If set to None, set it to the negative of max_ampl.
59 max_ampl: float
60 Maximum to be expected amplitude of the data
61 min_clip: float or None
62 Minimum amplitude that is not clipped. If zero, estimate from data.
63 If positive, set to this factor of the minimum amplitude of the data.
64 If set to None, set it to the negative of max_clip.
65 max_clip: float
66 Maximum amplitude that is not clipped. If zero, estimate from data.
67 If negativ, set to this factor of the maximum amplitude of the data.
68 plot_hist_func: function
69 Function for visualizing the histograms, is called for every window.
70 `plot_clipping()` is a simple function that can be passed as `plot_hist_func`
71 to quickly visualize what is going on in `clip_amplitudes()`.
73 Signature:
75 `plot_hist_func(data, winx0, winx1, bins, h, min_clip, max_clip,
76 min_ampl, max_ampl, kwargs)`
78 with the arguments:
80 - `data` (array): the full data array.
81 - `winx0` (int): the start index of the current window.
82 - `winx1` (int): the end index of the current window.
83 - `bins` (array): the bin edges of the histogram.
84 - `h` (array): the histogram, plot it with
85 ```
86 plt.bar(bins[:-1], h, width=np.mean(np.diff(bins)))
87 ```
88 - `min_clip` (float): the current value of the minimum clip amplitude.
89 - `max_clip` (float): the current value of the minimum clip amplitude.
90 - `min_ampl` (float): the minimum amplitude of the data.
91 - `max_ampl` (float): the maximum amplitude of the data.
92 - `kwargs` (dict): further user supplied key-word arguments.
94 Returns
95 -------
96 min_clip: float
97 Minimum amplitude that is not clipped.
98 max_clip: float
99 Maximum amplitude that is not clipped.
100 """
101 if min_ampl is None:
102 min_ampl = -max_ampl
103 if min_clip is None:
104 min_clip = -max_clip
105 if min_clip > 0:
106 min_clip = min_clip*min_ampl
107 if max_clip < 0:
108 max_clip = abs(max_clip)*max_ampl
109 if min_clip > 0 and max_clip > 0:
110 return min_clip, max_clip
111 min_clip = min_ampl
112 max_clip = max_ampl
113 bins = np.linspace(min_ampl, max_ampl, nbins, endpoint=True)
114 win_tinxs = np.arange(0, len(data) - win_indices, win_indices)
115 for wtinx in win_tinxs:
116 h, b = np.histogram(data[wtinx:wtinx + win_indices], bins)
117 if h[0] > min_fac * h[2] and b[0] < 0.4*min_ampl:
118 if h[1] > min_fac * h[2] and b[2] > min_clip:
119 min_clip = b[2]
120 elif b[1] > min_clip:
121 min_clip = b[1]
122 if h[-1] > min_fac * h[-3] and b[-1] > 0.4*max_ampl:
123 if h[-2] > min_fac * h[-3] and b[-3] < max_clip:
124 max_clip = b[-3]
125 elif b[-2] < max_clip:
126 max_clip = b[-2]
127 if plot_hist_func:
128 plot_hist_func(data, wtinx, wtinx + win_indices,
129 b, h, min_clip, max_clip,
130 min_ampl, max_ampl, **kwargs)
131 return min_clip, max_clip
134def plot_clipping(data, winx0, winx1, bins,
135 h, min_clip, max_clip, min_ampl, max_ampl):
136 """Visualize the data histograms and the detected clipping amplitudes.
138 Pass this function as the `plot_hist_func` argument to `clip_amplitudes()`.
139 """
140 fig, (ax1, ax2) = plt.subplots(2, 1)
141 ax1.plot(data[winx0:winx1], 'b')
142 ax1.axhline(min_clip, color='r')
143 ax1.axhline(max_clip, color='r')
144 ax1.set_ylim(-1.0, 1.0)
145 ax2.bar(bins[:-1], h, width=np.mean(np.diff(bins)))
146 ax2.axvline(min_clip, color='r')
147 ax2.axvline(max_clip, color='r')
148 ax2.set_xlim(-1.0, 1.0)
149 plt.show()
152def add_clip_config(cfg, min_clip=0.0, max_clip=0.0,
153 window=1.0, min_fac=2.0, nbins=20,
154 min_ampl=-1.0, max_ampl=1.0):
155 """Add parameter needed for `clip_amplitudes()` as a new section to a configuration.
157 Parameters
158 ----------
159 cfg: ConfigFile
160 The configuration.
161 min_clip: float
162 Default minimum clip amplitude.
163 max_clip: float
164 Default maximum clip amplitude.
166 See `clip_amplitudes()` for details on the remaining arguments.
167 """
168 cfg.add_section('Clipping amplitudes:')
169 cfg.add('minClipAmplitude', min_clip, '', 'Minimum amplitude that is not clipped. If zero estimate from data. If positive, set to this factor of the minimum amplitude of the data.')
170 cfg.add('maxClipAmplitude', max_clip, '', 'Maximum amplitude that is not clipped. If zero estimate from data. If negativ, set to this factor of the maximum amplitude of the data.')
171 cfg.add('clipWindow', window, 's', 'Window size for estimating clip amplitudes.')
172 cfg.add('clipBins', nbins, '', 'Number of bins used for constructing histograms of signal amplitudes.')
173 cfg.add('minClipFactor', min_fac, '',
174 'Edge bins of the histogram of clipped signals have to be larger then their neighbors by this factor.')
177def clip_args(cfg, rate):
178 """Translates a configuration to the respective parameter names of `clip_amplitudes()`.
180 The return value can then be passed as key-word arguments to this
181 function.
183 Parameters
184 ----------
185 cfg: ConfigFile
186 The configuration.
187 rate: float
188 Sampling rate of the data.
190 Returns
191 -------
192 a: dict
193 Dictionary with names of arguments of the `clip_amplitudes()` function
194 and their values as supplied by `cfg`.
195 """
196 a = cfg.map({'min_fac': 'minClipFactor',
197 'nbins': 'clipBins',
198 'min_clip': 'minClipAmplitude',
199 'max_clip': 'maxClipAmplitude'})
200 a['win_indices'] = int(cfg.value('clipWindow')*rate)
201 return a
204def best_window_indices(data, rate, expand=False, win_size=1.,
205 win_shift=0.5, thresh_fac=0.8, percentile=0.1,
206 min_clip=-np.inf, max_clip=np.inf,
207 w_cv_interv=1.0, w_ampl=1.0, w_cv_ampl=1.0,
208 tolerance=0.2, plot_data_func=None, **kwargs):
209 """Find the window within data most suitable for subsequent analysis.
211 First, large peaks and troughs of the data are detected. Peaks and
212 troughs have to be separated in amplitude by at least the value of a
213 dynamic threshold. The threshold is computed in `win_shift` wide
214 windows as `thresh_fac` times the interpercentile range at
215 the `percentile`-th and 100.0-`percentile`-th percentile of the data
216 using the `thunderlab.eventdetection.percentile_threshold()` function.
218 Second, criteria for selecting the best window are computed for each
219 window of width `win_size` shifted by `win_shift` trough the data. The
220 three criteria are:
222 - the mean peak-to-trough amplitude multiplied with the fraction of
223 non clipped peak and trough amplitudes.
224 - the coefficient of variation of the peak-to-trough amplitude.
225 - the coefficient of variation of the inter-peak and inter-trough
226 intervals.
228 Third, a cost function is computed as a weighted sum of the three
229 criteria (the mean amplitude is taken negatively). The respective
230 weights are given by `w_ampl`, `w_cv_ampl`, and `w_cv_interv`.
232 Finally, a threshold is set to the minimum value of the cost
233 function plus tolerance. Then the largest region with the cost
234 function below this threshold is selected as the best window. If
235 `expand` is `False`, then only the single window with smallest cost
236 within the selected largest region is returned.
238 The data used by best window algorithm can be visualized by supplying the
239 function `plot_data_func`. Additional arguments for this function can
240 be supplied via key-word arguments `kwargs`.
242 Parameters
243 ----------
244 data: 1-D array
245 The data to be analyzed.
246 rate: float
247 Sampling rate of the data in Hertz.
248 expand: boolean
249 If `False` return only the single window with the smallest cost.
250 If `True` return the largest window with the cost below the minimum cost
251 plus tolerance.
252 win_size: float
253 Minimum size of the desired best window in seconds.
254 Choose it large enough for the subsequent analysis.
255 win_shift: float
256 Time shift in seconds between windows. Should be smaller or equal to `win_size`.
257 percentile: float
258 `percentile` parameter for the `thunderlab.eventdetection.percentile_threshold()` function
259 used to estimate thresholds for detecting peaks in the data.
260 thresh_fac: float
261 `thresh_fac` parameter for the `thunderlab.eventdetection.percentile_threshold()` function
262 used to estimate thresholds for detecting peaks in the data.
263 min_clip: float
264 Minimum amplitude below which data are clipped.
265 max_clip: float
266 Maximum amplitude above which data are clipped.
267 w_cv_interv: float
268 Weight for the coefficient of variation of the intervals between detected
269 peaks and throughs.
270 w_ampl: float
271 Weight for the mean peak-to-trough amplitude.
272 w_cv_ampl: float
273 Weight for the coefficient of variation of the amplitudes.
274 tolerance: float
275 Added to the minimum cost for expanding the region of the best window.
276 plot_data_func: function
277 Function for plotting the raw data, detected peaks and troughs, the criteria,
278 the cost function and the selected best window.
279 `plot_best_window()` is a simple function that can be passed as the `plot_data_func`
280 parameter to quickly visualize what is going on in selecting the best window.
282 Signature:
284 ````
285 plot_data_func(data, rate, peak_thresh, peak_idx, trough_idx,
286 idx0, idx1, win_start_times, cv_interv, mean_ampl,
287 cv_ampl, clipped_frac, cost_thresh, thresh,
288 valid_wins, **kwargs)
289 ```
291 with the arguments:
293 - `data` (array): raw data.
294 - `rate` (float): sampling rate of the data.
295 - `peak_thresh` (array): thresholds used for detecting peaks and troughs in each data window.
296 - `peak_idx` (array): indices into raw data indicating detected peaks.
297 - `trough_idx` (array): indices into raw data indicating detected troughs.
298 - `idx0` (int): index of the start of the best window.
299 - `idx1` (int): index of the end of the best window.
300 - `win_start_times` (array): times of the analysis windows.
301 - `cv_interv` (array): coefficients of variation of the inter-peak and -trough
302 intervals.
303 - `mean_ampl` (array): mean peak-to-trough amplitudes.
304 - `cv_ampl` (array): coefficients of variation of the peak-to-trough amplitudes.
305 - `clipped_frac` (array): fraction of clipped peaks or troughs.
306 - `cost` (array): cost function.
307 - `cost_thresh` (float): threshold for the cost function.
308 - `valid_wins` (array): boolean array indicating the windows which fulfill
309 all three criteria.
310 - `**kwargs` (dict): further user supplied key-word arguments.
311 kwargs: dict
312 Keyword arguments passed to `plot_data_func`.
314 Returns
315 -------
316 start_index: int
317 Index of the start of the best window.
318 end_index: int
319 Index of the end of the best window.
320 clipped: float.
321 The fraction of clipped peaks or troughs.
323 Raises
324 ------
325 UserWarning
326 - Not enough data for requested `win_size`.
327 - No peaks detected.
328 - No finite amplitudes detected.
329 - No valid interval CV detected.
330 - No valid amplitude CV detected.
331 """
332 # too little data:
333 if len(data)/rate < win_size:
334 raise UserWarning(f'not enough data (data={len(data) / rate:g}s, win={win_size:g}s)')
336 # threshold for peak detection:
337 threshold = percentile_threshold(data, int(win_shift*rate),
338 thresh_fac=thresh_fac,
339 percentile=percentile)
341 # detect large peaks and troughs:
342 peak_idx, trough_idx = detect_peaks(data, threshold)
343 if len(peak_idx) == 0 or len(trough_idx) == 0:
344 raise UserWarning('no peaks or troughs detected')
346 # compute cv of intervals, mean peak amplitude and its cv:
347 invalid_cv = 1000.0
348 win_size_indices = int(win_size*rate)
349 win_start_inxs = np.arange(0, len(data) - win_size_indices,
350 int(0.5*win_shift*rate))
351 if len(win_start_inxs) == 0:
352 win_start_inxs = [0]
353 cv_interv = np.zeros(len(win_start_inxs))
354 mean_ampl = np.zeros(len(win_start_inxs))
355 cv_ampl = np.zeros(len(win_start_inxs))
356 clipped_frac = np.zeros(len(win_start_inxs))
357 for i, wtinx in enumerate(win_start_inxs):
358 # indices of peaks and troughs inside analysis window:
359 pinx = (peak_idx >= wtinx) & (peak_idx <= wtinx + win_size_indices)
360 tinx = (trough_idx >= wtinx) & (trough_idx <= wtinx + win_size_indices)
361 p_idx, t_idx = trim_to_peak(peak_idx[pinx], trough_idx[tinx])
362 # interval statistics:
363 ipis = np.diff(p_idx)
364 itis = np.diff(t_idx)
365 if len(ipis) > 2:
366 cv_interv[i] = 0.5 * (np.std(ipis) / np.mean(ipis) + np.std(itis) / np.mean(itis))
367 # penalize regions without detected peaks:
368 mean_interv = np.mean(ipis)
369 if p_idx[0] - wtinx > mean_interv:
370 cv_interv[i] *= (p_idx[0] - wtinx) / mean_interv
371 if wtinx + win_size_indices - p_idx[-1] > mean_interv:
372 cv_interv[i] *= (wtinx + win_size_indices - p_idx[-1]) / mean_interv
373 else:
374 cv_interv[i] = invalid_cv
375 # statistics of peak-to-trough amplitude:
376 p2t_ampl = data[p_idx] - data[t_idx]
377 if len(p2t_ampl) > 2:
378 mean_ampl[i] = np.mean(p2t_ampl)
379 cv_ampl[i] = np.std(p2t_ampl) / mean_ampl[i]
380 # penalize for clipped peaks:
381 clipped_frac[i] = float(np.sum(data[p_idx] > max_clip) +
382 np.sum(data[t_idx] < min_clip)) / 2.0 / len(p2t_ampl)
383 mean_ampl[i] *= (1.0 - clipped_frac[i]) ** 2.0
384 else:
385 mean_ampl[i] = 0.0
386 cv_ampl[i] = invalid_cv
388 # check:
389 if len(mean_ampl[mean_ampl >= 0.0]) < 0:
390 raise UserWarning('no finite amplitudes detected')
391 if len(cv_interv[cv_interv < invalid_cv]) <= 0:
392 raise UserWarning('no valid interval cv detected')
393 if len(cv_ampl[cv_ampl < invalid_cv]) <= 0:
394 raise UserWarning('no valid amplitude cv detected')
396 # cost function:
397 cost = w_cv_interv * cv_interv + w_cv_ampl * cv_ampl - w_ampl * mean_ampl
398 thresh = np.min(cost) + tolerance
400 # find largest region with low costs:
401 valid_win_idx = np.nonzero(cost <= thresh)[0]
402 cidx0 = valid_win_idx[0] # start of current window
403 cidx1 = cidx0 + 1 # end of current window
404 win_idx0 = cidx0 # start of largest window
405 win_idx1 = cidx1 # end of largest window
406 i = 1
407 while i < len(valid_win_idx): # loop through all valid window positions
408 if valid_win_idx[i] == valid_win_idx[i - 1] + 1:
409 cidx1 = valid_win_idx[i] + 1
410 else:
411 cidx0 = valid_win_idx[i]
412 if cidx1 - cidx0 > win_idx1 - win_idx0: # current window is largest
413 win_idx0 = cidx0
414 win_idx1 = cidx1
415 i += 1
417 # find single best window within the largest region:
418 if not expand:
419 win_idx0 += np.argmin(cost[win_idx0:win_idx1])
420 win_idx1 = win_idx0 + 1
422 # retrive indices of best window for data:
423 idx0 = win_start_inxs[win_idx0]
424 idx1 = win_start_inxs[win_idx1 - 1] + win_size_indices
426 # clipped data?
427 clipped = np.mean(clipped_frac[win_idx0:win_idx1])
429 if plot_data_func:
430 plot_data_func(data, rate, threshold, peak_idx, trough_idx,
431 idx0, idx1, win_start_inxs/rate, cv_interv,
432 mean_ampl, cv_ampl, clipped_frac, cost, thresh,
433 win_idx0, win_idx1, **kwargs)
435 return idx0, idx1, clipped
438def best_window_times(data, rate, expand=False, win_size=1.,
439 win_shift=0.5, thresh_fac=0.8, percentile=0.1,
440 min_clip=-np.inf, max_clip=np.inf,
441 w_cv_interv=1.0, w_ampl=1.0, w_cv_ampl=1.0,
442 tolerance=0.2, plot_data_func=None, **kwargs):
443 """Find the window within data most suitable for subsequent analysis.
445 See `best_window_indices()` for details.
447 Returns
448 -------
449 start_time: float
450 Time of the start of the best window.
451 end_time: float
452 Time of the end of the best window.
453 clipped: float
454 The fraction of clipped peaks or troughs.
455 """
456 start_inx, end_inx, clipped = best_window_indices(data, rate,
457 expand,
458 win_size,
459 win_shift,
460 thresh_fac,
461 percentile,
462 min_clip,
463 max_clip,
464 w_cv_interv,
465 w_ampl,
466 w_cv_ampl,
467 tolerance,
468 plot_data_func,
469 **kwargs)
470 return start_inx/rate, end_inx/rate, clipped
473def best_window(data, rate, expand=False, win_size=1., win_shift=0.5,
474 thresh_fac=0.8, percentile=0.1, min_clip=-np.inf, max_clip=np.inf,
475 w_cv_interv=1.0, w_ampl=1.0, w_cv_ampl=1.0, tolerance=0.2,
476 plot_data_func=None, **kwargs):
477 """Find the window within data most suitable for subsequent analysis.
479 See `best_window_indices()` for details.
481 Returns
482 -------
483 data: array
484 The data of the best window.
485 clipped: float
486 The fraction of clipped peaks or troughs.
487 """
488 start_inx, end_inx, clipped = best_window_indices(data, rate, expand,
489 win_size, win_shift,
490 thresh_fac, percentile,
491 min_clip, max_clip,
492 w_cv_interv, w_ampl, w_cv_ampl,
493 tolerance, plot_data_func, **kwargs)
494 return data[start_inx:end_inx], clipped
497def plot_best_window(data, rate, threshold, peak_idx, trough_idx, idx0, idx1,
498 win_times, cv_interv, mean_ampl, cv_ampl, clipped_frac,
499 cost, thresh, win_idx0, win_idx1, ax):
500 """Visualize the cost function of used for finding the best window for analysis.
502 Pass this function as the `plot_data_func` to the `best_window_*` functions.
504 Parameters
505 ----------
506 See documentation of the `best_window_indices()` functions.
507 """
508 # raw data:
509 time = np.arange(0.0, len(data))/rate
510 ax[0].plot(time, data, 'b', lw=3)
511 if np.mean(clipped_frac[win_idx0:win_idx1]) > 0.01:
512 ax[0].plot(time[idx0:idx1], data[idx0:idx1], color='magenta', lw=3)
513 else:
514 ax[0].plot(time[idx0:idx1], data[idx0:idx1], color='grey', lw=3)
515 ax[0].plot(time[peak_idx], data[peak_idx], 'o', mfc='red', ms=6)
516 ax[0].plot(time[trough_idx], data[trough_idx], 'o', mfc='green', ms=6)
517 ax[0].plot(time, threshold, '#CCCCCC', lw=2)
518 up_lim = np.max(data) * 1.05
519 down_lim = np.min(data) * .95
520 ax[0].set_ylim((down_lim, up_lim))
521 ax[0].set_ylabel('Amplitude [a.u]')
523 # cv of inter-peak intervals:
524 ax[1].plot(win_times[cv_interv < 1000.0], cv_interv[cv_interv < 1000.0], 'o', ms=10, color='grey', mew=2.,
525 mec='black', alpha=0.6)
526 ax[1].plot(win_times[win_idx0:win_idx1], cv_interv[win_idx0:win_idx1], 'o', ms=10, color='red', mew=2., mec='black',
527 alpha=0.6)
528 ax[1].set_ylabel('CV intervals')
529 ax[1].set_ylim(bottom=0.0)
531 # mean amplitude:
532 ax[2].plot(win_times[mean_ampl > 0.0], mean_ampl[mean_ampl > 0.0], 'o', ms=10, color='grey', mew=2., mec='black',
533 alpha=0.6)
534 ax[2].plot(win_times[win_idx0:win_idx1], mean_ampl[win_idx0:win_idx1], 'o', ms=10, color='red', mew=2., mec='black',
535 alpha=0.6)
536 ax[2].set_ylabel('Mean amplitude [a.u]')
537 ax[2].set_ylim(bottom=0.0)
539 # cv:
540 ax[3].plot(win_times[cv_ampl < 1000.0], cv_ampl[cv_ampl < 1000.0], 'o', ms=10, color='grey', mew=2., mec='black',
541 alpha=0.6)
542 ax[3].plot(win_times[win_idx0:win_idx1], cv_ampl[win_idx0:win_idx1], 'o', ms=10, color='red', mew=2., mec='black',
543 alpha=0.6)
544 ax[3].set_ylabel('CV amplitude')
545 ax[3].set_ylim(bottom=0.0)
547 # cost:
548 ax[4].plot(win_times[cost < thresh + 10], cost[cost < thresh + 10], 'o', ms=10, color='grey', mew=2., mec='black',
549 alpha=0.6)
550 ax[4].plot(win_times[win_idx0:win_idx1], cost[win_idx0:win_idx1], 'o', ms=10, color='red', mew=2., mec='black',
551 alpha=0.6)
552 ax[4].axhline(thresh, color='k')
553 ax[4].set_ylabel('Cost')
554 ax[4].set_xlabel('Time [sec]')
557def plot_data_window(ax, data, rate, unit, idx0, idx1, clipped,
558 dstyle=dict(color='tab:blue', lw=1),
559 wstyle=dict(color='tab:red', lw=1)):
560 """Plot the data and mark the analysis window.
562 Parameters
563 ----------
564 ax: matplotlib axes
565 Axes used for plotting.
566 data: 1-D array
567 The full data trace.
568 rate: float
569 Sampling rate of the data in Hertz.
570 unit: string
571 The unit of the data.
572 idx0: int
573 Start index of the best window.
574 idx1: int
575 Stop index of the best window.
576 clipped: float
577 Fraction of clipped peaks.
578 data_color:
579 Color used for plotting the data trace.
580 window_color:
581 Color used for plotting the selected best window.
582 """
583 time = np.arange(len(data))/rate
584 ax.plot(time[:idx0], data[:idx0], **dstyle)
585 ax.plot(time[idx1:], data[idx1:], **dstyle)
586 if idx1 > idx0:
587 ax.plot(time[idx0:idx1], data[idx0:idx1], **wstyle)
588 label = 'analysis\nwindow'
589 if clipped > 0.0:
590 label += f'\n{100.0*clipped:.0f}% clipped'
591 ax.text(time[(idx0+idx1)//2], 0.0, label, ha='center', va='center')
592 ax.set_xlim(time[0], time[-1])
593 ax.set_xlabel('Time [sec]')
594 if len(unit) == 0 or unit == 'a.u.':
595 ax.set_ylabel('Amplitude')
596 else:
597 ax.set_ylabel(f'Amplitude [{unit}]')
598 ax.yaxis.set_major_locator(ticker.MaxNLocator(3))
601def add_best_window_config(cfg, win_pos='best', win_size=1., win_shift=0.5,
602 thresh_fac=0.8, percentile=0.1,
603 min_clip=-np.inf, max_clip=np.inf,
604 w_cv_interv=1.0, w_ampl=1.0, w_cv_ampl=1.0,
605 tolerance=0.2, expand=False):
606 """ Add parameter needed for the `best_window()` functions as a new section to a configuration.
608 Parameters
609 ----------
610 cfg: ConfigFile
611 The configuration.
613 See `best_window_indices()` for details on the remaining arguments.
614 """
615 cfg.add_section('Analysis window:')
616 cfg.add('windowPosition', win_pos, '', 'Position of the analysis window: "beginning", "center", "end", "best", or a time in seconds.')
617 cfg.add('windowSize', win_size, 's', 'Size of the analysis window. This should be much larger than the expected period of the signal. If 0 select the whole time series.')
618 cfg.add('bestWindowShift', win_shift, 's',
619 'For finding the best analysis window, increment for shifting a windows trough the data. Should be larger than the expected period of the signal.')
620 cfg.add('bestWindowThresholdPercentile', percentile, '%',
621 'Percentile for estimating interpercentile range. Should be smaller than the duty cycle of the periodic signal.')
622 cfg.add('bestWindowThresholdFactor', thresh_fac, '',
623 'Threshold for detecting peaks is interperecntile range of the data times this factor.')
624 cfg.add('weightCVInterval', w_cv_interv, '',
625 'Weight factor for the coefficient of variation of the inter-peak and inter-trough intervals.')
626 cfg.add('weightAmplitude', w_ampl, '',
627 'Weight factor for the mean peak-to-trough amplitudes.')
628 cfg.add('weightCVAmplitude', w_cv_ampl, '',
629 'Weight factor for the coefficient of variation of the peak-to-trough amplitude.')
630 cfg.add('bestWindowTolerance', tolerance, '',
631 'Add this to the minimum value of the cost function to get a threshold for selecting the largest best window.')
632 cfg.add('expandBestWindow', expand, '',
633 'Return the largest valid best window. If False return the sole best window. ')
636def best_window_args(cfg):
637 """Translates a configuration to the respective parameter names of the functions `best_window*()`.
639 The return value can then be passed as key-word arguments to these
640 functions.
642 Parameters
643 ----------
644 cfg: ConfigFile
645 The configuration.
647 Returns
648 -------
649 a: dict
650 Dictionary with names of arguments of the `best_window*()` functions
651 and their values as supplied by `cfg`.
652 """
653 return cfg.map({'win_size': 'windowSize',
654 'win_shift': 'bestWindowShift',
655 'percentile': 'bestWindowThresholdPercentile',
656 'thresh_fac': 'bestWindowThresholdFactor',
657 'w_cv_interv': 'weightCVInterval',
658 'w_ampl': 'weightAmplitude',
659 'w_cv_ampl': 'weightCVAmplitude',
660 'tolerance': 'bestWindowTolerance',
661 'expand': 'expandBestWindow'})
664def analysis_window(data, rate, ampl_max, win_pos,
665 cfg, show_bestwindow=False):
666 """Set clipping amplitudes and find analysis window.
668 Parameters
669 ----------
670 data: 1-D array
671 The data to be analyzed.
672 rate: float
673 Sampling rate of the data in Hertz.
674 ampl_max: float
675 Maximum value of input range.
676 win_pos: string or float
677 Position of the analysis window: "beginning", "center", "end" or "best".
678 Alternatively the beginning of the analysis window in seconds.
679 cfg: ConfigFile
680 Configuration for clipping and best window.
681 show_bestwindow: boolean
682 If true show a plot with the best window cost functions.
684 Returns
685 -------
686 data: 1-D array
687 The data array of the best window
688 idx0: int
689 The start index of the best window in the original data.
690 idx1: int
691 The end index of the best window in the original data.
692 clipped: float
693 Fraction of clipped amplitudes.
694 min_clip: float
695 Minimum amplitude that is not clipped.
696 max_clip: float
697 Maximum amplitude that is not clipped.
698 """
699 found_bestwindow = True
700 clipped = 0
701 min_clip, max_clip = clip_amplitudes(data, max_ampl=ampl_max,
702 **clip_args(cfg, rate))
703 if cfg.value('unwrapData'):
704 unwrap(data, 1.5, ampl_max)
705 min_clip *= 2
706 max_clip *= 2
707 # window size parameter:
708 bwa = best_window_args(cfg)
709 if 'win_size' in bwa:
710 del bwa['win_size']
711 window_size = cfg.value('windowSize')
712 if window_size <= 0.0:
713 window_size = (len(data)-1)/rate
714 # show cost function:
715 if win_pos == 'best' and show_bestwindow:
716 fig, ax = plt.subplots(5, sharex=True, figsize=(14., 10.))
717 try:
718 idx0, idx1, clipped = \
719 best_window_indices(data, rate,
720 min_clip=min_clip, max_clip=max_clip,
721 win_size=window_size,
722 plot_data_func=plot_best_window,
723 ax=ax, **bwa)
724 plt.show()
725 except UserWarning as e:
726 found_bestwindow = False
727 else:
728 # too little data:
729 n_win = int(window_size*rate)
730 if len(data) < n_win:
731 return data, 0, 0, False, min_clip, max_clip
732 if win_pos == 'best':
733 try:
734 idx0, idx1, clipped = best_window_indices(data, rate,
735 min_clip=min_clip,
736 max_clip=max_clip,
737 win_size=window_size,
738 **bwa)
739 except UserWarning as e:
740 found_bestwindow = False
741 else:
742 if win_pos[:5] == 'begin':
743 idx0 = 0
744 elif win_pos == 'end':
745 idx0 = len(data) - n_win
746 elif win_pos == 'center':
747 idx0 = (len(data) - n_win)//2
748 else:
749 try:
750 if win_pos[-1] == 's':
751 win_pos = win_pos[:-1]
752 t0 = float(win_pos)
753 except ValueError:
754 found_bestwindow = False
755 t0 = 0.0
756 idx0 = int(t0*rate)
757 idx1 = idx0 + n_win
758 if not found_bestwindow or idx1 > len(data):
759 return data, 0, 0, False, min_clip, max_clip
760 data_seg = data[idx0:idx1]
761 # check for clipping:
762 win_shift = cfg.value('bestWindowShift')
763 thresh_fac = cfg.value('bestWindowThresholdFactor')
764 percentile = cfg.value('bestWindowThresholdPercentile')
765 threshold = percentile_threshold(data_seg,
766 int(win_shift*rate),
767 thresh_fac=thresh_fac,
768 percentile=percentile)
769 peak_idx, trough_idx = detect_peaks(data_seg, threshold)
770 p_idx, t_idx = trim_to_peak(peak_idx, trough_idx)
771 if len(p_idx) > 0:
772 p2t_ampl = data_seg[p_idx] - data_seg[t_idx]
773 clipped = float(np.sum(data_seg[p_idx] > max_clip) +
774 np.sum(data_seg[t_idx] < min_clip))/2/len(p2t_ampl)
775 if found_bestwindow:
776 return data[idx0:idx1], idx0, idx1, clipped, min_clip, max_clip
777 else:
778 return data, 0, 0, False, min_clip, max_clip
781def main(data_file=None):
782 title = 'bestwindow'
783 if data_file is None:
784 # generate data:
785 print('generate waveform...')
786 rate = 100000.0
787 time = np.arange(0.0, 1.0, 1.0 / rate)
788 f = 600.0
789 snippets = []
790 amf = 20.0
791 for ampl in [0.2, 0.5, 0.8]:
792 for am_ampl in [0.0, 0.3, 0.9]:
793 data = ampl * np.sin(2.0 * np.pi * f * time) * (1.0 + am_ampl * np.sin(2.0 * np.pi * amf * time))
794 data[data > 1.3] = 1.3
795 data[data < -1.3] = -1.3
796 snippets.extend(data)
797 data = np.asarray(snippets)
798 title = 'test sines'
799 data += 0.01 * np.random.randn(len(data))
800 else:
801 from thunderlab.dataloader import load_data
802 print(f'load {data_file} ...')
803 data, rate, unit, amax = load_data(data_file)
804 data = data[:,0]
805 title = data_file
807 # determine clipping amplitudes:
808 clip_win_size = 0.5
809 min_clip_fac = 2.0
810 min_clip, max_clip = clip_amplitudes(data, int(clip_win_size * rate),
811 min_fac=min_clip_fac)
812 # min_clip, max_clip = clip_amplitudes(data, int(clip_win_size*rate),
813 # min_fac=min_clip_fac,
814 # plot_hist_func=plot_clipping)
816 # setup plots:
817 fig, ax = plt.subplots(5, 1, sharex=True, figsize=(20, 12))
818 fig.canvas.manager.set_window_title(title)
820 # compute best window:
821 best_window_indices(data, rate, expand=False, win_size=4.0,
822 win_shift=0.5, thresh_fac=0.8, percentile=0.1,
823 min_clip=min_clip, max_clip=max_clip,
824 w_cv_ampl=10.0, tolerance=0.5,
825 plot_data_func=plot_best_window, ax=ax)
827 plt.tight_layout()
828 plt.show()
831if __name__ == '__main__':
832 import sys
833 data_file = sys.argv[1] if len(sys.argv) > 1 else None
834 main(data_file)