Coverage for src/thunderlab/powerspectrum.py: 94%

142 statements  

« prev     ^ index     » next       coverage.py v7.6.2, created at 2024-10-09 16:02 +0000

1"""Powerspectra and spectrograms for a given frequency resolution 

2 

3## Computation of nfft 

4 

5- `next_power_of_two()`: round an integer up to the next power of two. 

6- `nfft()`: compute nfft based on a given frequency resolution. 

7 

8## Decibel 

9 

10- `decibel()`: transform power to decibel. 

11- `power()`: transform decibel to power. 

12 

13## Power spectra  

14 

15- `psd()`: power spectrum for a given frequency resolution. 

16- `multi_psd()`: power spectra for consecutive data windows and mutiple frequency resolutions. 

17- `spectrogram()`: spectrogram of a given frequency resolution and overlap fraction. 

18 

19## Power spectrum analysis 

20 

21- `peak_freqs()`: peak frequencies computed from power spectra of data snippets. 

22 

23## Visualization 

24 

25- `plot_decibel_psd()`: plot power spectrum in decibel. 

26 

27## Configuration parameter 

28 

29- `add_multi_psd_config()`: add parameters for multi_psd() to configuration. 

30- `multi_psd_args()`: retrieve parameters for mulit_psd() from configuration. 

31""" 

32 

33import numpy as np 

34from scipy.signal import get_window 

35from matplotlib.mlab import psd as mpsd 

36try: 

37 from scipy.signal import welch 

38 psdscipy = True 

39except ImportError: 

40 psdscipy = False 

41from matplotlib.mlab import specgram as mspecgram 

42try: 

43 from scipy.signal import spectrogram as sspecgram 

44 specgramscipy = True 

45except ImportError: 

46 specgramscipy = False 

47from .eventdetection import detect_peaks 

48 

49 

50def next_power_of_two(n): 

51 """The next integer power of two. 

52  

53 Parameters 

54 ---------- 

55 n: int 

56 A positive number. 

57 

58 Returns 

59 ------- 

60 m: int 

61 The next integer power of two equal or larger than `n`. 

62 """ 

63 return int(2 ** np.floor(np.log(n) / np.log(2.0) + 1.0-1e-8)) 

64 

65 

66def nfft(rate, freq_resolution, min_nfft=16, max_nfft=None): 

67 """Required number of samples for an FFT of a given frequency resolution. 

68 

69 Note that the returned number of FFT samples results 

70 in frequency intervals that are smaller or equal to `freq_resolution`. 

71 

72 Parameters 

73 ---------- 

74 rate: float 

75 Sampling rate of the data in Hertz. 

76 freq_resolution: float 

77 Minimum frequency resolution in Hertz. 

78 min_nfft: int 

79 Smallest value of nfft to be used. 

80 max_nfft: int or None 

81 If not None, largest value of nfft to be used. 

82 

83 Returns 

84 ------- 

85 nfft: int 

86 Number of FFT points. 

87 """ 

88 nfft = next_power_of_two(rate / freq_resolution) 

89 if not max_nfft is None: 

90 if nfft > max_nfft: 

91 nfft = next_power_of_two(max_nfft//2 + 1) 

92 if nfft < min_nfft: 

93 nfft = min_nfft 

94 return nfft 

95 

96 

97def decibel(power, ref_power=1.0, min_power=1e-20): 

98 """Transform power to decibel relative to ref_power. 

99 

100 \\[ decibel = 10 \\cdot \\log_{10}(power/ref\\_power) \\] 

101 Power values smaller than `min_power` are set to `-np.inf`. 

102 

103 Parameters 

104 ---------- 

105 power: float or array 

106 Power values, for example from a power spectrum or spectrogram. 

107 ref_power: float or None or 'peak' 

108 Reference power for computing decibel. 

109 If set to `None` or 'peak', the maximum power is used. 

110 min_power: float 

111 Power values smaller than `min_power` are set to `-np.inf`. 

112 

113 Returns 

114 ------- 

115 decibel_psd: array 

116 Power values in decibel relative to `ref_power`. 

117 """ 

118 if np.isscalar(power): 

119 tmp_power = np.array([power]) 

120 decibel_psd = np.array([power]) 

121 else: 

122 tmp_power = power 

123 decibel_psd = power.copy() 

124 if ref_power is None or ref_power == 'peak': 

125 ref_power = np.max(decibel_psd) 

126 decibel_psd[tmp_power <= min_power] = float('-inf') 

127 decibel_psd[tmp_power > min_power] = 10.0 * np.log10(decibel_psd[tmp_power > min_power]/ref_power) 

128 if np.isscalar(power): 

129 return decibel_psd[0] 

130 else: 

131 return decibel_psd 

132 

133 

134def power(decibel, ref_power=1.0): 

135 """Transform decibel back to power relative to `ref_power`. 

136 

137 \\[ power = ref\\_power \\cdot 10^{decibel/10} \\] 

138  

139 Parameters 

140 ---------- 

141 decibel: array 

142 Decibel values of the power spectrum or spectrogram. 

143 ref_power: float 

144 Reference power for computing power. 

145 

146 Returns 

147 ------- 

148 power: array 

149 Power values of the power spectrum or spectrogram. 

150 """ 

151 return ref_power * 10.0 ** (0.1 * decibel) 

152 

153 

154def psd(data, ratetime, freq_resolution, min_nfft=16, max_nfft=None, 

155 overlap_frac=0.5, detrend='constant', window='hann'): 

156 """Power spectrum density of a given frequency resolution. 

157 

158 NFFT is computed from the requested frequency resolution and the 

159 sampling rate. Check the returned frequency array for the actually 

160 used frequency resolution. The frequency intervals are smaller or 

161 equal to `freq_resolution`. NFFT can be retrieved by dividing 

162 the sampling rate by the actual frequency resolution: 

163 ``` 

164 freq, power = psd(data, samplingrate, 0.1) 

165 df = np.mean(np.diff(freq)) # the actual frequency resolution 

166 nfft = int(samplingrate/df) 

167 ``` 

168 

169 Uses `scipy signal.welch()` if available, otherwise 

170 `matplotlib.mlab.psd()`. 

171 

172 Parameters 

173 ---------- 

174 data: 1-D array  

175 Data from which power spectra are computed. 

176 ratetime: float or array 

177 If float, sampling rate of the data in Hertz. 

178 If array, assume `ratetime` to be the time array 

179 corresponding to the data. 

180 Compute sampling rate as `1/(ratetime[1]-ratetime[0])`. 

181 freq_resolution: float 

182 Frequency resolution of the psd in Hertz. 

183 min_nfft: int 

184 Smallest value of nfft to be used. 

185 max_nfft: int or None 

186 If not None, largest value of nfft to be used. 

187 overlap_frac: float 

188 Fraction of overlap for the fft windows. 

189 detrend: string 

190 If 'constant' or 'mean' subtract mean of data. 

191 If 'linear' subtract line fitted to the data. 

192 If 'none' do not detrend the data. 

193 window: string 

194 Function used for windowing data segements. 

195 One of hann, blackman, hamming, bartlett, boxcar, triang, parzen, 

196 bohman, blackmanharris, nuttall, fattop, barthann 

197 (see scipy.signal window functions). 

198 

199 Returns 

200 ------- 

201 freq: 1-D array 

202 Frequencies corresponding to power array. 

203 power: 1-D array 

204 Power spectral density in [data]^2/Hz. 

205 """ 

206 rate = ratetime if np.isscalar(ratetime) else 1.0/(ratetime[1]-ratetime[0]) 

207 n_fft = nfft(rate, freq_resolution, min_nfft, max_nfft) 

208 noverlap = int(n_fft * overlap_frac) 

209 if psdscipy: 

210 if detrend == 'none': 

211 detrend = False 

212 elif detrend == 'mean': 

213 detrend = 'constant' 

214 freqs, power = welch(data, fs=rate, nperseg=n_fft, nfft=None, 

215 noverlap=noverlap, detrend=detrend, 

216 window=window, scaling='density') 

217 else: 

218 if detrend == 'constant': 

219 detrend = 'mean' 

220 power, freqs = mpsd(data, Fs=rate, NFFT=n_fft, 

221 noverlap=noverlap, detrend=detrend, 

222 window=get_window(window, n_fft), 

223 scale_by_freq=True) 

224 # squeeze is necessary when n_fft is too large with respect to the data: 

225 return freqs, np.squeeze(power) 

226 

227 

228def multi_psd(data, ratetime, freq_resolution=0.2, 

229 num_windows=1, 

230 min_nfft=16, overlap_frac=0.5, 

231 detrend='constant', window='hann'): 

232 """Power spectra computed for consecutive data windows. 

233 

234 See also psd() for more information on power spectra with given 

235 frequency resolution. 

236 

237 Parameters 

238 ---------- 

239 data: 1-D array 

240 Data from which power spectra are computed. 

241 ratetime: float or array 

242 If float, sampling rate of the data in Hertz. 

243 If array, assume `ratetime` to be the time array 

244 corresponding to the data. 

245 Compute sampling rate as `1/(ratetime[1]-ratetime[0])`. 

246 freq_resolution: float 

247 Frequency resolution of psd in Hertz. 

248 num_windows: int 

249 Data are chopped into `num_windows` segments that overlap by half 

250 for which power spectra are computed. 

251 min_nfft: int 

252 Smallest value of nfft to be used. 

253 overlap_frac: float 

254 Fraction of overlap for the fft windows within a single power spectrum. 

255 detrend: string 

256 If 'constant' subtract mean of data. 

257 If 'linear' subtract line fitted to the data. 

258 If 'none' do not deternd the data. 

259 window: string 

260 Function used for windowing data segements. 

261 One of hann, blackman, hamming, bartlett, boxcar, triang, parzen, 

262 bohman, blackmanharris, nuttall, fattop, barthann 

263 (see scipy.signal window functions). 

264 

265 Returns 

266 ------- 

267 multi_psd_data: list of 2-D arrays 

268 List of the power spectra for each window and frequency resolution 

269 (`psd_data[i][freq, power]`). 

270 """ 

271 rate = ratetime if np.isscalar(ratetime) else 1.0/(ratetime[1]-ratetime[0]) 

272 n_incr = len(data)//(num_windows+1) # overlap by half a window 

273 multi_psd_data = [] 

274 for k in range(num_windows): 

275 freq, power = psd(data[k*n_incr:(k+2)*n_incr], rate, 

276 freq_resolution, min_nfft, 2*n_incr, 

277 overlap_frac, detrend, window) 

278 multi_psd_data.append(np.column_stack((freq, power))) 

279 return multi_psd_data 

280 

281 

282def spectrogram(data, ratetime, freq_resolution=0.2, min_nfft=16, 

283 max_nfft=None, overlap_frac=0.5, 

284 detrend='constant', window='hann'): 

285 """Spectrogram of a given frequency resolution. 

286 

287 Check the returned frequency array for the actually used frequency 

288 resolution. 

289 The actual frequency resolution is smaller or equal to `freq_resolution`. 

290 The used number of data points per FFT segment (NFFT) is the 

291 sampling rate divided by the actual frequency resolution: 

292 

293 ``` 

294 spec, freq, time = spectrum(data, samplingrate, 0.1) # request 0.1Hz resolution 

295 df = np.mean(np.diff(freq)) # the actual frequency resolution 

296 nfft = int(samplingrate/df) 

297 ``` 

298  

299 Parameters 

300 ---------- 

301 data: 1D or 2D array of floats 

302 Data for the spectrograms. First dimension is time, 

303 optional second dimension is channel. 

304 ratetime: float or array 

305 If float, sampling rate of the data in Hertz. 

306 If array, assume `ratetime` to be the time array 

307 corresponding to the data. 

308 The sampling rate is then computed as `1/(ratetime[1]-ratetime[0])`. 

309 freq_resolution: float 

310 Frequency resolution for the spectrogram in Hertz. See `nfft()` 

311 for details. 

312 min_nfft: int 

313 Smallest value of nfft to be used. See `nfft()` for details. 

314 max_nfft: int or None 

315 If not None, largest value of nfft to be used. 

316 See `nfft()` for details. 

317 overlap_frac: float 

318 Overlap of the nffts (0 = no overlap; 1 = complete overlap). 

319 detrend: string or False 

320 If 'constant' subtract mean of each data segment. 

321 If 'linear' subtract line fitted to each data segment. 

322 If `False` do not detrend the data segments. 

323 window: string 

324 Function used for windowing data segements. 

325 One of hann, blackman, hamming, bartlett, boxcar, triang, parzen, 

326 bohman, blackmanharris, nuttall, fattop, barthann, tukey 

327 (see scipy.signal window functions). 

328 

329 Returns 

330 ------- 

331 freqs: array 

332 Frequencies of the spectrogram. 

333 time: array 

334 Time of the nfft windows. 

335 spectrum: 2D or 3D array 

336 Power spectral density for each frequency and time. 

337 First dimension is frequency and second dimension is time. 

338 Optional last dimension is channel. 

339 """ 

340 rate = ratetime if np.isscalar(ratetime) else 1.0/(ratetime[1]-ratetime[0]) 

341 n_fft = nfft(rate, freq_resolution, min_nfft, max_nfft) 

342 noverlap = int(n_fft * overlap_frac) 

343 if specgramscipy: 

344 freqs, time, spec = sspecgram(data, fs=rate, window=window, 

345 nperseg=n_fft, noverlap=noverlap, 

346 detrend=detrend, scaling='density', 

347 mode='psd', axis=0) 

348 if data.ndim > 1: 

349 # scipy spectrogram() returns f x n x t 

350 spec = np.transpose(spec, (0, 2, 1)) 

351 else: 

352 if data.ndim > 1: 

353 spec = None 

354 for k in range(data.shape[1]): 

355 try: 

356 ssx, freqs, time = mspecgram(data[:,k], NFFT=n_fft, Fs=rate, 

357 noverlap=noverlap, 

358 detrend=detrend, 

359 scale_by_freq=True, 

360 scale='linear', 

361 mode='psd', 

362 window=get_window(window, n_fft)) 

363 except TypeError: 

364 ssx, freqs, time = mspecgram(data[:,k], NFFT=n_fft, Fs=rate, 

365 noverlap=noverlap, 

366 detrend=detrend, 

367 scale_by_freq=True, 

368 window=get_window(window, n_fft)) 

369 if spec is None: 

370 spec = np.zeros((len(freqs), len(time), data.shape[1])) 

371 spec[:,:,k] = ssx 

372 else: 

373 try: 

374 spec, freqs, time = mspecgram(data, NFFT=n_fft, Fs=rate, 

375 noverlap=noverlap, 

376 detrend=detrend, 

377 scale_by_freq=True, scale='linear', 

378 mode='psd', 

379 window=get_window(window, n_fft)) 

380 except TypeError: 

381 spec, freqs, time = mspecgram(data, NFFT=n_fft, Fs=rate, 

382 noverlap=noverlap, 

383 detrend=detrend, 

384 scale_by_freq=True, 

385 window=get_window(window, n_fft)) 

386 return freqs, time, spec 

387 

388 

389def plot_decibel_psd(ax, freqs, power, ref_power=1.0, min_power=1e-20, 

390 log_freq=False, min_freq=0.0, max_freq=2000.0, ymarg=0.0, **kwargs): 

391 """Plot the powerspectum in decibel relative to `ref_power`. 

392 

393 Parameters 

394 ---------- 

395 ax: 

396 Axis for plot. 

397 freqs: 1-D array 

398 Frequency array of the power spectrum. 

399 power: 1-D array 

400 Power values of the power spectrum. 

401 ref_power: float 

402 Reference power for computing decibel. If set to `None` the maximum power is used. 

403 min_power: float 

404 Power values smaller than `min_power` are set to `np.nan`. 

405 log_freq: boolean 

406 Logarithmic (True) or linear (False) frequency axis. 

407 min_freq: float 

408 Limits of frequency axis are set to `(min_freq, max_freq)` 

409 if `max_freq` is greater than zero 

410 max_freq: float 

411 Limits of frequency axis are set to `(min_freq, max_freq)` 

412 and limits of power axis are computed from powers below max_freq 

413 if `max_freq` is greater than zero 

414 ymarg: float 

415 Add this to the maximum decibel power for setting the ylim. 

416 kwargs: dict 

417 Plot parameter that are passed on to the `plot()` function. 

418 """ 

419 decibel_psd = decibel(power, ref_power=ref_power, min_power=min_power) 

420 ax.plot(freqs, decibel_psd, **kwargs) 

421 ax.set_xlabel('Frequency [Hz]') 

422 if max_freq > 0.0: 

423 if log_freq and min_freq < 1e-8: 

424 min_freq = 1.0 

425 ax.set_xlim(min_freq, max_freq) 

426 else: 

427 max_freq = freqs[-1] 

428 if log_freq: 

429 ax.set_xscale('log') 

430 dpmf = decibel_psd[freqs < max_freq] 

431 pmin = np.min(dpmf[np.isfinite(dpmf)]) 

432 pmin = np.floor(pmin / 10.0) * 10.0 

433 pmax = np.max(dpmf[np.isfinite(dpmf)]) 

434 pmax = np.ceil((pmax + ymarg) / 10.0) * 10.0 

435 ax.set_ylim(pmin, pmax) 

436 ax.set_ylabel('Power [dB]') 

437 

438 

439def peak_freqs(onsets, offsets, data, rate, freq_resolution=0.2, 

440 thresh=None, **kwargs): 

441 """Peak frequencies computed from power spectra of data snippets. 

442 

443 Parameters 

444 ---------- 

445 onsets: array of ints 

446 Indices indicating the onsets of the snippets in `data`. 

447 offsets: array of ints 

448 Indices indicating the offsets of the snippets in `data`. 

449 data: 1-D array 

450 Data array that contains the data snippets defined by 

451 `onsets` and `offsets`. 

452 rate: float 

453 Sampling rate of data in Hertz. 

454 freq_resolution: float 

455 Desired frequency resolution of the computed power spectra in Hertz. 

456 thresh: None or float 

457 If not None than this is the threshold required for the minimum height 

458 of the peak in the decibel power spectrum. If the peak is too small 

459 than the peak frequency of that snippet is set to NaN. 

460 kwargs: dict 

461 Further arguments passed on to psd(). 

462 

463 Returns 

464 ------- 

465 freqs: array of floats 

466 For each data snippet the frequency of the maximum power. 

467 """ 

468 freqs = [] 

469 for i0, i1 in zip(onsets, offsets): 

470 if 'max_nfft' in kwargs: 

471 del kwargs['max_nfft'] 

472 f, power = psd(data[i0:i1], rate, freq_resolution, 

473 max_nfft=i1-i0, **kwargs) 

474 if thresh is None: 

475 fpeak = f[np.argmax(power)] 

476 else: 

477 p, _ = detect_peaks(decibel(power, None), thresh) 

478 if len(p) > 0: 

479 ipeak = np.argmax(power[p]) 

480 fpeak = f[p[ipeak]] 

481 else: 

482 fpeak = float('NaN') 

483 freqs.append(fpeak) 

484 return np.array(freqs) 

485 

486 

487def add_multi_psd_config(cfg, freq_resolution=0.2, 

488 num_resolutions=1, num_windows=1): 

489 """Add all parameters needed for the multi_psd() function as 

490 a new section to a configuration. 

491 

492 Parameters 

493 ---------- 

494 cfg: ConfigFile 

495 The configuration. 

496  

497 See multi_psd() for details on the remaining arguments. 

498 """ 

499 cfg.add_section('Power spectrum estimation:') 

500 cfg.add('frequencyResolution', freq_resolution, 'Hz', 'Frequency resolution of the power spectrum.') 

501 cfg.add('numberPSDWindows', num_resolutions, '', 'Number of windows on which power spectra are computed.') 

502 

503 

504def multi_psd_args(cfg): 

505 """Translates a configuration to the 

506 respective parameter names of the multi_psd() function. 

507  

508 The return value can then be passed as key-word arguments to 

509 this functions. 

510 

511 Parameters 

512 ---------- 

513 cfg: ConfigFile 

514 The configuration. 

515 

516 Returns 

517 ------- 

518 a: dict 

519 Dictionary with names of arguments of the multi_psd() function 

520 and their values as supplied by `cfg`. 

521 """ 

522 a = cfg.map({'freq_resolution': 'frequencyResolution', 

523 'num_windows': 'numberPSDResolutions'}) 

524 return a 

525 

526 

527def main(): 

528 import matplotlib.pyplot as plt 

529 print('Compute powerspectra of two sine waves (300 and 450 Hz)') 

530 

531 # generate data: 

532 fundamentals = [300, 450] # Hz 

533 rate = 100000.0 # Hz 

534 time = np.arange(0.0, 8.0, 1.0/rate) 

535 data = np.sin(2*np.pi*fundamentals[0]*time) + 0.5*np.sin(2*np.pi*fundamentals[1]*time) 

536 

537 # compute power spectra: 

538 psd_data = multi_psd(data, rate, freq_resolution=0.5, num_windows=3, 

539 detrend='none', window='hann') 

540 

541 # plot power spectra: 

542 fig, ax = plt.subplots() 

543 for k in range(len(psd_data)): 

544 df = np.mean(np.diff(psd_data[k][:,0])) 

545 nfft = int(rate/df) 

546 plot_decibel_psd(ax, psd_data[k][:,0], psd_data[k][:,1], lw=2, 

547 label='$\\Delta f = %.1f$ Hz, nnft=%d' % (df, nfft)) 

548 ax.legend(loc='upper right') 

549 plt.show() 

550 

551 

552if __name__ == '__main__': 

553 main()