Coverage for src/thunderlab/powerspectrum.py: 94%
142 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-18 22:10 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-18 22:10 +0000
1"""Powerspectra and spectrograms for a given frequency resolution
3## Computation of nfft
5- `next_power_of_two()`: round an integer up to the next power of two.
6- `nfft()`: compute nfft based on a given frequency resolution.
8## Decibel
10- `decibel()`: transform power to decibel.
11- `power()`: transform decibel to power.
13## Power spectra
15- `psd()`: power spectrum for a given frequency resolution.
16- `multi_psd()`: power spectra for consecutive data windows and mutiple frequency resolutions.
17- `spectrogram()`: spectrogram of a given frequency resolution and overlap fraction.
19## Power spectrum analysis
21- `peak_freqs()`: peak frequencies computed from power spectra of data snippets.
23## Visualization
25- `plot_decibel_psd()`: plot power spectrum in decibel.
27## Configuration parameter
29- `add_multi_psd_config()`: add parameters for multi_psd() to configuration.
30- `multi_psd_args()`: retrieve parameters for mulit_psd() from configuration.
31"""
33import numpy as np
34from scipy.signal import get_window
35from matplotlib.mlab import psd as mpsd
36try:
37 from scipy.signal import welch
38 psdscipy = True
39except ImportError:
40 psdscipy = False
41from matplotlib.mlab import specgram as mspecgram
42try:
43 from scipy.signal import spectrogram as sspecgram
44 specgramscipy = True
45except ImportError:
46 specgramscipy = False
47from .eventdetection import detect_peaks
50def next_power_of_two(n):
51 """The next integer power of two.
53 Parameters
54 ----------
55 n: int
56 A positive number.
58 Returns
59 -------
60 m: int
61 The next integer power of two equal or larger than `n`.
62 """
63 return int(2 ** np.floor(np.log(n) / np.log(2.0) + 1.0-1e-8))
66def nfft(rate, freq_resolution, min_nfft=16, max_nfft=None):
67 """Required number of samples for an FFT of a given frequency resolution.
69 Note that the returned number of FFT samples results
70 in frequency intervals that are smaller or equal to `freq_resolution`.
72 Parameters
73 ----------
74 rate: float
75 Sampling rate of the data in Hertz.
76 freq_resolution: float
77 Minimum frequency resolution in Hertz.
78 min_nfft: int
79 Smallest value of nfft to be used.
80 max_nfft: int or None
81 If not None, largest value of nfft to be used.
83 Returns
84 -------
85 nfft: int
86 Number of FFT points.
87 """
88 nfft = next_power_of_two(rate / freq_resolution)
89 if not max_nfft is None:
90 if nfft > max_nfft:
91 nfft = next_power_of_two(max_nfft//2 + 1)
92 if nfft < min_nfft:
93 nfft = min_nfft
94 return nfft
97def decibel(power, ref_power=1.0, min_power=1e-20):
98 """Transform power to decibel relative to ref_power.
100 \\[ decibel = 10 \\cdot \\log_{10}(power/ref\\_power) \\]
101 Power values smaller than `min_power` are set to `-np.inf`.
103 Parameters
104 ----------
105 power: float or array
106 Power values, for example from a power spectrum or spectrogram.
107 ref_power: float or None or 'peak'
108 Reference power for computing decibel.
109 If set to `None` or 'peak', the maximum power is used.
110 min_power: float
111 Power values smaller than `min_power` are set to `-np.inf`.
113 Returns
114 -------
115 decibel_psd: array
116 Power values in decibel relative to `ref_power`.
117 """
118 if np.isscalar(power):
119 tmp_power = np.array([power])
120 decibel_psd = np.array([power])
121 else:
122 tmp_power = power
123 decibel_psd = power.copy()
124 if ref_power is None or ref_power == 'peak':
125 ref_power = np.max(decibel_psd)
126 decibel_psd[tmp_power <= min_power] = float('-inf')
127 decibel_psd[tmp_power > min_power] = 10.0 * np.log10(decibel_psd[tmp_power > min_power]/ref_power)
128 if np.isscalar(power):
129 return decibel_psd[0]
130 else:
131 return decibel_psd
134def power(decibel, ref_power=1.0):
135 """Transform decibel back to power relative to `ref_power`.
137 \\[ power = ref\\_power \\cdot 10^{decibel/10} \\]
139 Parameters
140 ----------
141 decibel: array
142 Decibel values of the power spectrum or spectrogram.
143 ref_power: float
144 Reference power for computing power.
146 Returns
147 -------
148 power: array
149 Power values of the power spectrum or spectrogram.
150 """
151 return ref_power * 10.0 ** (0.1 * decibel)
154def psd(data, ratetime, freq_resolution, min_nfft=16, max_nfft=None,
155 overlap_frac=0.5, detrend='constant', window='hann'):
156 """Power spectrum density of a given frequency resolution.
158 NFFT is computed from the requested frequency resolution and the
159 sampling rate. Check the returned frequency array for the actually
160 used frequency resolution. The frequency intervals are smaller or
161 equal to `freq_resolution`. NFFT can be retrieved by dividing
162 the sampling rate by the actual frequency resolution:
163 ```
164 freq, power = psd(data, samplingrate, 0.1)
165 df = np.mean(np.diff(freq)) # the actual frequency resolution
166 nfft = int(samplingrate/df)
167 ```
169 Uses `scipy signal.welch()` if available, otherwise
170 `matplotlib.mlab.psd()`.
172 Parameters
173 ----------
174 data: 1-D array
175 Data from which power spectra are computed.
176 ratetime: float or array
177 If float, sampling rate of the data in Hertz.
178 If array, assume `ratetime` to be the time array
179 corresponding to the data.
180 Compute sampling rate as `1/(ratetime[1]-ratetime[0])`.
181 freq_resolution: float
182 Frequency resolution of the psd in Hertz.
183 min_nfft: int
184 Smallest value of nfft to be used.
185 max_nfft: int or None
186 If not None, largest value of nfft to be used.
187 overlap_frac: float
188 Fraction of overlap for the fft windows.
189 detrend: string
190 If 'constant' or 'mean' subtract mean of data.
191 If 'linear' subtract line fitted to the data.
192 If 'none' do not detrend the data.
193 window: string
194 Function used for windowing data segements.
195 One of hann, blackman, hamming, bartlett, boxcar, triang, parzen,
196 bohman, blackmanharris, nuttall, fattop, barthann
197 (see scipy.signal window functions).
199 Returns
200 -------
201 freq: 1-D array
202 Frequencies corresponding to power array.
203 power: 1-D array
204 Power spectral density in [data]^2/Hz.
205 """
206 rate = ratetime if np.isscalar(ratetime) else 1.0/(ratetime[1]-ratetime[0])
207 n_fft = nfft(rate, freq_resolution, min_nfft, max_nfft)
208 noverlap = int(n_fft * overlap_frac)
209 if psdscipy:
210 if detrend == 'none':
211 detrend = False
212 elif detrend == 'mean':
213 detrend = 'constant'
214 freqs, power = welch(data, fs=rate, nperseg=n_fft, nfft=None,
215 noverlap=noverlap, detrend=detrend,
216 window=window, scaling='density')
217 else:
218 if detrend == 'constant':
219 detrend = 'mean'
220 power, freqs = mpsd(data, Fs=rate, NFFT=n_fft,
221 noverlap=noverlap, detrend=detrend,
222 window=get_window(window, n_fft),
223 scale_by_freq=True)
224 # squeeze is necessary when n_fft is too large with respect to the data:
225 return freqs, np.squeeze(power)
228def multi_psd(data, ratetime, freq_resolution=0.2,
229 num_windows=1,
230 min_nfft=16, overlap_frac=0.5,
231 detrend='constant', window='hann'):
232 """Power spectra computed for consecutive data windows.
234 See also psd() for more information on power spectra with given
235 frequency resolution.
237 Parameters
238 ----------
239 data: 1-D array
240 Data from which power spectra are computed.
241 ratetime: float or array
242 If float, sampling rate of the data in Hertz.
243 If array, assume `ratetime` to be the time array
244 corresponding to the data.
245 Compute sampling rate as `1/(ratetime[1]-ratetime[0])`.
246 freq_resolution: float
247 Frequency resolution of psd in Hertz.
248 num_windows: int
249 Data are chopped into `num_windows` segments that overlap by half
250 for which power spectra are computed.
251 min_nfft: int
252 Smallest value of nfft to be used.
253 overlap_frac: float
254 Fraction of overlap for the fft windows within a single power spectrum.
255 detrend: string
256 If 'constant' subtract mean of data.
257 If 'linear' subtract line fitted to the data.
258 If 'none' do not deternd the data.
259 window: string
260 Function used for windowing data segements.
261 One of hann, blackman, hamming, bartlett, boxcar, triang, parzen,
262 bohman, blackmanharris, nuttall, fattop, barthann
263 (see scipy.signal window functions).
265 Returns
266 -------
267 multi_psd_data: list of 2-D arrays
268 List of the power spectra for each window and frequency resolution
269 (`psd_data[i][freq, power]`).
270 """
271 rate = ratetime if np.isscalar(ratetime) else 1.0/(ratetime[1]-ratetime[0])
272 n_incr = len(data)//(num_windows+1) # overlap by half a window
273 multi_psd_data = []
274 for k in range(num_windows):
275 freq, power = psd(data[k*n_incr:(k+2)*n_incr], rate,
276 freq_resolution, min_nfft, 2*n_incr,
277 overlap_frac, detrend, window)
278 multi_psd_data.append(np.column_stack((freq, power)))
279 return multi_psd_data
282def spectrogram(data, ratetime, freq_resolution=0.2, min_nfft=16,
283 max_nfft=None, overlap_frac=0.5,
284 detrend='constant', window='hann'):
285 """Spectrogram of a given frequency resolution.
287 Check the returned frequency array for the actually used frequency
288 resolution.
289 The actual frequency resolution is smaller or equal to `freq_resolution`.
290 The used number of data points per FFT segment (NFFT) is the
291 sampling rate divided by the actual frequency resolution:
293 ```
294 spec, freq, time = spectrum(data, samplingrate, 0.1) # request 0.1Hz resolution
295 df = np.mean(np.diff(freq)) # the actual frequency resolution
296 nfft = int(samplingrate/df)
297 ```
299 Parameters
300 ----------
301 data: 1D or 2D array of floats
302 Data for the spectrograms. First dimension is time,
303 optional second dimension is channel.
304 ratetime: float or array
305 If float, sampling rate of the data in Hertz.
306 If array, assume `ratetime` to be the time array
307 corresponding to the data.
308 The sampling rate is then computed as `1/(ratetime[1]-ratetime[0])`.
309 freq_resolution: float
310 Frequency resolution for the spectrogram in Hertz. See `nfft()`
311 for details.
312 min_nfft: int
313 Smallest value of nfft to be used. See `nfft()` for details.
314 max_nfft: int or None
315 If not None, largest value of nfft to be used.
316 See `nfft()` for details.
317 overlap_frac: float
318 Overlap of the nffts (0 = no overlap; 1 = complete overlap).
319 detrend: string or False
320 If 'constant' subtract mean of each data segment.
321 If 'linear' subtract line fitted to each data segment.
322 If `False` do not detrend the data segments.
323 window: string
324 Function used for windowing data segements.
325 One of hann, blackman, hamming, bartlett, boxcar, triang, parzen,
326 bohman, blackmanharris, nuttall, fattop, barthann, tukey
327 (see scipy.signal window functions).
329 Returns
330 -------
331 freqs: array
332 Frequencies of the spectrogram.
333 time: array
334 Time of the nfft windows.
335 spectrum: 2D or 3D array
336 Power spectral density for each frequency and time.
337 First dimension is frequency and second dimension is time.
338 Optional last dimension is channel.
339 """
340 rate = ratetime if np.isscalar(ratetime) else 1.0/(ratetime[1]-ratetime[0])
341 n_fft = nfft(rate, freq_resolution, min_nfft, max_nfft)
342 noverlap = int(n_fft * overlap_frac)
343 if specgramscipy:
344 freqs, time, spec = sspecgram(data, fs=rate, window=window,
345 nperseg=n_fft, noverlap=noverlap,
346 detrend=detrend, scaling='density',
347 mode='psd', axis=0)
348 if data.ndim > 1:
349 # scipy spectrogram() returns f x n x t
350 spec = np.transpose(spec, (0, 2, 1))
351 else:
352 if data.ndim > 1:
353 spec = None
354 for k in range(data.shape[1]):
355 try:
356 ssx, freqs, time = mspecgram(data[:,k], NFFT=n_fft, Fs=rate,
357 noverlap=noverlap,
358 detrend=detrend,
359 scale_by_freq=True,
360 scale='linear',
361 mode='psd',
362 window=get_window(window, n_fft))
363 except TypeError:
364 ssx, freqs, time = mspecgram(data[:,k], NFFT=n_fft, Fs=rate,
365 noverlap=noverlap,
366 detrend=detrend,
367 scale_by_freq=True,
368 window=get_window(window, n_fft))
369 if spec is None:
370 spec = np.zeros((len(freqs), len(time), data.shape[1]))
371 spec[:,:,k] = ssx
372 else:
373 try:
374 spec, freqs, time = mspecgram(data, NFFT=n_fft, Fs=rate,
375 noverlap=noverlap,
376 detrend=detrend,
377 scale_by_freq=True, scale='linear',
378 mode='psd',
379 window=get_window(window, n_fft))
380 except TypeError:
381 spec, freqs, time = mspecgram(data, NFFT=n_fft, Fs=rate,
382 noverlap=noverlap,
383 detrend=detrend,
384 scale_by_freq=True,
385 window=get_window(window, n_fft))
386 return freqs, time, spec
389def plot_decibel_psd(ax, freqs, power, ref_power=1.0, min_power=1e-20,
390 log_freq=False, min_freq=0.0, max_freq=2000.0,
391 ymarg=0.0, sstyle=dict(color='tab:blue', lw=1)):
392 """Plot the powerspectum in decibel relative to `ref_power`.
394 Parameters
395 ----------
396 ax:
397 Axis for plot.
398 freqs: 1-D array
399 Frequency array of the power spectrum.
400 power: 1-D array
401 Power values of the power spectrum.
402 ref_power: float
403 Reference power for computing decibel. If set to `None` the maximum power is used.
404 min_power: float
405 Power values smaller than `min_power` are set to `np.nan`.
406 log_freq: boolean
407 Logarithmic (True) or linear (False) frequency axis.
408 min_freq: float
409 Limits of frequency axis are set to `(min_freq, max_freq)`
410 if `max_freq` is greater than zero
411 max_freq: float
412 Limits of frequency axis are set to `(min_freq, max_freq)`
413 and limits of power axis are computed from powers below max_freq
414 if `max_freq` is greater than zero
415 ymarg: float
416 Add this to the maximum decibel power for setting the ylim.
417 sstyle: dict
418 Plot parameter that are passed on to the `plot()` function.
419 """
420 decibel_psd = decibel(power, ref_power=ref_power, min_power=min_power)
421 ax.plot(freqs, decibel_psd, **sstyle)
422 ax.set_xlabel('Frequency [Hz]')
423 if max_freq > 0.0:
424 if log_freq and min_freq < 1e-8:
425 min_freq = 1.0
426 ax.set_xlim(min_freq, max_freq)
427 else:
428 max_freq = freqs[-1]
429 if log_freq:
430 ax.set_xscale('log')
431 dpmf = decibel_psd[freqs < max_freq]
432 pmin = np.min(dpmf[np.isfinite(dpmf)])
433 pmin = np.floor(pmin / 10.0) * 10.0
434 pmax = np.max(dpmf[np.isfinite(dpmf)])
435 pmax = np.ceil((pmax + ymarg) / 10.0) * 10.0
436 ax.set_ylim(pmin, pmax)
437 ax.set_ylabel('Power [dB]')
440def peak_freqs(onsets, offsets, data, rate, freq_resolution=0.2,
441 thresh=None, **kwargs):
442 """Peak frequencies computed from power spectra of data snippets.
444 Parameters
445 ----------
446 onsets: array of ints
447 Indices indicating the onsets of the snippets in `data`.
448 offsets: array of ints
449 Indices indicating the offsets of the snippets in `data`.
450 data: 1-D array
451 Data array that contains the data snippets defined by
452 `onsets` and `offsets`.
453 rate: float
454 Sampling rate of data in Hertz.
455 freq_resolution: float
456 Desired frequency resolution of the computed power spectra in Hertz.
457 thresh: None or float
458 If not None than this is the threshold required for the minimum height
459 of the peak in the decibel power spectrum. If the peak is too small
460 than the peak frequency of that snippet is set to NaN.
461 kwargs: dict
462 Further arguments passed on to psd().
464 Returns
465 -------
466 freqs: array of floats
467 For each data snippet the frequency of the maximum power.
468 """
469 freqs = []
470 for i0, i1 in zip(onsets, offsets):
471 if 'max_nfft' in kwargs:
472 del kwargs['max_nfft']
473 f, power = psd(data[i0:i1], rate, freq_resolution,
474 max_nfft=i1-i0, **kwargs)
475 if thresh is None:
476 fpeak = f[np.argmax(power)]
477 else:
478 p, _ = detect_peaks(decibel(power, None), thresh)
479 if len(p) > 0:
480 ipeak = np.argmax(power[p])
481 fpeak = f[p[ipeak]]
482 else:
483 fpeak = float('NaN')
484 freqs.append(fpeak)
485 return np.array(freqs)
488def add_multi_psd_config(cfg, freq_resolution=0.2,
489 num_resolutions=1, num_windows=1):
490 """Add all parameters needed for the multi_psd() function as
491 a new section to a configuration.
493 Parameters
494 ----------
495 cfg: ConfigFile
496 The configuration.
498 See multi_psd() for details on the remaining arguments.
499 """
500 cfg.add_section('Power spectrum estimation:')
501 cfg.add('frequencyResolution', freq_resolution, 'Hz', 'Frequency resolution of the power spectrum.')
502 cfg.add('numberPSDWindows', num_resolutions, '', 'Number of windows on which power spectra are computed.')
505def multi_psd_args(cfg):
506 """Translates a configuration to the
507 respective parameter names of the multi_psd() function.
509 The return value can then be passed as key-word arguments to
510 this functions.
512 Parameters
513 ----------
514 cfg: ConfigFile
515 The configuration.
517 Returns
518 -------
519 a: dict
520 Dictionary with names of arguments of the multi_psd() function
521 and their values as supplied by `cfg`.
522 """
523 a = cfg.map({'freq_resolution': 'frequencyResolution',
524 'num_windows': 'numberPSDResolutions'})
525 return a
528def main():
529 import matplotlib.pyplot as plt
530 print('Compute powerspectra of two sine waves (300 and 450 Hz)')
532 # generate data:
533 fundamentals = [300, 450] # Hz
534 rate = 100000.0 # Hz
535 time = np.arange(0.0, 8.0, 1.0/rate)
536 data = np.sin(2*np.pi*fundamentals[0]*time) + 0.5*np.sin(2*np.pi*fundamentals[1]*time)
538 # compute power spectra:
539 psd_data = multi_psd(data, rate, freq_resolution=0.5, num_windows=3,
540 detrend='none', window='hann')
542 # plot power spectra:
543 fig, ax = plt.subplots()
544 for k in range(len(psd_data)):
545 df = np.mean(np.diff(psd_data[k][:,0]))
546 nfft = int(rate/df)
547 plot_decibel_psd(ax, psd_data[k][:,0], psd_data[k][:,1],
548 sstyle=dict(lw=2,
549 label=f'$\\Delta f={df:.1f}$ Hz, nnft={nfft}'))
550 ax.legend(loc='upper right')
551 plt.show()
554if __name__ == '__main__':
555 main()