Coverage for src/thunderlab/powerspectrum.py: 94%

142 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-18 22:10 +0000

1"""Powerspectra and spectrograms for a given frequency resolution 

2 

3## Computation of nfft 

4 

5- `next_power_of_two()`: round an integer up to the next power of two. 

6- `nfft()`: compute nfft based on a given frequency resolution. 

7 

8## Decibel 

9 

10- `decibel()`: transform power to decibel. 

11- `power()`: transform decibel to power. 

12 

13## Power spectra  

14 

15- `psd()`: power spectrum for a given frequency resolution. 

16- `multi_psd()`: power spectra for consecutive data windows and mutiple frequency resolutions. 

17- `spectrogram()`: spectrogram of a given frequency resolution and overlap fraction. 

18 

19## Power spectrum analysis 

20 

21- `peak_freqs()`: peak frequencies computed from power spectra of data snippets. 

22 

23## Visualization 

24 

25- `plot_decibel_psd()`: plot power spectrum in decibel. 

26 

27## Configuration parameter 

28 

29- `add_multi_psd_config()`: add parameters for multi_psd() to configuration. 

30- `multi_psd_args()`: retrieve parameters for mulit_psd() from configuration. 

31""" 

32 

33import numpy as np 

34from scipy.signal import get_window 

35from matplotlib.mlab import psd as mpsd 

36try: 

37 from scipy.signal import welch 

38 psdscipy = True 

39except ImportError: 

40 psdscipy = False 

41from matplotlib.mlab import specgram as mspecgram 

42try: 

43 from scipy.signal import spectrogram as sspecgram 

44 specgramscipy = True 

45except ImportError: 

46 specgramscipy = False 

47from .eventdetection import detect_peaks 

48 

49 

50def next_power_of_two(n): 

51 """The next integer power of two. 

52  

53 Parameters 

54 ---------- 

55 n: int 

56 A positive number. 

57 

58 Returns 

59 ------- 

60 m: int 

61 The next integer power of two equal or larger than `n`. 

62 """ 

63 return int(2 ** np.floor(np.log(n) / np.log(2.0) + 1.0-1e-8)) 

64 

65 

66def nfft(rate, freq_resolution, min_nfft=16, max_nfft=None): 

67 """Required number of samples for an FFT of a given frequency resolution. 

68 

69 Note that the returned number of FFT samples results 

70 in frequency intervals that are smaller or equal to `freq_resolution`. 

71 

72 Parameters 

73 ---------- 

74 rate: float 

75 Sampling rate of the data in Hertz. 

76 freq_resolution: float 

77 Minimum frequency resolution in Hertz. 

78 min_nfft: int 

79 Smallest value of nfft to be used. 

80 max_nfft: int or None 

81 If not None, largest value of nfft to be used. 

82 

83 Returns 

84 ------- 

85 nfft: int 

86 Number of FFT points. 

87 """ 

88 nfft = next_power_of_two(rate / freq_resolution) 

89 if not max_nfft is None: 

90 if nfft > max_nfft: 

91 nfft = next_power_of_two(max_nfft//2 + 1) 

92 if nfft < min_nfft: 

93 nfft = min_nfft 

94 return nfft 

95 

96 

97def decibel(power, ref_power=1.0, min_power=1e-20): 

98 """Transform power to decibel relative to ref_power. 

99 

100 \\[ decibel = 10 \\cdot \\log_{10}(power/ref\\_power) \\] 

101 Power values smaller than `min_power` are set to `-np.inf`. 

102 

103 Parameters 

104 ---------- 

105 power: float or array 

106 Power values, for example from a power spectrum or spectrogram. 

107 ref_power: float or None or 'peak' 

108 Reference power for computing decibel. 

109 If set to `None` or 'peak', the maximum power is used. 

110 min_power: float 

111 Power values smaller than `min_power` are set to `-np.inf`. 

112 

113 Returns 

114 ------- 

115 decibel_psd: array 

116 Power values in decibel relative to `ref_power`. 

117 """ 

118 if np.isscalar(power): 

119 tmp_power = np.array([power]) 

120 decibel_psd = np.array([power]) 

121 else: 

122 tmp_power = power 

123 decibel_psd = power.copy() 

124 if ref_power is None or ref_power == 'peak': 

125 ref_power = np.max(decibel_psd) 

126 decibel_psd[tmp_power <= min_power] = float('-inf') 

127 decibel_psd[tmp_power > min_power] = 10.0 * np.log10(decibel_psd[tmp_power > min_power]/ref_power) 

128 if np.isscalar(power): 

129 return decibel_psd[0] 

130 else: 

131 return decibel_psd 

132 

133 

134def power(decibel, ref_power=1.0): 

135 """Transform decibel back to power relative to `ref_power`. 

136 

137 \\[ power = ref\\_power \\cdot 10^{decibel/10} \\] 

138  

139 Parameters 

140 ---------- 

141 decibel: array 

142 Decibel values of the power spectrum or spectrogram. 

143 ref_power: float 

144 Reference power for computing power. 

145 

146 Returns 

147 ------- 

148 power: array 

149 Power values of the power spectrum or spectrogram. 

150 """ 

151 return ref_power * 10.0 ** (0.1 * decibel) 

152 

153 

154def psd(data, ratetime, freq_resolution, min_nfft=16, max_nfft=None, 

155 overlap_frac=0.5, detrend='constant', window='hann'): 

156 """Power spectrum density of a given frequency resolution. 

157 

158 NFFT is computed from the requested frequency resolution and the 

159 sampling rate. Check the returned frequency array for the actually 

160 used frequency resolution. The frequency intervals are smaller or 

161 equal to `freq_resolution`. NFFT can be retrieved by dividing 

162 the sampling rate by the actual frequency resolution: 

163 ``` 

164 freq, power = psd(data, samplingrate, 0.1) 

165 df = np.mean(np.diff(freq)) # the actual frequency resolution 

166 nfft = int(samplingrate/df) 

167 ``` 

168 

169 Uses `scipy signal.welch()` if available, otherwise 

170 `matplotlib.mlab.psd()`. 

171 

172 Parameters 

173 ---------- 

174 data: 1-D array  

175 Data from which power spectra are computed. 

176 ratetime: float or array 

177 If float, sampling rate of the data in Hertz. 

178 If array, assume `ratetime` to be the time array 

179 corresponding to the data. 

180 Compute sampling rate as `1/(ratetime[1]-ratetime[0])`. 

181 freq_resolution: float 

182 Frequency resolution of the psd in Hertz. 

183 min_nfft: int 

184 Smallest value of nfft to be used. 

185 max_nfft: int or None 

186 If not None, largest value of nfft to be used. 

187 overlap_frac: float 

188 Fraction of overlap for the fft windows. 

189 detrend: string 

190 If 'constant' or 'mean' subtract mean of data. 

191 If 'linear' subtract line fitted to the data. 

192 If 'none' do not detrend the data. 

193 window: string 

194 Function used for windowing data segements. 

195 One of hann, blackman, hamming, bartlett, boxcar, triang, parzen, 

196 bohman, blackmanharris, nuttall, fattop, barthann 

197 (see scipy.signal window functions). 

198 

199 Returns 

200 ------- 

201 freq: 1-D array 

202 Frequencies corresponding to power array. 

203 power: 1-D array 

204 Power spectral density in [data]^2/Hz. 

205 """ 

206 rate = ratetime if np.isscalar(ratetime) else 1.0/(ratetime[1]-ratetime[0]) 

207 n_fft = nfft(rate, freq_resolution, min_nfft, max_nfft) 

208 noverlap = int(n_fft * overlap_frac) 

209 if psdscipy: 

210 if detrend == 'none': 

211 detrend = False 

212 elif detrend == 'mean': 

213 detrend = 'constant' 

214 freqs, power = welch(data, fs=rate, nperseg=n_fft, nfft=None, 

215 noverlap=noverlap, detrend=detrend, 

216 window=window, scaling='density') 

217 else: 

218 if detrend == 'constant': 

219 detrend = 'mean' 

220 power, freqs = mpsd(data, Fs=rate, NFFT=n_fft, 

221 noverlap=noverlap, detrend=detrend, 

222 window=get_window(window, n_fft), 

223 scale_by_freq=True) 

224 # squeeze is necessary when n_fft is too large with respect to the data: 

225 return freqs, np.squeeze(power) 

226 

227 

228def multi_psd(data, ratetime, freq_resolution=0.2, 

229 num_windows=1, 

230 min_nfft=16, overlap_frac=0.5, 

231 detrend='constant', window='hann'): 

232 """Power spectra computed for consecutive data windows. 

233 

234 See also psd() for more information on power spectra with given 

235 frequency resolution. 

236 

237 Parameters 

238 ---------- 

239 data: 1-D array 

240 Data from which power spectra are computed. 

241 ratetime: float or array 

242 If float, sampling rate of the data in Hertz. 

243 If array, assume `ratetime` to be the time array 

244 corresponding to the data. 

245 Compute sampling rate as `1/(ratetime[1]-ratetime[0])`. 

246 freq_resolution: float 

247 Frequency resolution of psd in Hertz. 

248 num_windows: int 

249 Data are chopped into `num_windows` segments that overlap by half 

250 for which power spectra are computed. 

251 min_nfft: int 

252 Smallest value of nfft to be used. 

253 overlap_frac: float 

254 Fraction of overlap for the fft windows within a single power spectrum. 

255 detrend: string 

256 If 'constant' subtract mean of data. 

257 If 'linear' subtract line fitted to the data. 

258 If 'none' do not deternd the data. 

259 window: string 

260 Function used for windowing data segements. 

261 One of hann, blackman, hamming, bartlett, boxcar, triang, parzen, 

262 bohman, blackmanharris, nuttall, fattop, barthann 

263 (see scipy.signal window functions). 

264 

265 Returns 

266 ------- 

267 multi_psd_data: list of 2-D arrays 

268 List of the power spectra for each window and frequency resolution 

269 (`psd_data[i][freq, power]`). 

270 """ 

271 rate = ratetime if np.isscalar(ratetime) else 1.0/(ratetime[1]-ratetime[0]) 

272 n_incr = len(data)//(num_windows+1) # overlap by half a window 

273 multi_psd_data = [] 

274 for k in range(num_windows): 

275 freq, power = psd(data[k*n_incr:(k+2)*n_incr], rate, 

276 freq_resolution, min_nfft, 2*n_incr, 

277 overlap_frac, detrend, window) 

278 multi_psd_data.append(np.column_stack((freq, power))) 

279 return multi_psd_data 

280 

281 

282def spectrogram(data, ratetime, freq_resolution=0.2, min_nfft=16, 

283 max_nfft=None, overlap_frac=0.5, 

284 detrend='constant', window='hann'): 

285 """Spectrogram of a given frequency resolution. 

286 

287 Check the returned frequency array for the actually used frequency 

288 resolution. 

289 The actual frequency resolution is smaller or equal to `freq_resolution`. 

290 The used number of data points per FFT segment (NFFT) is the 

291 sampling rate divided by the actual frequency resolution: 

292 

293 ``` 

294 spec, freq, time = spectrum(data, samplingrate, 0.1) # request 0.1Hz resolution 

295 df = np.mean(np.diff(freq)) # the actual frequency resolution 

296 nfft = int(samplingrate/df) 

297 ``` 

298  

299 Parameters 

300 ---------- 

301 data: 1D or 2D array of floats 

302 Data for the spectrograms. First dimension is time, 

303 optional second dimension is channel. 

304 ratetime: float or array 

305 If float, sampling rate of the data in Hertz. 

306 If array, assume `ratetime` to be the time array 

307 corresponding to the data. 

308 The sampling rate is then computed as `1/(ratetime[1]-ratetime[0])`. 

309 freq_resolution: float 

310 Frequency resolution for the spectrogram in Hertz. See `nfft()` 

311 for details. 

312 min_nfft: int 

313 Smallest value of nfft to be used. See `nfft()` for details. 

314 max_nfft: int or None 

315 If not None, largest value of nfft to be used. 

316 See `nfft()` for details. 

317 overlap_frac: float 

318 Overlap of the nffts (0 = no overlap; 1 = complete overlap). 

319 detrend: string or False 

320 If 'constant' subtract mean of each data segment. 

321 If 'linear' subtract line fitted to each data segment. 

322 If `False` do not detrend the data segments. 

323 window: string 

324 Function used for windowing data segements. 

325 One of hann, blackman, hamming, bartlett, boxcar, triang, parzen, 

326 bohman, blackmanharris, nuttall, fattop, barthann, tukey 

327 (see scipy.signal window functions). 

328 

329 Returns 

330 ------- 

331 freqs: array 

332 Frequencies of the spectrogram. 

333 time: array 

334 Time of the nfft windows. 

335 spectrum: 2D or 3D array 

336 Power spectral density for each frequency and time. 

337 First dimension is frequency and second dimension is time. 

338 Optional last dimension is channel. 

339 """ 

340 rate = ratetime if np.isscalar(ratetime) else 1.0/(ratetime[1]-ratetime[0]) 

341 n_fft = nfft(rate, freq_resolution, min_nfft, max_nfft) 

342 noverlap = int(n_fft * overlap_frac) 

343 if specgramscipy: 

344 freqs, time, spec = sspecgram(data, fs=rate, window=window, 

345 nperseg=n_fft, noverlap=noverlap, 

346 detrend=detrend, scaling='density', 

347 mode='psd', axis=0) 

348 if data.ndim > 1: 

349 # scipy spectrogram() returns f x n x t 

350 spec = np.transpose(spec, (0, 2, 1)) 

351 else: 

352 if data.ndim > 1: 

353 spec = None 

354 for k in range(data.shape[1]): 

355 try: 

356 ssx, freqs, time = mspecgram(data[:,k], NFFT=n_fft, Fs=rate, 

357 noverlap=noverlap, 

358 detrend=detrend, 

359 scale_by_freq=True, 

360 scale='linear', 

361 mode='psd', 

362 window=get_window(window, n_fft)) 

363 except TypeError: 

364 ssx, freqs, time = mspecgram(data[:,k], NFFT=n_fft, Fs=rate, 

365 noverlap=noverlap, 

366 detrend=detrend, 

367 scale_by_freq=True, 

368 window=get_window(window, n_fft)) 

369 if spec is None: 

370 spec = np.zeros((len(freqs), len(time), data.shape[1])) 

371 spec[:,:,k] = ssx 

372 else: 

373 try: 

374 spec, freqs, time = mspecgram(data, NFFT=n_fft, Fs=rate, 

375 noverlap=noverlap, 

376 detrend=detrend, 

377 scale_by_freq=True, scale='linear', 

378 mode='psd', 

379 window=get_window(window, n_fft)) 

380 except TypeError: 

381 spec, freqs, time = mspecgram(data, NFFT=n_fft, Fs=rate, 

382 noverlap=noverlap, 

383 detrend=detrend, 

384 scale_by_freq=True, 

385 window=get_window(window, n_fft)) 

386 return freqs, time, spec 

387 

388 

389def plot_decibel_psd(ax, freqs, power, ref_power=1.0, min_power=1e-20, 

390 log_freq=False, min_freq=0.0, max_freq=2000.0, 

391 ymarg=0.0, sstyle=dict(color='tab:blue', lw=1)): 

392 """Plot the powerspectum in decibel relative to `ref_power`. 

393 

394 Parameters 

395 ---------- 

396 ax: 

397 Axis for plot. 

398 freqs: 1-D array 

399 Frequency array of the power spectrum. 

400 power: 1-D array 

401 Power values of the power spectrum. 

402 ref_power: float 

403 Reference power for computing decibel. If set to `None` the maximum power is used. 

404 min_power: float 

405 Power values smaller than `min_power` are set to `np.nan`. 

406 log_freq: boolean 

407 Logarithmic (True) or linear (False) frequency axis. 

408 min_freq: float 

409 Limits of frequency axis are set to `(min_freq, max_freq)` 

410 if `max_freq` is greater than zero 

411 max_freq: float 

412 Limits of frequency axis are set to `(min_freq, max_freq)` 

413 and limits of power axis are computed from powers below max_freq 

414 if `max_freq` is greater than zero 

415 ymarg: float 

416 Add this to the maximum decibel power for setting the ylim. 

417 sstyle: dict 

418 Plot parameter that are passed on to the `plot()` function. 

419 """ 

420 decibel_psd = decibel(power, ref_power=ref_power, min_power=min_power) 

421 ax.plot(freqs, decibel_psd, **sstyle) 

422 ax.set_xlabel('Frequency [Hz]') 

423 if max_freq > 0.0: 

424 if log_freq and min_freq < 1e-8: 

425 min_freq = 1.0 

426 ax.set_xlim(min_freq, max_freq) 

427 else: 

428 max_freq = freqs[-1] 

429 if log_freq: 

430 ax.set_xscale('log') 

431 dpmf = decibel_psd[freqs < max_freq] 

432 pmin = np.min(dpmf[np.isfinite(dpmf)]) 

433 pmin = np.floor(pmin / 10.0) * 10.0 

434 pmax = np.max(dpmf[np.isfinite(dpmf)]) 

435 pmax = np.ceil((pmax + ymarg) / 10.0) * 10.0 

436 ax.set_ylim(pmin, pmax) 

437 ax.set_ylabel('Power [dB]') 

438 

439 

440def peak_freqs(onsets, offsets, data, rate, freq_resolution=0.2, 

441 thresh=None, **kwargs): 

442 """Peak frequencies computed from power spectra of data snippets. 

443 

444 Parameters 

445 ---------- 

446 onsets: array of ints 

447 Indices indicating the onsets of the snippets in `data`. 

448 offsets: array of ints 

449 Indices indicating the offsets of the snippets in `data`. 

450 data: 1-D array 

451 Data array that contains the data snippets defined by 

452 `onsets` and `offsets`. 

453 rate: float 

454 Sampling rate of data in Hertz. 

455 freq_resolution: float 

456 Desired frequency resolution of the computed power spectra in Hertz. 

457 thresh: None or float 

458 If not None than this is the threshold required for the minimum height 

459 of the peak in the decibel power spectrum. If the peak is too small 

460 than the peak frequency of that snippet is set to NaN. 

461 kwargs: dict 

462 Further arguments passed on to psd(). 

463 

464 Returns 

465 ------- 

466 freqs: array of floats 

467 For each data snippet the frequency of the maximum power. 

468 """ 

469 freqs = [] 

470 for i0, i1 in zip(onsets, offsets): 

471 if 'max_nfft' in kwargs: 

472 del kwargs['max_nfft'] 

473 f, power = psd(data[i0:i1], rate, freq_resolution, 

474 max_nfft=i1-i0, **kwargs) 

475 if thresh is None: 

476 fpeak = f[np.argmax(power)] 

477 else: 

478 p, _ = detect_peaks(decibel(power, None), thresh) 

479 if len(p) > 0: 

480 ipeak = np.argmax(power[p]) 

481 fpeak = f[p[ipeak]] 

482 else: 

483 fpeak = float('NaN') 

484 freqs.append(fpeak) 

485 return np.array(freqs) 

486 

487 

488def add_multi_psd_config(cfg, freq_resolution=0.2, 

489 num_resolutions=1, num_windows=1): 

490 """Add all parameters needed for the multi_psd() function as 

491 a new section to a configuration. 

492 

493 Parameters 

494 ---------- 

495 cfg: ConfigFile 

496 The configuration. 

497  

498 See multi_psd() for details on the remaining arguments. 

499 """ 

500 cfg.add_section('Power spectrum estimation:') 

501 cfg.add('frequencyResolution', freq_resolution, 'Hz', 'Frequency resolution of the power spectrum.') 

502 cfg.add('numberPSDWindows', num_resolutions, '', 'Number of windows on which power spectra are computed.') 

503 

504 

505def multi_psd_args(cfg): 

506 """Translates a configuration to the 

507 respective parameter names of the multi_psd() function. 

508  

509 The return value can then be passed as key-word arguments to 

510 this functions. 

511 

512 Parameters 

513 ---------- 

514 cfg: ConfigFile 

515 The configuration. 

516 

517 Returns 

518 ------- 

519 a: dict 

520 Dictionary with names of arguments of the multi_psd() function 

521 and their values as supplied by `cfg`. 

522 """ 

523 a = cfg.map({'freq_resolution': 'frequencyResolution', 

524 'num_windows': 'numberPSDResolutions'}) 

525 return a 

526 

527 

528def main(): 

529 import matplotlib.pyplot as plt 

530 print('Compute powerspectra of two sine waves (300 and 450 Hz)') 

531 

532 # generate data: 

533 fundamentals = [300, 450] # Hz 

534 rate = 100000.0 # Hz 

535 time = np.arange(0.0, 8.0, 1.0/rate) 

536 data = np.sin(2*np.pi*fundamentals[0]*time) + 0.5*np.sin(2*np.pi*fundamentals[1]*time) 

537 

538 # compute power spectra: 

539 psd_data = multi_psd(data, rate, freq_resolution=0.5, num_windows=3, 

540 detrend='none', window='hann') 

541 

542 # plot power spectra: 

543 fig, ax = plt.subplots() 

544 for k in range(len(psd_data)): 

545 df = np.mean(np.diff(psd_data[k][:,0])) 

546 nfft = int(rate/df) 

547 plot_decibel_psd(ax, psd_data[k][:,0], psd_data[k][:,1], 

548 sstyle=dict(lw=2, 

549 label=f'$\\Delta f={df:.1f}$ Hz, nnft={nfft}')) 

550 ax.legend(loc='upper right') 

551 plt.show() 

552 

553 

554if __name__ == '__main__': 

555 main()