Utilities Module
The wandas.utils module provides various utility functions used in the Wandas library.
Frame Dataset
Provides dataset utilities for managing multiple data frames.
Overview
The FrameDataset classes enable efficient batch processing of audio files in a folder. Key features include:
- Lazy Loading: Load files only when accessed, reducing memory usage.
- Transformation Chaining: Apply multiple processing operations efficiently.
- Sampling: Extract random subsets for testing or analysis.
- Metadata Tracking: Keep track of dataset properties and processing history.
Main Classes
ChannelFrameDataset: For time-domain audio data (WAV, MP3, FLAC, CSV files).
SpectrogramFrameDataset: For time-frequency domain data (typically created from STFT).
Basic Usage
from wandas.utils.frame_dataset import ChannelFrameDataset

# Create a dataset from a folder
dataset = ChannelFrameDataset.from_folder(
    folder_path="path/to/audio/files",
    sampling_rate=16000,  # Optional: resample all files to this rate
    file_extensions=[".wav", ".mp3"],  # File types to include
    recursive=True,  # Search subdirectories
    lazy_loading=True,  # Load files on demand (recommended)
)

# Access individual files
# (indexing returns None if the file could not be loaded)
first_file = dataset[0]
print(f"File: {first_file.label}")
print(f"Duration: {first_file.duration}s")

# Get dataset information
metadata = dataset.get_metadata()
print(f"Total files: {metadata['file_count']}")
print(f"Loaded files: {metadata['loaded_count']}")
Sampling
Extract random subsets of the dataset for testing or analysis:
# Sample by number of files
sampled = dataset.sample(n=10, seed=42)

# Sample by ratio
sampled = dataset.sample(ratio=0.1, seed=42)

# Default: 10% of the files, at least 1 and at most 10
sampled = dataset.sample(seed=42)
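The sample-size rule can be hard to read off the three calls alone. The following is a minimal, standard-library sketch of that rule as it appears in the sample() source in the API reference on this page. The helper names resolve_sample_size and pick_indices are hypothetical, and the sketch uses a local random.Random generator, whereas the library source seeds the global random module.

```python
import random


def resolve_sample_size(total, n=None, ratio=None):
    """Mirror the size rule found in the FrameDataset.sample source."""
    if n is None and ratio is None:
        # Default: 10% of the files, but at least 1 and at most 10
        n = max(1, min(10, int(total * 0.1)))
    elif n is None:
        # Ratio given: round down, but never below 1
        n = max(1, int(total * ratio))
    else:
        # Explicit count takes precedence over ratio; never below 1
        n = max(1, n)
    # Never request more items than the dataset holds
    return min(n, total)


def pick_indices(total, n=None, ratio=None, seed=None):
    """Choose sorted random indices; a fixed seed makes the choice repeatable."""
    rng = random.Random(seed)  # assumption: local generator, not the global one
    return sorted(rng.sample(range(total), resolve_sample_size(total, n, ratio)))


print(resolve_sample_size(200))            # default rule on 200 files
print(resolve_sample_size(50, ratio=0.1))  # ratio rule on 50 files
print(pick_indices(20, n=5, seed=42) == pick_indices(20, n=5, seed=42))
```

Because the selected indices are sorted, a sampled dataset keeps the file order of its parent.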
Transformations
Apply processing operations to all files in the dataset:
# Built-in transformations
resampled = dataset.resample(target_sr=8000)
trimmed = dataset.trim(start=0.5, end=2.0)

# Chain multiple transformations
processed = (
    dataset
    .resample(target_sr=8000)
    .trim(start=0.5, end=2.0)
)

# Custom transformation
def custom_filter(frame):
    return frame.low_pass_filter(cutoff=1000)

filtered = dataset.apply(custom_filter)
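Chained transformations are lazy: per the apply() source in the API reference, each call only records the function and the source dataset, and work happens when an item is accessed. The sketch below illustrates that idea with a toy class; ToyDataset is a hypothetical stand-in written for this example, not the wandas implementation.

```python
class ToyDataset:
    """Hypothetical stand-in for a lazily evaluated dataset (not the wandas class)."""

    def __init__(self, items=None, source=None, transform=None):
        self._items = list(items) if items is not None else []
        self._source = source
        self._transform = transform
        self._cache = {}  # index -> result, filled on first access

    def __len__(self):
        return len(self._source) if self._source is not None else len(self._items)

    def __getitem__(self, index):
        if index in self._cache:
            return self._cache[index]
        if self._source is not None:
            upstream = self._source[index]
            # A missing upstream item, or a failing transform, yields None
            try:
                value = None if upstream is None else self._transform(upstream)
            except Exception:
                value = None
        else:
            value = self._items[index]
        self._cache[index] = value
        return value

    def apply(self, func):
        # No work happens here; the function is only recorded
        return ToyDataset(source=self, transform=func)


calls = []


def double(x):
    calls.append(x)  # record every real invocation
    return x * 2


chained = ToyDataset([1, 2, 3]).apply(double).apply(lambda x: x + 1)
print(len(calls))  # number of invocations before any item is accessed
print(chained[1])  # computes only the item at index 1
print(len(calls))  # number of invocations after one access
```

The design choice to defer work means building a long chain is cheap, and only the items that are actually indexed pay the processing cost.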
STFT - Spectrogram Generation
Convert time-domain data to spectrograms:
# Create spectrogram dataset
spec_dataset = dataset.stft(
    n_fft=2048,
    hop_length=512,
    window="hann",
)

# Access a spectrogram
spec_frame = spec_dataset[0]
spec_frame.plot()
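When hop_length is omitted, the stft() source in the API reference falls back to n_fft // 4. A small sketch of that fallback follows; resolve_hop_length is a hypothetical helper name used only for illustration.

```python
def resolve_hop_length(n_fft, hop_length=None):
    """Mirror the `hop_length or n_fft // 4` fallback seen in the stft source."""
    # `or` treats both None and 0 as "not provided"
    return hop_length or n_fft // 4


default_hop = resolve_hop_length(2048)
overlap = 1 - default_hop / 2048  # fraction of each window shared with the next

print(default_hop)
print(resolve_hop_length(2048, hop_length=256))
print(overlap)
```

With the default, consecutive analysis windows overlap by three quarters of the window length.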
Iteration
Process all files in the dataset:
for i in range(len(dataset)):
    frame = dataset[i]
    if frame is not None:
        # Process the frame
        print(f"Processing {frame.label}...")
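The None check matters because indexing returns None for files that failed to load or transform. One way to keep that check out of the processing loop is a small generator, sketched here with a plain list standing in for a dataset; iter_loaded is a hypothetical helper, not part of wandas.

```python
def iter_loaded(dataset):
    """Yield (index, frame) pairs, skipping entries that resolved to None."""
    for i in range(len(dataset)):
        frame = dataset[i]
        if frame is not None:
            yield i, frame


# A plain list stands in for a dataset here; None marks a failed load
stand_in = ["a.wav", None, "c.wav"]
loaded = list(iter_loaded(stand_in))
failed = [i for i in range(len(stand_in)) if stand_in[i] is None]

print(loaded)
print(failed)
```

The helper relies only on len() and integer indexing, which is the same access pattern the loop above uses.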
Key Parameters
folder_path (str): Path to the folder containing audio files.
sampling_rate (Optional[int]): Target sampling rate. Files whose rate differs from it are resampled when loaded.
file_extensions (Optional[list[str]]): List of file extensions to include. Default: [".wav", ".mp3", ".flac", ".csv"].
lazy_loading (bool): If True, files are loaded only when accessed. Default: True.
recursive (bool): If True, search subdirectories recursively. Default: False.
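The interplay of file_extensions and recursive can be sketched with pathlib alone. The function below follows the glob patterns used by _discover_files in the API reference on this page; the name discover and the temporary files are assumptions made for the example.

```python
import tempfile
from pathlib import Path


def discover(folder, extensions, recursive=False):
    """Collect files by extension, de-duplicated and sorted."""
    found = []
    for ext in extensions:
        # "**/" makes the pattern descend into subdirectories
        pattern = f"**/*{ext}" if recursive else f"*{ext}"
        found.extend(p for p in Path(folder).glob(pattern) if p.is_file())
    return sorted(set(found))


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "sub").mkdir()
    for rel in ["a.wav", "b.mp3", "notes.txt", "sub/c.wav"]:
        (root / rel).touch()

    top_level = [p.name for p in discover(root, [".wav", ".mp3"])]
    everything = [
        p.relative_to(root).as_posix()
        for p in discover(root, [".wav", ".mp3"], recursive=True)
    ]

print(top_level)
print(everything)
```

Sorting the de-duplicated paths gives the dataset a stable index order, so dataset[0] refers to the same file across runs as long as the folder contents do not change.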
Examples
For detailed examples, see the learning-path/ directory and the tutorial notebooks listed in the Tutorial section.
API Reference
wandas.utils.frame_dataset
Attributes
logger = logging.getLogger(__name__)
module-attribute
FrameType = ChannelFrame | SpectrogramFrame
module-attribute
F = TypeVar('F', bound=FrameType)
module-attribute
F_out = TypeVar('F_out', bound=FrameType)
module-attribute
Classes
LazyFrame
dataclass
Bases: Generic[F]
A class that encapsulates a frame and its loading state.
Attributes:
| Name | Type | Description |
| --- | --- | --- |
| file_path | Path | File path associated with the frame |
| frame | F \| None | Loaded frame object (None if not loaded) |
| is_loaded | bool | Flag indicating if the frame is loaded |
| load_attempted | bool | Flag indicating if loading was attempted (for error detection) |
Source code in wandas/utils/frame_dataset.py
@dataclass
class LazyFrame(Generic[F]):
    """
    A class that encapsulates a frame and its loading state.

    Attributes:
        file_path: File path associated with the frame
        frame: Loaded frame object (None if not loaded)
        is_loaded: Flag indicating if the frame is loaded
        load_attempted: Flag indicating if loading was attempted (for error detection)
    """

    file_path: Path
    frame: F | None = None
    is_loaded: bool = False
    load_attempted: bool = False

    def ensure_loaded(self, loader: Callable[[Path], F | None]) -> F | None:
        """
        Ensures the frame is loaded, loading it if necessary.

        Args:
            loader: Function to load a frame from a file path

        Returns:
            The loaded frame, or None if loading failed
        """
        # Return the current frame if already loaded
        if self.is_loaded:
            return self.frame

        # Attempt to load if not loaded yet
        try:
            self.load_attempted = True
            self.frame = loader(self.file_path)
            self.is_loaded = True
            return self.frame
        except Exception as e:
            logger.error(f"Failed to load file {self.file_path}: {str(e)}")
            self.is_loaded = True  # Loading was attempted
            self.frame = None
            return None

    def reset(self) -> None:
        """
        Reset the frame state.
        """
        self.frame = None
        self.is_loaded = False
        self.load_attempted = False
Attributes
file_path
instance-attribute
frame = None
class-attribute
instance-attribute
is_loaded = False
class-attribute
instance-attribute
load_attempted = False
class-attribute
instance-attribute
Functions
__init__(file_path, frame=None, is_loaded=False, load_attempted=False)
ensure_loaded(loader)
Ensures the frame is loaded, loading it if necessary.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| loader | Callable[[Path], F \| None] | Function to load a frame from a file path | required |
Returns:
| Type | Description |
| --- | --- |
| F \| None | The loaded frame, or None if loading failed |
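One consequence of the LazyFrame source listed earlier is that a failed load is cached: is_loaded becomes True with frame left as None, so the loader is not called again until reset() is used. The sketch below reproduces that behavior with a simplified stand-in; MiniLazyFrame and flaky_loader are hypothetical names, and logging is omitted.

```python
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Callable, Optional


@dataclass
class MiniLazyFrame:
    """Simplified stand-in mirroring the LazyFrame loading logic."""

    file_path: Path
    frame: Optional[Any] = None
    is_loaded: bool = False
    load_attempted: bool = False

    def ensure_loaded(self, loader: Callable[[Path], Any]) -> Optional[Any]:
        if self.is_loaded:
            return self.frame
        self.load_attempted = True
        try:
            self.frame = loader(self.file_path)
        except Exception:
            self.frame = None  # the failure is remembered as "loaded, but empty"
        self.is_loaded = True
        return self.frame

    def reset(self) -> None:
        self.frame, self.is_loaded, self.load_attempted = None, False, False


attempts = []


def flaky_loader(path: Path) -> str:
    """Fail on the first call, succeed afterwards."""
    attempts.append(path)
    if len(attempts) == 1:
        raise OSError("simulated read error")
    return f"frame from {path.name}"


lf = MiniLazyFrame(Path("a.wav"))
first = lf.ensure_loaded(flaky_loader)   # loader raises; the result is None
second = lf.ensure_loaded(flaky_loader)  # cached; the loader is not called again
calls_before_reset = len(attempts)
lf.reset()
third = lf.ensure_loaded(flaky_loader)   # retried after reset()

print(first, second, third, calls_before_reset, len(attempts))
```

If a file was unreadable only temporarily, calling reset() on its entry is what allows a later access to try again.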
reset()
Reset the frame state.
FrameDataset
Bases: Generic[F], ABC
Abstract base dataset class for processing files in a folder.
Includes lazy loading capability to efficiently handle large datasets.
Subclasses handle specific frame types (ChannelFrame, SpectrogramFrame, etc.).
Source code in wandas/utils/frame_dataset.py
class FrameDataset(Generic[F], ABC):
    """
    Abstract base dataset class for processing files in a folder.
    Includes lazy loading capability to efficiently handle large datasets.
    Subclasses handle specific frame types (ChannelFrame, SpectrogramFrame, etc.).
    """

    def __init__(
        self,
        folder_path: str,
        sampling_rate: int | None = None,
        signal_length: int | None = None,
        file_extensions: list[str] | None = None,
        lazy_loading: bool = True,
        recursive: bool = False,
        source_dataset: Optional["FrameDataset[Any]"] = None,
        transform: Callable[[Any], F | None] | None = None,
    ):
        self.folder_path = Path(folder_path)
        if source_dataset is None and not self.folder_path.exists():
            raise FileNotFoundError(f"Folder does not exist: {self.folder_path}")

        self.sampling_rate = sampling_rate
        self.signal_length = signal_length
        self.file_extensions = file_extensions or [".wav"]
        self._recursive = recursive
        self._lazy_loading = lazy_loading

        # Changed to a list of LazyFrame
        self._lazy_frames: list[LazyFrame[F]] = []

        self._source_dataset = source_dataset
        self._transform = transform

        if self._source_dataset:
            self._initialize_from_source()
        else:
            self._initialize_from_folder()

    def _initialize_from_source(self) -> None:
        """Initialize from a source dataset."""
        if self._source_dataset is None:
            return

        # Copy file paths from source
        file_paths = self._source_dataset._get_file_paths()
        self._lazy_frames = [LazyFrame(file_path) for file_path in file_paths]

        # Inherit other properties
        self.sampling_rate = self.sampling_rate or self._source_dataset.sampling_rate
        self.signal_length = self.signal_length or self._source_dataset.signal_length
        self.file_extensions = self.file_extensions or self._source_dataset.file_extensions
        self._recursive = self._source_dataset._recursive
        self.folder_path = self._source_dataset.folder_path

    def _initialize_from_folder(self) -> None:
        """Initialize from a folder."""
        self._discover_files()
        if not self._lazy_loading:
            self._load_all_files()

    def _discover_files(self) -> None:
        """Discover files in the folder and store them in a list of LazyFrame."""
        file_paths = []
        for ext in self.file_extensions:
            pattern = f"**/*{ext}" if self._recursive else f"*{ext}"
            file_paths.extend(sorted(p for p in self.folder_path.glob(pattern) if p.is_file()))

        # Remove duplicates and sort
        file_paths = sorted(list(set(file_paths)))

        # Create a list of LazyFrame
        self._lazy_frames = [LazyFrame(file_path) for file_path in file_paths]

    def _load_all_files(self) -> None:
        """Load all files."""
        for i in tqdm(range(len(self._lazy_frames)), desc="Loading/transforming"):
            try:
                self._ensure_loaded(i)
            except Exception as e:
                filepath = self._lazy_frames[i].file_path
                logger.warning(f"Failed to load/transform index {i} ({filepath}): {str(e)}")
        self._lazy_loading = False

    @abstractmethod
    def _load_file(self, file_path: Path) -> F | None:
        """Abstract method to load a frame from a file."""
        pass

    def _load_from_source(self, index: int) -> F | None:
        """Load a frame from the source dataset and transform it if necessary."""
        if self._source_dataset is None or self._transform is None:
            return None

        source_frame = self._source_dataset._ensure_loaded(index)
        if source_frame is None:
            return None

        try:
            return self._transform(source_frame)
        except Exception as e:
            msg = f"Failed to transform index {index}: {str(e)}"
            logger.warning(msg)
            # Also emit to the root logger to improve capture reliability
            # in test runners and across different logging configurations
            logging.getLogger().warning(msg)
            return None

    def _ensure_loaded(self, index: int) -> F | None:
        """Ensure the frame at the given index is loaded."""
        if not (0 <= index < len(self._lazy_frames)):
            raise IndexError(f"Index {index} is out of range (0-{len(self._lazy_frames) - 1})")

        lazy_frame = self._lazy_frames[index]

        # Return if already loaded
        if lazy_frame.is_loaded:
            return lazy_frame.frame

        try:
            # Convert from source dataset
            if self._transform and self._source_dataset:
                lazy_frame.load_attempted = True
                frame = self._load_from_source(index)
                lazy_frame.frame = frame
                lazy_frame.is_loaded = True
                return frame
            # Load directly from file
            else:
                return lazy_frame.ensure_loaded(self._load_file)
        except Exception as e:
            f_path = lazy_frame.file_path
            logger.error(f"Failed to load or initialize index {index} ({f_path}): {str(e)}")
            lazy_frame.frame = None
            lazy_frame.is_loaded = True
            lazy_frame.load_attempted = True
            return None

    def _get_file_paths(self) -> list[Path]:
        """Get a list of file paths."""
        return [lazy_frame.file_path for lazy_frame in self._lazy_frames]

    def __len__(self) -> int:
        """Return the number of files in the dataset."""
        return len(self._lazy_frames)

    def get_by_label(self, label: str) -> F | None:
        """
        Get a frame by its label (filename).

        Parameters
        ----------
        label : str
            The filename (label) to search for (e.g., 'sample_1.wav').

        Returns
        -------
        Optional[F]
            The frame if found, otherwise None.

        Examples
        --------
        >>> frame = dataset.get_by_label("sample_1.wav")
        >>> if frame:
        ...     print(frame.label)
        """
        # Keep for backward compatibility: return the first match but emit
        # a DeprecationWarning recommending `get_all_by_label`.
        all_matches = self.get_all_by_label(label)
        if len(all_matches) > 0:
            warnings.warn(
                "get_by_label() returns the first matching frame and is deprecated; "
                "use get_all_by_label() to obtain all matches.",
                DeprecationWarning,
                stacklevel=2,
            )
            return all_matches[0]
        return None

    def get_all_by_label(self, label: str) -> list[F]:
        """
        Get all frames matching the given label (filename).

        Parameters
        ----------
        label : str
            The filename (label) to search for (e.g., 'sample_1.wav').

        Returns
        -------
        list[F]
            A list of frames matching the label.
            If none are found, returns an empty list.

        Notes
        -----
        - Search is performed against the filename portion only (i.e. Path.name).
        - Each matched frame will be loaded (triggering lazy load) via `_ensure_loaded`.
        """
        matches: list[F] = []
        for i, lazy_frame in enumerate(self._lazy_frames):
            if lazy_frame.file_path.name == label:
                loaded = self._ensure_loaded(i)
                if loaded is not None:
                    matches.append(loaded)
        return matches

    @overload
    def __getitem__(self, key: int) -> F | None: ...

    @overload
    def __getitem__(self, key: str) -> list[F]: ...

    def __getitem__(self, key: int | str) -> F | None | list[F]:
        """
        Get the frame by index (int) or label (str).

        Parameters
        ----------
        key : int or str
            Index (int) or filename/label (str).

        Returns
        -------
        Optional[F] or list[F]
            If `key` is an int, returns the frame or None. If `key` is a str,
            returns a list of matching frames (may be empty).

        Examples
        --------
        >>> frame = dataset[0]  # by index
        >>> frames = dataset["sample_1.wav"]  # list of matches by filename
        """
        if isinstance(key, int):
            return self._ensure_loaded(key)
        if isinstance(key, str):
            # pandas-like behaviour: return all matches for the label as a list
            return self.get_all_by_label(key)
        raise TypeError(f"Invalid key type: {type(key)}. Must be int or str.")

    @overload
    def apply(self, func: Callable[[F], F_out | None]) -> "FrameDataset[F_out]": ...

    @overload
    def apply(self, func: Callable[[F], Any | None]) -> "FrameDataset[Any]": ...

    def apply(self, func: Callable[[F], Any | None]) -> "FrameDataset[Any]":
        """Apply a function to the entire dataset to create a new dataset."""
        new_dataset = type(self)(
            folder_path=str(self.folder_path),
            lazy_loading=True,
            source_dataset=self,
            transform=func,
            sampling_rate=self.sampling_rate,
            signal_length=self.signal_length,
            file_extensions=self.file_extensions,
            recursive=self._recursive,
        )
        return cast("FrameDataset[Any]", new_dataset)

    def save(self, output_folder: str, filename_prefix: str = "") -> None:
        """Save processed frames to files."""
        raise NotImplementedError("The save method is not currently implemented.")

    def sample(
        self,
        n: int | None = None,
        ratio: float | None = None,
        seed: int | None = None,
    ) -> "FrameDataset[F]":
        """Get a sample from the dataset."""
        if seed is not None:
            random.seed(seed)

        total = len(self._lazy_frames)
        if total == 0:
            return type(self)(
                str(self.folder_path),
                sampling_rate=self.sampling_rate,
                signal_length=self.signal_length,
                file_extensions=self.file_extensions,
                lazy_loading=self._lazy_loading,
                recursive=self._recursive,
            )

        # Determine sample size
        if n is None and ratio is None:
            n = max(1, min(10, int(total * 0.1)))
        elif n is None and ratio is not None:
            n = max(1, int(total * ratio))
        elif n is not None:
            n = max(1, n)
        else:
            n = 1
        n = min(n, total)

        # Randomly select indices
        sampled_indices = sorted(random.sample(range(total), n))
        return _SampledFrameDataset(self, sampled_indices)

    def get_metadata(self) -> dict[str, Any]:
        """Get metadata for the dataset."""
        actual_sr: int | float | None = self.sampling_rate
        frame_type_name = "Unknown"

        # Count loaded frames
        loaded_count = sum(1 for lazy_frame in self._lazy_frames if lazy_frame.is_loaded)

        # Get metadata from the first frame (if possible)
        first_frame: F | None = None
        if len(self._lazy_frames) > 0:
            try:
                if self._lazy_frames[0].is_loaded:
                    first_frame = self._lazy_frames[0].frame

                if first_frame:
                    actual_sr = getattr(first_frame, "sampling_rate", self.sampling_rate)
                    frame_type_name = type(first_frame).__name__
            except Exception as e:
                logger.warning(f"Error accessing the first frame during metadata retrieval: {e}")

        return {
            "folder_path": str(self.folder_path),
            "file_count": len(self._lazy_frames),
            "loaded_count": loaded_count,
            "target_sampling_rate": self.sampling_rate,
            "actual_sampling_rate": actual_sr,
            "signal_length": self.signal_length,
            "file_extensions": self.file_extensions,
            "lazy_loading": self._lazy_loading,
            "recursive": self._recursive,
            "frame_type": frame_type_name,
            "has_transform": self._transform is not None,
            "is_sampled": isinstance(self, _SampledFrameDataset),
        }
Attributes
folder_path = Path(folder_path)
instance-attribute
sampling_rate = sampling_rate
instance-attribute
signal_length = signal_length
instance-attribute
file_extensions = file_extensions or ['.wav']
instance-attribute
Functions
__init__(folder_path, sampling_rate=None, signal_length=None, file_extensions=None, lazy_loading=True, recursive=False, source_dataset=None, transform=None)
__len__()
Return the number of files in the dataset.
get_by_label(label)
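Get a frame by its label (filename). Deprecated: it returns only the first match and emits a DeprecationWarning when a match is found; use get_all_by_label() to obtain all matches.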
Parameters
label : str
The filename (label) to search for (e.g., 'sample_1.wav').
Returns
Optional[F]
The frame if found, otherwise None.
Examples
frame = dataset.get_by_label("sample_1.wav")
if frame:
    print(frame.label)
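According to its source, get_by_label() issues a DeprecationWarning only when at least one match exists. The sketch below shows that pattern and how a caller can observe it with the standard warnings module; first_match is a toy function written for this example, not the wandas method.

```python
import warnings


def first_match(matches):
    """Toy version of a deprecated 'first match' accessor."""
    if matches:
        warnings.warn(
            "first_match() is deprecated; use the full list of matches instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return matches[0]
    return None  # no match: no warning is emitted


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")  # make sure the warning is not filtered out
    hit = first_match(["a", "b"])
    miss = first_match([])

print(hit, miss, [w.category.__name__ for w in caught])
```

Python hides DeprecationWarning in many contexts by default, so recording warnings explicitly, as done here, is a convenient way to find remaining callers during a migration.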
get_all_by_label(label)
Get all frames matching the given label (filename).
Parameters
label : str
The filename (label) to search for (e.g., 'sample_1.wav').
Returns
list[F]
A list of frames matching the label.
If none are found, returns an empty list.
Notes
- Search is performed against the filename portion only (i.e. Path.name).
- Each matched frame will be loaded (triggering lazy load) via _ensure_loaded.
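Because matching uses only Path.name, files that share a name in different subfolders all match the same label, while a label that includes a folder prefix matches nothing. A short standard-library sketch of that rule, with made-up paths and a hypothetical helper name:

```python
from pathlib import Path

# Hypothetical file list, as a recursive dataset might discover it
paths = [
    Path("set_a/sample_1.wav"),
    Path("set_b/sample_1.wav"),
    Path("set_b/sample_2.wav"),
]


def indices_for_label(file_paths, label):
    """Return the indices whose file name (Path.name) equals the label."""
    return [i for i, p in enumerate(file_paths) if p.name == label]


same_name = indices_for_label(paths, "sample_1.wav")
with_folder = indices_for_label(paths, "set_a/sample_1.wav")

print(same_name)
print(with_folder)
```

This is why a string lookup returns a list: with recursive discovery, a single file name is not guaranteed to be unique.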
__getitem__(key)
__getitem__(key: int) -> F | None
__getitem__(key: str) -> list[F]
Get the frame by index (int) or label (str).
Parameters
key : int or str
Index (int) or filename/label (str).
Returns
Optional[F] or list[F]
If key is an int, returns the frame or None. If key is a str,
returns a list of matching frames (may be empty).
Examples
frame = dataset[0] # by index
frames = dataset["sample_1.wav"] # list of matches by filename
apply(func)
apply(func: Callable[[F], F_out | None]) -> FrameDataset[F_out]
apply(func: Callable[[F], Any | None]) -> FrameDataset[Any]
Apply a function to the entire dataset to create a new dataset.
save(output_folder, filename_prefix='')
Save processed frames to files. Not currently implemented: calling this method raises NotImplementedError.
sample(n=None, ratio=None, seed=None)
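Get a random sample from the dataset. If n is given, it takes precedence over ratio; if neither is given, 10% of the files are drawn (at least 1, at most 10). The sample never exceeds the dataset size. Passing seed reseeds Python's global random module so the selection is reproducible.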
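get_metadata()
Get metadata for the dataset. The returned dict contains the keys folder_path, file_count, loaded_count, target_sampling_rate, actual_sampling_rate, signal_length, file_extensions, lazy_loading, recursive, frame_type, has_transform, and is_sampled. The actual_sampling_rate and frame_type values are read from the first frame only if it is already loaded; otherwise they fall back to the target sampling rate and "Unknown".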
ChannelFrameDataset
Bases: FrameDataset[ChannelFrame]
Dataset class for handling audio files as ChannelFrames in a folder.
Source code in wandas/utils/frame_dataset.py
class ChannelFrameDataset(FrameDataset[ChannelFrame]):
    """
    Dataset class for handling audio files as ChannelFrames in a folder.
    """

    def __init__(
        self,
        folder_path: str,
        sampling_rate: int | None = None,
        signal_length: int | None = None,
        file_extensions: list[str] | None = None,
        lazy_loading: bool = True,
        recursive: bool = False,
        source_dataset: Optional["FrameDataset[Any]"] = None,
        transform: Callable[[Any], ChannelFrame | None] | None = None,
    ):
        _file_extensions = file_extensions or [
            ".wav",
            ".mp3",
            ".flac",
            ".csv",
        ]

        super().__init__(
            folder_path=folder_path,
            sampling_rate=sampling_rate,
            signal_length=signal_length,
            file_extensions=_file_extensions,
            lazy_loading=lazy_loading,
            recursive=recursive,
            source_dataset=source_dataset,
            transform=transform,
        )

    def _load_file(self, file_path: Path) -> ChannelFrame | None:
        """Load an audio file and return a ChannelFrame."""
        try:
            frame = ChannelFrame.from_file(file_path)
            if self.sampling_rate and frame.sampling_rate != self.sampling_rate:
                logger.info(
                    f"Resampling file {file_path.name} ({frame.sampling_rate} Hz) to "
                    f"dataset rate ({self.sampling_rate} Hz)."
                )
                frame = frame.resampling(target_sr=self.sampling_rate)
            return frame
        except Exception as e:
            logger.error(f"Failed to load or initialize file {file_path}: {str(e)}")
            return None

    def resample(self, target_sr: int) -> "ChannelFrameDataset":
        """Resample all frames in the dataset."""

        def _resample_func(frame: ChannelFrame) -> ChannelFrame | None:
            if frame is None:
                return None
            try:
                return frame.resampling(target_sr=target_sr)
            except Exception as e:
                logger.warning(f"Resampling error (target_sr={target_sr}): {e}")
                return None

        new_dataset = self.apply(_resample_func)
        return cast(ChannelFrameDataset, new_dataset)

    def trim(self, start: float, end: float) -> "ChannelFrameDataset":
        """Trim all frames in the dataset."""

        def _trim_func(frame: ChannelFrame) -> ChannelFrame | None:
            if frame is None:
                return None
            try:
                return frame.trim(start=start, end=end)
            except Exception as e:
                logger.warning(f"Trimming error (start={start}, end={end}): {e}")
                return None

        new_dataset = self.apply(_trim_func)
        return cast(ChannelFrameDataset, new_dataset)

    def normalize(self, **kwargs: Any) -> "ChannelFrameDataset":
        """Normalize all frames in the dataset."""

        def _normalize_func(frame: ChannelFrame) -> ChannelFrame | None:
            if frame is None:
                return None
            try:
                return frame.normalize(**kwargs)
            except Exception as e:
                logger.warning(f"Normalization error ({kwargs}): {e}")
                return None

        new_dataset = self.apply(_normalize_func)
        return cast(ChannelFrameDataset, new_dataset)

    def stft(
        self,
        n_fft: int = 2048,
        hop_length: int | None = None,
        win_length: int | None = None,
        window: str = "hann",
    ) -> "SpectrogramFrameDataset":
        """Apply STFT to all frames in the dataset."""
        _hop = hop_length or n_fft // 4

        def _stft_func(frame: ChannelFrame) -> SpectrogramFrame | None:
            if frame is None:
                return None
            try:
                return frame.stft(
                    n_fft=n_fft,
                    hop_length=_hop,
                    win_length=win_length,
                    window=window,
                )
            except Exception as e:
                logger.warning(f"STFT error (n_fft={n_fft}, hop={_hop}): {e}")
                return None

        new_dataset = SpectrogramFrameDataset(
            folder_path=str(self.folder_path),
            lazy_loading=True,
            source_dataset=self,
            transform=_stft_func,
            sampling_rate=self.sampling_rate,
        )
        return new_dataset

    @classmethod
    def from_folder(
        cls,
        folder_path: str,
        sampling_rate: int | None = None,
        file_extensions: list[str] | None = None,
        recursive: bool = False,
        lazy_loading: bool = True,
    ) -> "ChannelFrameDataset":
        """Class method to create a ChannelFrameDataset from a folder."""
        extensions = file_extensions if file_extensions is not None else [".wav", ".mp3", ".flac", ".csv"]
        return cls(
            folder_path,
            sampling_rate=sampling_rate,
            file_extensions=extensions,
            lazy_loading=lazy_loading,
            recursive=recursive,
        )
Functions
__init__(folder_path, sampling_rate=None, signal_length=None, file_extensions=None, lazy_loading=True, recursive=False, source_dataset=None, transform=None)
Source code in wandas/utils/frame_dataset.py
def __init__(
    self,
    folder_path: str,
    sampling_rate: int | None = None,
    signal_length: int | None = None,
    file_extensions: list[str] | None = None,
    lazy_loading: bool = True,
    recursive: bool = False,
    source_dataset: Optional["FrameDataset[Any]"] = None,
    transform: Callable[[Any], ChannelFrame | None] | None = None,
):
    _file_extensions = file_extensions or [
        ".wav",
        ".mp3",
        ".flac",
        ".csv",
    ]
    super().__init__(
        folder_path=folder_path,
        sampling_rate=sampling_rate,
        signal_length=signal_length,
        file_extensions=_file_extensions,
        lazy_loading=lazy_loading,
        recursive=recursive,
        source_dataset=source_dataset,
        transform=transform,
    )
resample(target_sr)
Resample all frames in the dataset.
Source code in wandas/utils/frame_dataset.py
def resample(self, target_sr: int) -> "ChannelFrameDataset":
    """Resample all frames in the dataset."""

    def _resample_func(frame: ChannelFrame) -> ChannelFrame | None:
        if frame is None:
            return None
        try:
            return frame.resampling(target_sr=target_sr)
        except Exception as e:
            logger.warning(f"Resampling error (target_sr={target_sr}): {e}")
            return None

    new_dataset = self.apply(_resample_func)
    return cast(ChannelFrameDataset, new_dataset)
trim(start, end)
Trim all frames in the dataset.
Source code in wandas/utils/frame_dataset.py
def trim(self, start: float, end: float) -> "ChannelFrameDataset":
    """Trim all frames in the dataset."""

    def _trim_func(frame: ChannelFrame) -> ChannelFrame | None:
        if frame is None:
            return None
        try:
            return frame.trim(start=start, end=end)
        except Exception as e:
            logger.warning(f"Trimming error (start={start}, end={end}): {e}")
            return None

    new_dataset = self.apply(_trim_func)
    return cast(ChannelFrameDataset, new_dataset)
normalize(**kwargs)
Normalize all frames in the dataset.
Source code in wandas/utils/frame_dataset.py
def normalize(self, **kwargs: Any) -> "ChannelFrameDataset":
    """Normalize all frames in the dataset."""

    def _normalize_func(frame: ChannelFrame) -> ChannelFrame | None:
        if frame is None:
            return None
        try:
            return frame.normalize(**kwargs)
        except Exception as e:
            logger.warning(f"Normalization error ({kwargs}): {e}")
            return None

    new_dataset = self.apply(_normalize_func)
    return cast(ChannelFrameDataset, new_dataset)
stft(n_fft=2048, hop_length=None, win_length=None, window='hann')
Apply STFT to all frames in the dataset.
Source code in wandas/utils/frame_dataset.py
def stft(
    self,
    n_fft: int = 2048,
    hop_length: int | None = None,
    win_length: int | None = None,
    window: str = "hann",
) -> "SpectrogramFrameDataset":
    """Apply STFT to all frames in the dataset."""
    _hop = hop_length or n_fft // 4

    def _stft_func(frame: ChannelFrame) -> SpectrogramFrame | None:
        if frame is None:
            return None
        try:
            return frame.stft(
                n_fft=n_fft,
                hop_length=_hop,
                win_length=win_length,
                window=window,
            )
        except Exception as e:
            logger.warning(f"STFT error (n_fft={n_fft}, hop={_hop}): {e}")
            return None

    new_dataset = SpectrogramFrameDataset(
        folder_path=str(self.folder_path),
        lazy_loading=True,
        source_dataset=self,
        transform=_stft_func,
        sampling_rate=self.sampling_rate,
    )
    return new_dataset
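The `hop_length or n_fft // 4` fallback in stft() is worth a closer look, because `or` treats every falsy value as "not set". The sketch below isolates that one expression; `resolve_hop` is a hypothetical helper name, not part of wandas, and the overlap figure assumes the window spans the full `n_fft` samples.

```python
from typing import Optional


def resolve_hop(n_fft: int = 2048, hop_length: Optional[int] = None) -> int:
    """Mirror the `hop_length or n_fft // 4` fallback (hypothetical helper)."""
    return hop_length or n_fft // 4


print(resolve_hop())           # 512
print(resolve_hop(1024))       # 256
print(resolve_hop(2048, 128))  # 128
print(resolve_hop(2048, 0))    # 512, because 0 is falsy and falls back to n_fft // 4

# With a window as long as n_fft, a hop of n_fft // 4 means 75% overlap.
overlap = 1 - resolve_hop() / 2048
print(f"{overlap:.0%}")        # 75%
```

A hop of one quarter of the FFT size is a common default for a Hann window; pass `hop_length` explicitly when a different time resolution is needed.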
from_folder(folder_path, sampling_rate=None, file_extensions=None, recursive=False, lazy_loading=True)
classmethod
Class method to create a ChannelFrameDataset from a folder.
Source code in wandas/utils/frame_dataset.py
@classmethod
def from_folder(
    cls,
    folder_path: str,
    sampling_rate: int | None = None,
    file_extensions: list[str] | None = None,
    recursive: bool = False,
    lazy_loading: bool = True,
) -> "ChannelFrameDataset":
    """Class method to create a ChannelFrameDataset from a folder."""
    extensions = file_extensions if file_extensions is not None else [".wav", ".mp3", ".flac", ".csv"]
    return cls(
        folder_path,
        sampling_rate=sampling_rate,
        file_extensions=extensions,
        lazy_loading=lazy_loading,
        recursive=recursive,
    )
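The resample, trim, normalize, and stft methods above share one pattern: the per-frame call is wrapped so that a missing frame or a raised exception becomes `None` plus a logged warning, rather than aborting the whole batch. The following is a minimal sketch of that pattern in plain Python. `safe_transform` and the sample values are hypothetical and exist only for illustration; wandas itself defines a separate inner function in each method.

```python
import logging
from typing import Callable, Optional, TypeVar

logger = logging.getLogger(__name__)

T = TypeVar("T")
U = TypeVar("U")


def safe_transform(func: Callable[[T], U], name: str) -> Callable[[Optional[T]], Optional[U]]:
    """Wrap func so that None inputs and exceptions both yield None."""

    def _wrapped(item: Optional[T]) -> Optional[U]:
        if item is None:  # an earlier step already failed for this item
            return None
        try:
            return func(item)
        except Exception as e:  # log the failure and let the batch continue
            logger.warning(f"{name} error: {e}")
            return None

    return _wrapped


# Hypothetical per-item operation that fails on zero.
reciprocal = safe_transform(lambda x: 1.0 / x, "reciprocal")
results = [reciprocal(v) for v in [2.0, 0.0, None, 4.0]]
print(results)  # [0.5, None, None, 0.25]
```

The trade-off is that failures are silent unless logging is enabled, so callers that chain several steps should check for `None` entries before using the results.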
SpectrogramFrameDataset
Bases: FrameDataset[SpectrogramFrame]
Dataset class for handling spectrogram data as SpectrogramFrames.
Expected to be generated mainly as a result of ChannelFrameDataset.stft().
Source code in wandas/utils/frame_dataset.py
class SpectrogramFrameDataset(FrameDataset[SpectrogramFrame]):
    """
    Dataset class for handling spectrogram data as SpectrogramFrames.
    Expected to be generated mainly as a result of ChannelFrameDataset.stft().
    """

    def __init__(
        self,
        folder_path: str,
        sampling_rate: int | None = None,
        signal_length: int | None = None,
        file_extensions: list[str] | None = None,
        lazy_loading: bool = True,
        recursive: bool = False,
        source_dataset: Optional["FrameDataset[Any]"] = None,
        transform: Callable[[Any], SpectrogramFrame | None] | None = None,
    ):
        super().__init__(
            folder_path=folder_path,
            sampling_rate=sampling_rate,
            signal_length=signal_length,
            file_extensions=file_extensions,
            lazy_loading=lazy_loading,
            recursive=recursive,
            source_dataset=source_dataset,
            transform=transform,
        )

    def _load_file(self, file_path: Path) -> SpectrogramFrame | None:
        """Direct loading from files is not currently supported."""
        logger.warning(
            "No method defined for directly loading SpectrogramFrames. Normally "
            "created from ChannelFrameDataset.stft()."
        )
        raise NotImplementedError("No method defined for directly loading SpectrogramFrames")

    def plot(self, index: int, **kwargs: Any) -> None:
        """Plot the spectrogram at the specified index."""
        try:
            frame = self._ensure_loaded(index)
            if frame is None:
                logger.warning(f"Cannot plot index {index} as it failed to load/transform.")
                return
            plot_method = getattr(frame, "plot", None)
            if callable(plot_method):
                plot_method(**kwargs)
            else:
                logger.warning(
                    f"Frame (index {index}, type {type(frame).__name__}) does not have a plot method implemented."
                )
        except Exception as e:
            logger.error(f"An error occurred while plotting index {index}: {e}")
Functions
__init__(folder_path, sampling_rate=None, signal_length=None, file_extensions=None, lazy_loading=True, recursive=False, source_dataset=None, transform=None)
Source code in wandas/utils/frame_dataset.py
def __init__(
    self,
    folder_path: str,
    sampling_rate: int | None = None,
    signal_length: int | None = None,
    file_extensions: list[str] | None = None,
    lazy_loading: bool = True,
    recursive: bool = False,
    source_dataset: Optional["FrameDataset[Any]"] = None,
    transform: Callable[[Any], SpectrogramFrame | None] | None = None,
):
    super().__init__(
        folder_path=folder_path,
        sampling_rate=sampling_rate,
        signal_length=signal_length,
        file_extensions=file_extensions,
        lazy_loading=lazy_loading,
        recursive=recursive,
        source_dataset=source_dataset,
        transform=transform,
    )
plot(index, **kwargs)
Plot the spectrogram at the specified index.
Source code in wandas/utils/frame_dataset.py
def plot(self, index: int, **kwargs: Any) -> None:
    """Plot the spectrogram at the specified index."""
    try:
        frame = self._ensure_loaded(index)
        if frame is None:
            logger.warning(f"Cannot plot index {index} as it failed to load/transform.")
            return
        plot_method = getattr(frame, "plot", None)
        if callable(plot_method):
            plot_method(**kwargs)
        else:
            logger.warning(
                f"Frame (index {index}, type {type(frame).__name__}) does not have a plot method implemented."
            )
    except Exception as e:
        logger.error(f"An error occurred while plotting index {index}: {e}")
Sample Generation
Provides functions for generating sample data for testing.
wandas.utils.generate_sample
Functions
generate_sin(freqs=1000, sampling_rate=16000, duration=1.0, label=None)
Generate sample sine wave signals.
Parameters
freqs : float or list of float, default=1000
Frequency of the sine wave(s) in Hz.
If multiple frequencies are specified, multiple channels will be created.
sampling_rate : int, default=16000
Sampling rate in Hz.
duration : float, default=1.0
Duration of the signal in seconds.
label : str, optional
Label for the entire signal.
Returns
ChannelFrame
ChannelFrame object containing the sine wave(s).
Source code in wandas/utils/generate_sample.py
def generate_sin(
    freqs: float | list[float] = 1000,
    sampling_rate: int = 16000,
    duration: float = 1.0,
    label: str | None = None,
) -> "ChannelFrame":
    """
    Generate sample sine wave signals.

    Parameters
    ----------
    freqs : float or list of float, default=1000
        Frequency of the sine wave(s) in Hz.
        If multiple frequencies are specified, multiple channels will be created.
    sampling_rate : int, default=16000
        Sampling rate in Hz.
    duration : float, default=1.0
        Duration of the signal in seconds.
    label : str, optional
        Label for the entire signal.

    Returns
    -------
    ChannelFrame
        ChannelFrame object containing the sine wave(s).
    """
    # Delegate directly to generate_sin_lazy.
    return generate_sin_lazy(freqs=freqs, sampling_rate=sampling_rate, duration=duration, label=label)
generate_sin_lazy(freqs=1000, sampling_rate=16000, duration=1.0, label=None)
Generate sample sine wave signals using lazy computation.
Parameters
freqs : float or list of float, default=1000
Frequency of the sine wave(s) in Hz.
If multiple frequencies are specified, multiple channels will be created.
sampling_rate : int, default=16000
Sampling rate in Hz.
duration : float, default=1.0
Duration of the signal in seconds.
label : str, optional
Label for the entire signal.
Returns
ChannelFrame
Lazy ChannelFrame object containing the sine wave(s).
Source code in wandas/utils/generate_sample.py
def generate_sin_lazy(
    freqs: float | list[float] = 1000,
    sampling_rate: int = 16000,
    duration: float = 1.0,
    label: str | None = None,
) -> "ChannelFrame":
    """
    Generate sample sine wave signals using lazy computation.

    Parameters
    ----------
    freqs : float or list of float, default=1000
        Frequency of the sine wave(s) in Hz.
        If multiple frequencies are specified, multiple channels will be created.
    sampling_rate : int, default=16000
        Sampling rate in Hz.
    duration : float, default=1.0
        Duration of the signal in seconds.
    label : str, optional
        Label for the entire signal.

    Returns
    -------
    ChannelFrame
        Lazy ChannelFrame object containing the sine wave(s).
    """
    from wandas.frames.channel import ChannelFrame

    label = label or "Generated Sin"
    t = np.linspace(0, duration, int(sampling_rate * duration), endpoint=False)

    _freqs: list[float]
    # Accept ints as well as floats: the default value 1000 is an int,
    # so a float-only check would reject the default argument.
    if isinstance(freqs, (int, float)):
        _freqs = [float(freqs)]
    elif isinstance(freqs, list):
        _freqs = freqs
    else:
        raise ValueError("freqs must be a float or a list of floats.")

    channels = []
    labels = []
    for idx, freq in enumerate(_freqs):
        data = np.sin(2 * np.pi * freq * t)
        labels.append(f"Channel {idx + 1}")
        channels.append(data)
    return ChannelFrame.from_numpy(
        data=np.array(channels),
        label=label,
        sampling_rate=sampling_rate,
        ch_labels=labels,
    )
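To make the signal construction in generate_sin_lazy concrete, here is a standard-library sketch of the same arithmetic: a time axis that excludes the endpoint, and one channel per frequency. `sine_channels` is a hypothetical helper that returns plain lists; it does not build a ChannelFrame and is not part of wandas.

```python
import math


def sine_channels(freqs, sampling_rate=16000, duration=1.0):
    """Return one list of samples per frequency (hypothetical helper)."""
    if isinstance(freqs, (int, float)):
        freqs = [freqs]
    n = int(sampling_rate * duration)
    # Same grid as np.linspace(0, duration, n, endpoint=False): t_k = k * duration / n
    t = [k * duration / n for k in range(n)]
    return [[math.sin(2 * math.pi * f * tk) for tk in t] for f in freqs]


channels = sine_channels([1000, 2000], sampling_rate=16000, duration=0.5)
print(len(channels), len(channels[0]))  # 2 8000
# At 16 kHz a 1 kHz tone has 16 samples per period, so it peaks at sample 4.
print(round(channels[0][4], 6))         # 1.0
```

Excluding the endpoint keeps the sample spacing at exactly one sampling period, so concatenating two generated signals does not duplicate a sample at the seam.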
Type Definitions
Provides type definitions used in Wandas.
wandas.utils.types
Attributes
Real = np.number[Any]
module-attribute
Complex = np.complexfloating[Any, Any]
module-attribute
NDArrayReal = npt.NDArray[Real]
module-attribute
NDArrayComplex = npt.NDArray[Complex]
module-attribute
General Utilities
Provides other general utility functions.
wandas.utils.util
Functions
validate_sampling_rate(sampling_rate, param_name='sampling_rate')
Validate that sampling rate is positive.
Parameters
sampling_rate : float
Sampling rate in Hz to validate.
param_name : str, default="sampling_rate"
Name of the parameter being validated (for error messages).
Raises
ValueError
If sampling_rate is not positive (i.e., <= 0).
Examples
validate_sampling_rate(44100) # No error
validate_sampling_rate(0) # Raises ValueError
validate_sampling_rate(-100) # Raises ValueError
Source code in wandas/utils/util.py
def validate_sampling_rate(sampling_rate: float, param_name: str = "sampling_rate") -> None:
    """
    Validate that sampling rate is positive.

    Parameters
    ----------
    sampling_rate : float
        Sampling rate in Hz to validate.
    param_name : str, default="sampling_rate"
        Name of the parameter being validated (for error messages).

    Raises
    ------
    ValueError
        If sampling_rate is not positive (i.e., <= 0).

    Examples
    --------
    >>> validate_sampling_rate(44100)  # No error
    >>> validate_sampling_rate(0)  # Raises ValueError
    >>> validate_sampling_rate(-100)  # Raises ValueError
    """
    if sampling_rate <= 0:
        raise ValueError(
            f"Invalid {param_name}\n"
            f"  Got: {sampling_rate} Hz\n"
            f"  Expected: Positive value > 0\n"
            f"Sampling rate represents samples per second and must be positive.\n"
            f"Common values: 8000, 16000, 22050, 44100, 48000 Hz"
        )
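A short sketch of how a caller might use this check, including the `param_name` argument. The function body here is a condensed stand-in with a shorter message so the example runs on its own, and `check` is a hypothetical helper.

```python
def validate_sampling_rate(sampling_rate: float, param_name: str = "sampling_rate") -> None:
    # Condensed stand-in for the wandas function above (same comparison, shorter message).
    if sampling_rate <= 0:
        raise ValueError(f"Invalid {param_name}: got {sampling_rate} Hz, expected a value > 0")


def check(rate: float) -> str:
    """Return 'ok' or a description of the rejection (hypothetical helper)."""
    try:
        validate_sampling_rate(rate, param_name="target_sr")
    except ValueError as e:
        return f"rejected: {e}"
    return "ok"


print(check(44100))         # ok
print(check(0))             # rejected: Invalid target_sr: got 0 Hz, expected a value > 0
print(check(float("nan")))  # ok, because nan <= 0 evaluates to False
```

The last call shows a limit of a plain `<= 0` comparison: NaN passes, so callers that may receive NaN need a separate `math.isnan` check.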
unit_to_ref(unit)
Convert unit to reference value.
Parameters
unit : str
Unit string.
Returns
float
Reference value for the unit. For 'Pa', returns 2e-5 (20 μPa).
For other units, returns 1.0.
Source code in wandas/utils/util.py
def unit_to_ref(unit: str) -> float:
    """
    Convert unit to reference value.

    Parameters
    ----------
    unit : str
        Unit string.

    Returns
    -------
    float
        Reference value for the unit. For 'Pa', returns 2e-5 (20 μPa).
        For other units, returns 1.0.
    """
    if unit == "Pa":
        return 2e-5
    else:
        return 1.0
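The 20 μPa reference is the standard reference pressure for sound pressure level in air, which is why "Pa" is treated specially. The sketch below shows how the reference value turns an RMS amplitude into a level in decibels; `level_db` is a hypothetical helper, and `unit_to_ref` is repeated only so the example runs on its own.

```python
import math


def unit_to_ref(unit: str) -> float:
    # Same logic as the function above, repeated for a self-contained sketch.
    return 2e-5 if unit == "Pa" else 1.0


def level_db(rms: float, unit: str) -> float:
    """Level in dB relative to the unit's reference value (hypothetical helper)."""
    return 20 * math.log10(rms / unit_to_ref(unit))


print(round(level_db(1.0, "Pa"), 2))  # 93.98, i.e. 1 Pa RMS is about 94 dB SPL
print(round(level_db(1.0, "V"), 2))   # 0.0, since any other unit uses a reference of 1.0
```

Note that the comparison is case-sensitive and exact, so a unit string such as "pa" or "Pa " would fall through to the reference of 1.0.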
calculate_rms(wave)
Calculate the root mean square of the wave.
Parameters
wave : NDArrayReal
Input waveform data. Can be multi-channel (shape: [channels, samples])
or single channel (shape: [samples]).
Returns
NDArrayReal
RMS value(s), always returned as an array because the mean is taken with keepdims=True.
Multi-channel input of shape (channels, samples) yields shape (channels, 1); single-channel input of shape (samples,) yields shape (1,).
Source code in wandas/utils/util.py
def calculate_rms(wave: "NDArrayReal") -> "NDArrayReal":
    """
    Calculate the root mean square of the wave.

    Parameters
    ----------
    wave : NDArrayReal
        Input waveform data. Can be multi-channel (shape: [channels, samples])
        or single channel (shape: [samples]).

    Returns
    -------
    NDArrayReal
        RMS value(s), always returned as an array because the mean is taken
        with keepdims=True. Multi-channel input yields shape (channels, 1);
        single-channel input yields shape (1,).
    """
    # Calculate RMS considering axis (over the last dimension)
    axis_to_use = -1 if wave.ndim > 1 else None
    rms_values: NDArrayReal = np.sqrt(np.mean(np.square(wave), axis=axis_to_use, keepdims=True))
    return rms_values
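As a sanity check on the formula, the RMS of a sine wave of amplitude A over whole periods is A / √2. The sketch below verifies this with the standard library only; `rms` is a hypothetical scalar helper that works on a flat list and does not reproduce the array shapes returned by calculate_rms.

```python
import math


def rms(samples):
    """Root mean square of a flat sequence of samples (hypothetical helper)."""
    return math.sqrt(sum(x * x for x in samples) / len(samples))


sr, freq, amp = 16000, 1000, 0.5
# One second of a 1 kHz tone: exactly 1000 whole periods.
sine = [amp * math.sin(2 * math.pi * freq * k / sr) for k in range(sr)]

print(round(rms(sine), 6))              # 0.353553, i.e. amp / sqrt(2)
print(round(rms([3.0, -3.0, 3.0]), 6))  # 3.0, the sign of each sample does not matter
```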
calculate_desired_noise_rms(clean_rms, snr)
Calculate the desired noise RMS based on clean signal RMS and target SNR.
Parameters
clean_rms : NDArrayReal
RMS value(s) of the clean signal.
Can be a single value or an array for multi-channel.
snr : float
Target Signal-to-Noise Ratio in dB.
Returns
NDArrayReal
Desired noise RMS value(s) to achieve the target SNR.
Source code in wandas/utils/util.py
def calculate_desired_noise_rms(clean_rms: "NDArrayReal", snr: float) -> "NDArrayReal":
    """
    Calculate the desired noise RMS based on clean signal RMS and target SNR.

    Parameters
    ----------
    clean_rms : NDArrayReal
        RMS value(s) of the clean signal.
        Can be a single value or an array for multi-channel.
    snr : float
        Target Signal-to-Noise Ratio in dB.

    Returns
    -------
    NDArrayReal
        Desired noise RMS value(s) to achieve the target SNR.
    """
    a = snr / 20
    noise_rms = clean_rms / (10**a)
    return noise_rms
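The function inverts the amplitude-ratio definition of SNR, SNR = 20 · log10(clean_rms / noise_rms), to get noise_rms = clean_rms / 10^(SNR / 20). A scalar sketch of that relationship follows; `desired_noise_rms` is a hypothetical stand-in that takes floats rather than arrays.

```python
import math


def desired_noise_rms(clean_rms: float, snr_db: float) -> float:
    """Scalar version of the formula above (hypothetical stand-in)."""
    return clean_rms / (10 ** (snr_db / 20))


noise = desired_noise_rms(0.5, 20.0)
print(noise)  # 0.05, a 20 dB SNR means the noise RMS is one tenth of the signal RMS

# Round trip: recompute the SNR from the two RMS values.
achieved = 20 * math.log10(0.5 / noise)
print(round(achieved, 6))  # 20.0
```

To reach the target SNR in practice, a noise signal would then be scaled by the ratio of this desired RMS to its current RMS before being added to the clean signal.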
amplitude_to_db(amplitude, ref)
Convert amplitude to decibel.
Parameters
amplitude : NDArrayReal
Input amplitude data.
ref : float
Reference value for conversion.
Returns
NDArrayReal
Amplitude data converted to decibels.
Source code in wandas/utils/util.py
def amplitude_to_db(amplitude: "NDArrayReal", ref: float) -> "NDArrayReal":
    """
    Convert amplitude to decibel.

    Parameters
    ----------
    amplitude : NDArrayReal
        Input amplitude data.
    ref : float
        Reference value for conversion.

    Returns
    -------
    NDArrayReal
        Amplitude data converted to decibels.
    """
    db: NDArrayReal = librosa.amplitude_to_db(np.abs(amplitude), ref=ref, amin=1e-15, top_db=None)
    return db
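The conversion amounts to 20 · log10(|amplitude| / ref), with magnitudes floored at `amin` so that zeros do not produce negative infinity. The scalar sketch below illustrates that arithmetic without librosa. `amplitude_to_db_scalar` is a hypothetical helper, and the way the floor is applied here reflects my reading of librosa's behavior rather than a guaranteed match in every edge case.

```python
import math


def amplitude_to_db_scalar(amplitude: float, ref: float, amin: float = 1e-15) -> float:
    """Scalar sketch of 20*log10(|amplitude| / ref) with a floor at amin."""
    magnitude = max(abs(amplitude), amin)  # floor avoids log10(0)
    reference = max(abs(ref), amin)
    return 20 * math.log10(magnitude) - 20 * math.log10(reference)


print(round(amplitude_to_db_scalar(1.0, 1.0), 6))    # 0.0
print(round(amplitude_to_db_scalar(-0.1, 1.0), 6))   # -20.0, the sign is discarded
print(round(amplitude_to_db_scalar(0.0, 1.0), 6))    # -300.0, the amin floor
print(round(amplitude_to_db_scalar(2e-4, 2e-5), 6))  # 20.0, ten times the reference
```

Because `top_db=None` is passed in the wandas wrapper, no dynamic-range clipping is applied; the only lower bound is the one implied by `amin`.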
level_trigger(data, level, offset=0, hold=1)
Find points where the signal crosses the specified level from below.
Parameters
data : NDArrayReal
Input signal data.
level : float
Threshold level for triggering.
offset : int, default=0
Offset to add to trigger points.
hold : int, default=1
Minimum number of samples between successive trigger points.
Returns
list of int
List of sample indices where the signal crosses the level.
Source code in wandas/utils/util.py
def level_trigger(data: "NDArrayReal", level: float, offset: int = 0, hold: int = 1) -> list[int]:
    """
    Find points where the signal crosses the specified level from below.

    Parameters
    ----------
    data : NDArrayReal
        Input signal data.
    level : float
        Threshold level for triggering.
    offset : int, default=0
        Offset to add to trigger points.
    hold : int, default=1
        Minimum number of samples between successive trigger points.

    Returns
    -------
    list of int
        List of sample indices where the signal crosses the level.
    """
    trig_point: list[int] = []
    sig_len = len(data)
    diff = np.diff(np.sign(data - level))
    level_point = np.where(diff > 0)[0]
    level_point = level_point[(level_point + hold) < sig_len]
    if len(level_point) == 0:
        return list()
    last_point = level_point[0]
    trig_point.append(last_point + offset)
    for i in level_point:
        if (last_point + hold) < i:
            trig_point.append(i + offset)
            last_point = i
    return trig_point
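The trigger logic has three steps: find upward sign changes of `data - level`, drop crossings that lie within `hold` samples of the end, then skip any crossing that follows the previous trigger too closely. The sketch below restates those steps with the standard library so they can be traced by hand. `rising_crossings` and `sign` are hypothetical names, and the sketch assumes a non-negative `hold`.

```python
def sign(x):
    """Return -1, 0, or 1, like np.sign for a scalar."""
    return (x > 0) - (x < 0)


def rising_crossings(data, level, offset=0, hold=1):
    """List-based restatement of the trigger steps (hypothetical helper)."""
    n = len(data)
    signs = [sign(v - level) for v in data]
    # Index i marks a crossing when the sign increases between sample i and i + 1.
    points = [i for i in range(n - 1) if signs[i + 1] - signs[i] > 0]
    points = [p for p in points if p + hold < n]
    triggers = []
    last = None
    for p in points:
        if last is None or last + hold < p:
            triggers.append(p + offset)
            last = p
    return triggers


data = [0, 1, 0, 1, 0, 0, 1, 0]
print(rising_crossings(data, level=0.5))            # [0, 2, 5]
print(rising_crossings(data, level=0.5, hold=3))    # [0]
print(rising_crossings(data, level=0.5, offset=1))  # [1, 3, 6]
```

Each reported index is the last sample below the level, not the first one above it; an `offset` of 1 shifts the result to the first sample above the level.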
cut_sig(data, point_list, cut_len, taper_rate=0, dc_cut=False)
Cut segments from signal at specified points.
Parameters
data : NDArrayReal
Input signal data.
point_list : list of int
List of starting points for cutting.
cut_len : int
Length of each segment to cut.
taper_rate : float, default=0
Taper rate for Tukey window applied to segments.
A value of 0 means no tapering, 1 means full tapering.
dc_cut : bool, default=False
Whether to remove DC component (mean) from segments.
Returns
NDArrayReal
Array containing cut segments with shape (n_segments, cut_len).
Source code in wandas/utils/util.py
def cut_sig(
    data: "NDArrayReal",
    point_list: list[int],
    cut_len: int,
    taper_rate: float = 0,
    dc_cut: bool = False,
) -> "NDArrayReal":
    """
    Cut segments from signal at specified points.

    Parameters
    ----------
    data : NDArrayReal
        Input signal data.
    point_list : list of int
        List of starting points for cutting.
    cut_len : int
        Length of each segment to cut.
    taper_rate : float, default=0
        Taper rate for Tukey window applied to segments.
        A value of 0 means no tapering, 1 means full tapering.
    dc_cut : bool, default=False
        Whether to remove DC component (mean) from segments.

    Returns
    -------
    NDArrayReal
        Array containing cut segments with shape (n_segments, cut_len).
    """
    length = len(data)
    point_list_ = [p for p in point_list if p >= 0 and p + cut_len <= length]
    trial: NDArrayReal = np.zeros((len(point_list_), cut_len))
    for i, v in enumerate(point_list_):
        trial[i] = data[v : v + cut_len]
        if dc_cut:
            trial[i] = trial[i] - trial[i].mean()

    win: NDArrayReal = tukey(cut_len, taper_rate).astype(trial.dtype)[np.newaxis, :]
    trial = trial * win
    return trial
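Two details of cut_sig are easy to miss: start points whose segment would run outside the signal are dropped silently, so the output may have fewer rows than `point_list` has entries, and DC removal is applied per segment. The sketch below shows both with plain lists. `cut_segments` is a hypothetical helper that omits the Tukey window, which corresponds to the default `taper_rate=0` (no tapering).

```python
def cut_segments(data, point_list, cut_len, dc_cut=False):
    """List-based sketch of the segment cutting, without windowing (hypothetical helper)."""
    n = len(data)
    # Keep only start points whose whole segment lies inside the signal.
    starts = [p for p in point_list if p >= 0 and p + cut_len <= n]
    segments = []
    for s in starts:
        seg = list(data[s:s + cut_len])
        if dc_cut:
            mean = sum(seg) / len(seg)
            seg = [x - mean for x in seg]  # remove the segment's own mean
        segments.append(seg)
    return segments


data = list(range(10))
# -1 starts before the signal and 8 would run past the end, so both are dropped.
print(cut_segments(data, [-1, 0, 4, 8], cut_len=3))  # [[0, 1, 2], [4, 5, 6]]
print(cut_segments(data, [4], cut_len=3, dc_cut=True))  # [[-1.0, 0.0, 1.0]]
```

A typical pairing is to feed the indices returned by level_trigger into `point_list`, then compare the number of returned rows with the number of triggers to see how many were discarded at the edges.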