
Utilities Module

The wandas.utils module provides utility functions used throughout the Wandas library.

Frame Dataset

Provides dataset utilities for managing multiple data frames.

Overview

The FrameDataset classes enable batch processing of the audio files in a folder. Key features include:

  • Lazy Loading: Load files only when accessed, reducing memory usage.
  • Transformation Chaining: Chain multiple processing operations; each one runs when a frame is accessed.
  • Sampling: Extract random subsets for testing or analysis.
  • Metadata Tracking: Report dataset properties such as file count, loaded count, and whether a transform is attached.

Main Classes

  • ChannelFrameDataset: For time-domain audio data (WAV, MP3, FLAC, CSV files).
  • SpectrogramFrameDataset: For time-frequency domain data (typically created from STFT).

Basic Usage

from wandas.utils.frame_dataset import ChannelFrameDataset

# Create a dataset from a folder
dataset = ChannelFrameDataset.from_folder(
    folder_path="path/to/audio/files",
    sampling_rate=16000,  # Optional: resample all files to this rate
    file_extensions=[".wav", ".mp3"],  # File types to include
    recursive=True,  # Search subdirectories
    lazy_loading=True  # Load files on demand (recommended)
)

# Access individual files (indexing returns None if a file failed to load)
first_file = dataset[0]
if first_file is not None:
    print(f"File: {first_file.label}")
    print(f"Duration: {first_file.duration}s")

# Get dataset information
metadata = dataset.get_metadata()
print(f"Total files: {metadata['file_count']}")
print(f"Loaded files: {metadata['loaded_count']}")

Sampling

Extract random subsets of the dataset for testing or analysis:

# Sample by number of files
sampled = dataset.sample(n=10, seed=42)

# Sample by ratio
sampled = dataset.sample(ratio=0.1, seed=42)

# Default: 10% of the files, at least 1 and at most 10
sampled = dataset.sample(seed=42)
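
The sample size follows the rule in the `sample` source listed in the API reference on this page. As a rough illustration, that rule can be restated as a standalone function. `resolve_sample_size` is a hypothetical helper name used only for this sketch and is not part of Wandas.

```python
from __future__ import annotations


def resolve_sample_size(
    total: int, n: int | None = None, ratio: float | None = None
) -> int:
    """Restate the size rule from FrameDataset.sample (hypothetical helper)."""
    if total == 0:
        return 0  # nothing to sample from
    if n is None and ratio is None:
        # Default: 10% of the files, at least 1 and at most 10
        size = max(1, min(10, int(total * 0.1)))
    elif n is None:
        # Ratio given: truncate toward zero, but keep at least one file
        size = max(1, int(total * ratio))
    else:
        # An explicit n takes precedence over ratio
        size = max(1, n)
    # Never ask for more files than the dataset holds
    return min(size, total)


print(resolve_sample_size(500))             # default on a large dataset
print(resolve_sample_size(200, ratio=0.25))
print(resolve_sample_size(3, n=10))         # clamped to the dataset size
```

Note that when both `n` and `ratio` are passed, `n` wins and `ratio` is ignored.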

Transformations

Apply processing operations to all files in the dataset:

# Built-in transformations
resampled = dataset.resample(target_sr=8000)
trimmed = dataset.trim(start=0.5, end=2.0)

# Chain multiple transformations
processed = (
    dataset
    .resample(target_sr=8000)
    .trim(start=0.5, end=2.0)
)

# Custom transformation
def custom_filter(frame):
    return frame.low_pass_filter(cutoff=1000)

filtered = dataset.apply(custom_filter)
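
According to the source listed in the API reference on this page, `apply` returns a new dataset that keeps a reference to its source dataset and the transform, and the transform only runs when a frame is accessed. The toy class below sketches that pattern with plain numbers instead of audio frames. `LazyChain` and its `calls` counter are hypothetical names for illustration, not Wandas classes.

```python
from __future__ import annotations

from typing import Any, Callable


class LazyChain:
    """Toy stand-in for a derived dataset: a source plus a deferred transform."""

    def __init__(
        self,
        items: list[Any],
        source: LazyChain | None = None,
        transform: Callable[[Any], Any] | None = None,
    ) -> None:
        self._items = items
        self._source = source
        self._transform = transform
        self._cache: dict[int, Any] = {}
        self.calls = 0  # how many times this level's transform actually ran

    def apply(self, func: Callable[[Any], Any]) -> LazyChain:
        # No work happens here; only the link to the source is recorded
        return LazyChain(self._items, source=self, transform=func)

    def __getitem__(self, index: int) -> Any:
        if self._source is None or self._transform is None:
            return self._items[index]
        if index not in self._cache:
            self.calls += 1
            # Pull from the source (which may itself be derived), then transform
            self._cache[index] = self._transform(self._source[index])
        return self._cache[index]


base = LazyChain([1.0, 2.0, 3.0])
chained = base.apply(lambda x: x * 2).apply(lambda x: x + 1)
calls_before_access = chained.calls  # still 0: nothing has been computed
value = chained[1]                   # computes (2.0 * 2) + 1 on demand
repeat = chained[1]                  # served from the cache
```

Only the accessed index is processed, which is why chaining stays cheap on large folders.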

STFT - Spectrogram Generation

Convert time-domain data to spectrograms:

# Create spectrogram dataset
spec_dataset = dataset.stft(
    n_fft=2048,
    hop_length=512,
    window="hann"
)

# Access a spectrogram
spec_frame = spec_dataset[0]
spec_frame.plot()
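
To get a feel for what `n_fft` and `hop_length` imply, the resolution of the resulting grid can be worked out with plain arithmetic. This is a sketch under two assumptions: the 16000 Hz rate from the Basic Usage example, and a one-sided spectrum (the usual convention for real-valued audio). The exact array layout Wandas returns is not shown on this page, and `stft_grid` is a hypothetical helper.

```python
def stft_grid(sampling_rate: int, n_fft: int, hop_length: int) -> dict[str, float]:
    """Derive time/frequency resolution from STFT parameters (illustrative)."""
    return {
        # One-sided spectrum of a real signal: DC up to Nyquist inclusive
        "frequency_bins": n_fft // 2 + 1,
        # Spacing between adjacent frequency bins in Hz
        "bin_spacing_hz": sampling_rate / n_fft,
        # Time between the starts of consecutive analysis frames in seconds
        "frame_step_s": hop_length / sampling_rate,
        # Duration of one analysis window in seconds
        "window_s": n_fft / sampling_rate,
    }


grid = stft_grid(sampling_rate=16000, n_fft=2048, hop_length=512)
for name, value in grid.items():
    print(f"{name}: {value}")
```

A larger `n_fft` narrows the bin spacing but lengthens the window, so frequency detail is traded against time detail.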

Iteration

Process all files in the dataset:

for i in range(len(dataset)):
    frame = dataset[i]
    # Indexing returns None for files that failed to load, so skip those
    if frame is not None:
        # Process the frame
        print(f"Processing {frame.label}...")
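
The None check in the loop above can be wrapped in a small generator so that calling code only ever sees usable frames. This is a sketch, not a Wandas API: `iter_loaded` is a hypothetical helper that works with any object supporting `len()` and integer indexing, and it is demonstrated here on a plain list.

```python
from typing import Any, Iterator


def iter_loaded(dataset: Any) -> Iterator[tuple[int, Any]]:
    """Yield (index, frame) pairs, skipping entries that are None."""
    for index in range(len(dataset)):
        frame = dataset[index]
        if frame is not None:
            yield index, frame


# A list stands in for a dataset whose second file failed to load
stand_in = ["frame-a", None, "frame-c"]
loaded = list(iter_loaded(stand_in))
print(loaded)
```

Keeping the original index in each pair makes it possible to report which files were skipped.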

Key Parameters

folder_path (str): Path to the folder containing audio files.

sampling_rate (Optional[int]): Target sampling rate. Files whose rate differs from it are resampled.

file_extensions (Optional[list[str]]): List of file extensions to include. Default for ChannelFrameDataset: [".wav", ".mp3", ".flac", ".csv"]. The FrameDataset base class falls back to [".wav"].

lazy_loading (bool): If True, files are loaded only when accessed. Default: True.

recursive (bool): If True, search subdirectories recursively. Default: False.
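
The effect of `file_extensions` and `recursive` can be seen in isolation with `pathlib`. The sketch below follows the glob patterns used by `_discover_files` in the source listed on this page (`*{ext}` versus `**/*{ext}`). The `discover` function and the temporary files are made up for the demonstration.

```python
import tempfile
from pathlib import Path


def discover(folder: Path, extensions: list[str], recursive: bool) -> list[Path]:
    """Collect files by extension, optionally descending into subfolders."""
    found: set[Path] = set()
    for ext in extensions:
        pattern = f"**/*{ext}" if recursive else f"*{ext}"
        found.update(p for p in folder.glob(pattern) if p.is_file())
    # A set removes duplicates; sorting gives a stable order
    return sorted(found)


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "sub").mkdir()
    (root / "a.wav").touch()
    (root / "sub" / "b.wav").touch()
    (root / "notes.txt").touch()

    flat = [p.name for p in discover(root, [".wav"], recursive=False)]
    deep = [p.name for p in discover(root, [".wav"], recursive=True)]

print(flat)
print(deep)
```

With `recursive=False` only the top-level file is found; with `recursive=True` the file in `sub/` is included as well, and the text file is ignored in both cases.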

Examples

For detailed examples, see the learning-path/ directory and the tutorial notebooks listed in the Tutorial section.

API Reference

wandas.utils.frame_dataset

Attributes

logger = logging.getLogger(__name__) module-attribute

FrameType = ChannelFrame | SpectrogramFrame module-attribute

F = TypeVar('F', bound=FrameType) module-attribute

F_out = TypeVar('F_out', bound=FrameType) module-attribute

Classes

LazyFrame dataclass

Bases: Generic[F]

A class that encapsulates a frame and its loading state.

Attributes:

Name            Type      Description
file_path       Path      File path associated with the frame
frame           F | None  Loaded frame object (None if not loaded)
is_loaded       bool      Flag indicating if the frame is loaded
load_attempted  bool      Flag indicating if loading was attempted (for error detection)

Source code in wandas/utils/frame_dataset.py
@dataclass
class LazyFrame(Generic[F]):
    """
    A class that encapsulates a frame and its loading state.

    Attributes:
        file_path: File path associated with the frame
        frame: Loaded frame object (None if not loaded)
        is_loaded: Flag indicating if the frame is loaded
        load_attempted: Flag indicating if loading was attempted (for error detection)
    """

    file_path: Path
    frame: F | None = None
    is_loaded: bool = False
    load_attempted: bool = False

    def ensure_loaded(self, loader: Callable[[Path], F | None]) -> F | None:
        """
        Ensures the frame is loaded, loading it if necessary.

        Args:
            loader: Function to load a frame from a file path

        Returns:
            The loaded frame, or None if loading failed
        """
        # Return the current frame if already loaded
        if self.is_loaded:
            return self.frame

        # Attempt to load if not loaded yet
        try:
            self.load_attempted = True
            self.frame = loader(self.file_path)
            self.is_loaded = True
            return self.frame
        except Exception as e:
            logger.error(f"Failed to load file {self.file_path}: {str(e)}")
            self.is_loaded = True  # Loading was attempted
            self.frame = None
            return None

    def reset(self) -> None:
        """
        Reset the frame state.
        """
        self.frame = None
        self.is_loaded = False
        self.load_attempted = False
Attributes
file_path instance-attribute
frame = None class-attribute instance-attribute
is_loaded = False class-attribute instance-attribute
load_attempted = False class-attribute instance-attribute
Functions
__init__(file_path, frame=None, is_loaded=False, load_attempted=False)
ensure_loaded(loader)

Ensures the frame is loaded, loading it if necessary.

Parameters:

Name    Type                        Description                                Default
loader  Callable[[Path], F | None]  Function to load a frame from a file path  required

Returns:

Type      Description
F | None  The loaded frame, or None if loading failed

Source code in wandas/utils/frame_dataset.py
def ensure_loaded(self, loader: Callable[[Path], F | None]) -> F | None:
    """
    Ensures the frame is loaded, loading it if necessary.

    Args:
        loader: Function to load a frame from a file path

    Returns:
        The loaded frame, or None if loading failed
    """
    # Return the current frame if already loaded
    if self.is_loaded:
        return self.frame

    # Attempt to load if not loaded yet
    try:
        self.load_attempted = True
        self.frame = loader(self.file_path)
        self.is_loaded = True
        return self.frame
    except Exception as e:
        logger.error(f"Failed to load file {self.file_path}: {str(e)}")
        self.is_loaded = True  # Loading was attempted
        self.frame = None
        return None
reset()

Reset the frame state.

Source code in wandas/utils/frame_dataset.py
def reset(self) -> None:
    """
    Reset the frame state.
    """
    self.frame = None
    self.is_loaded = False
    self.load_attempted = False
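
One consequence of the `ensure_loaded` listing above is that a failed load is remembered: `is_loaded` is set to True even on failure, so the loader is not called again until `reset()` is used. The reduced stand-in below sketches that behaviour. `MiniLazy` and `flaky_loader` are hypothetical names, and logging is omitted for brevity.

```python
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Any, Callable


@dataclass
class MiniLazy:
    """Reduced stand-in for LazyFrame, keeping only the load-once logic."""

    file_path: Path
    frame: Any = None
    is_loaded: bool = False

    def ensure_loaded(self, loader: Callable[[Path], Any]) -> Any:
        if self.is_loaded:
            return self.frame  # cached result, which may be None after a failure
        try:
            self.frame = loader(self.file_path)
        except Exception:
            self.frame = None
        self.is_loaded = True  # marked as loaded even when loading failed
        return self.frame

    def reset(self) -> None:
        self.frame = None
        self.is_loaded = False


attempts: list[Path] = []


def flaky_loader(path: Path) -> str:
    """Fail on the first call, succeed afterwards (simulated I/O error)."""
    attempts.append(path)
    if len(attempts) == 1:
        raise OSError("simulated read failure")
    return f"frame from {path.name}"


item = MiniLazy(Path("a.wav"))
first = item.ensure_loaded(flaky_loader)   # fails; None is cached
second = item.ensure_loaded(flaky_loader)  # loader is not called again
attempts_before_reset = len(attempts)
item.reset()
third = item.ensure_loaded(flaky_loader)   # retried after reset
```

So if a file was temporarily unreadable, calling `reset()` is what allows a retry.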

FrameDataset

Bases: Generic[F], ABC

Abstract base dataset class for processing files in a folder. Includes lazy loading capability to efficiently handle large datasets. Subclasses handle specific frame types (ChannelFrame, SpectrogramFrame, etc.).

Source code in wandas/utils/frame_dataset.py
class FrameDataset(Generic[F], ABC):
    """
    Abstract base dataset class for processing files in a folder.
    Includes lazy loading capability to efficiently handle large datasets.
    Subclasses handle specific frame types (ChannelFrame, SpectrogramFrame, etc.).
    """

    def __init__(
        self,
        folder_path: str,
        sampling_rate: int | None = None,
        signal_length: int | None = None,
        file_extensions: list[str] | None = None,
        lazy_loading: bool = True,
        recursive: bool = False,
        source_dataset: Optional["FrameDataset[Any]"] = None,
        transform: Callable[[Any], F | None] | None = None,
    ):
        self.folder_path = Path(folder_path)
        if source_dataset is None and not self.folder_path.exists():
            raise FileNotFoundError(f"Folder does not exist: {self.folder_path}")

        self.sampling_rate = sampling_rate
        self.signal_length = signal_length
        self.file_extensions = file_extensions or [".wav"]
        self._recursive = recursive
        self._lazy_loading = lazy_loading

        # Changed to a list of LazyFrame
        self._lazy_frames: list[LazyFrame[F]] = []

        self._source_dataset = source_dataset
        self._transform = transform

        if self._source_dataset:
            self._initialize_from_source()
        else:
            self._initialize_from_folder()

    def _initialize_from_source(self) -> None:
        """Initialize from a source dataset."""
        if self._source_dataset is None:
            return

        # Copy file paths from source
        file_paths = self._source_dataset._get_file_paths()
        self._lazy_frames = [LazyFrame(file_path) for file_path in file_paths]

        # Inherit other properties
        self.sampling_rate = self.sampling_rate or self._source_dataset.sampling_rate
        self.signal_length = self.signal_length or self._source_dataset.signal_length
        self.file_extensions = self.file_extensions or self._source_dataset.file_extensions
        self._recursive = self._source_dataset._recursive
        self.folder_path = self._source_dataset.folder_path

    def _initialize_from_folder(self) -> None:
        """Initialize from a folder."""
        self._discover_files()
        if not self._lazy_loading:
            self._load_all_files()

    def _discover_files(self) -> None:
        """Discover files in the folder and store them in a list of LazyFrame."""
        file_paths = []
        for ext in self.file_extensions:
            pattern = f"**/*{ext}" if self._recursive else f"*{ext}"
            file_paths.extend(sorted(p for p in self.folder_path.glob(pattern) if p.is_file()))

        # Remove duplicates and sort
        file_paths = sorted(list(set(file_paths)))

        # Create a list of LazyFrame
        self._lazy_frames = [LazyFrame(file_path) for file_path in file_paths]

    def _load_all_files(self) -> None:
        """Load all files."""
        for i in tqdm(range(len(self._lazy_frames)), desc="Loading/transforming"):
            try:
                self._ensure_loaded(i)
            except Exception as e:
                filepath = self._lazy_frames[i].file_path
                logger.warning(f"Failed to load/transform index {i} ({filepath}): {str(e)}")
        self._lazy_loading = False

    @abstractmethod
    def _load_file(self, file_path: Path) -> F | None:
        """Abstract method to load a frame from a file."""
        pass

    def _load_from_source(self, index: int) -> F | None:
        """Load a frame from the source dataset and transform it if necessary."""
        if self._source_dataset is None or self._transform is None:
            return None

        source_frame = self._source_dataset._ensure_loaded(index)
        if source_frame is None:
            return None

        try:
            return self._transform(source_frame)
        except Exception as e:
            msg = f"Failed to transform index {index}: {str(e)}"
            logger.warning(msg)
            # Also emit to the root logger to improve capture reliability
            # in test runners and across different logging configurations
            logging.getLogger().warning(msg)
            return None

    def _ensure_loaded(self, index: int) -> F | None:
        """Ensure the frame at the given index is loaded."""
        if not (0 <= index < len(self._lazy_frames)):
            raise IndexError(f"Index {index} is out of range (0-{len(self._lazy_frames) - 1})")

        lazy_frame = self._lazy_frames[index]

        # Return if already loaded
        if lazy_frame.is_loaded:
            return lazy_frame.frame

        try:
            # Convert from source dataset
            if self._transform and self._source_dataset:
                lazy_frame.load_attempted = True
                frame = self._load_from_source(index)
                lazy_frame.frame = frame
                lazy_frame.is_loaded = True
                return frame
            # Load directly from file
            else:
                return lazy_frame.ensure_loaded(self._load_file)
        except Exception as e:
            f_path = lazy_frame.file_path
            logger.error(f"Failed to load or initialize index {index} ({f_path}): {str(e)}")
            lazy_frame.frame = None
            lazy_frame.is_loaded = True
            lazy_frame.load_attempted = True
            return None

    def _get_file_paths(self) -> list[Path]:
        """Get a list of file paths."""
        return [lazy_frame.file_path for lazy_frame in self._lazy_frames]

    def __len__(self) -> int:
        """Return the number of files in the dataset."""
        return len(self._lazy_frames)

    def get_by_label(self, label: str) -> F | None:
        """
        Get a frame by its label (filename).

        Parameters
        ----------
        label : str
            The filename (label) to search for (e.g., 'sample_1.wav').

        Returns
        -------
        Optional[F]
            The frame if found, otherwise None.

        Examples
        --------
        >>> frame = dataset.get_by_label("sample_1.wav")
        >>> if frame:
        ...     print(frame.label)
        """
        # Keep for backward compatibility: return the first match but emit
        # a DeprecationWarning recommending `get_all_by_label`.
        all_matches = self.get_all_by_label(label)
        if len(all_matches) > 0:
            warnings.warn(
                "get_by_label() returns the first matching frame and is deprecated; "
                "use get_all_by_label() to obtain all matches.",
                DeprecationWarning,
                stacklevel=2,
            )
            return all_matches[0]
        return None

    def get_all_by_label(self, label: str) -> list[F]:
        """
        Get all frames matching the given label (filename).

        Parameters
        ----------
        label : str
            The filename (label) to search for (e.g., 'sample_1.wav').

        Returns
        -------
        list[F]
            A list of frames matching the label.
            If none are found, returns an empty list.

        Notes
        -----
        - Search is performed against the filename portion only (i.e. Path.name).
        - Each matched frame will be loaded (triggering lazy load) via `_ensure_loaded`.
        """
        matches: list[F] = []
        for i, lazy_frame in enumerate(self._lazy_frames):
            if lazy_frame.file_path.name == label:
                loaded = self._ensure_loaded(i)
                if loaded is not None:
                    matches.append(loaded)
        return matches

    @overload
    def __getitem__(self, key: int) -> F | None: ...

    @overload
    def __getitem__(self, key: str) -> list[F]: ...

    def __getitem__(self, key: int | str) -> F | None | list[F]:
        """
        Get the frame by index (int) or label (str).

        Parameters
        ----------
        key : int or str
            Index (int) or filename/label (str).

        Returns
        -------
        Optional[F] or list[F]
            If `key` is an int, returns the frame or None. If `key` is a str,
            returns a list of matching frames (may be empty).

        Examples
        --------
        >>> frame = dataset[0]  # by index
        >>> frames = dataset["sample_1.wav"]  # list of matches by filename
        """
        if isinstance(key, int):
            return self._ensure_loaded(key)
        if isinstance(key, str):
            # pandas-like behaviour: return all matches for the label as a list
            return self.get_all_by_label(key)
        raise TypeError(f"Invalid key type: {type(key)}. Must be int or str.")

    @overload
    def apply(self, func: Callable[[F], F_out | None]) -> "FrameDataset[F_out]": ...

    @overload
    def apply(self, func: Callable[[F], Any | None]) -> "FrameDataset[Any]": ...

    def apply(self, func: Callable[[F], Any | None]) -> "FrameDataset[Any]":
        """Apply a function to the entire dataset to create a new dataset."""
        new_dataset = type(self)(
            folder_path=str(self.folder_path),
            lazy_loading=True,
            source_dataset=self,
            transform=func,
            sampling_rate=self.sampling_rate,
            signal_length=self.signal_length,
            file_extensions=self.file_extensions,
            recursive=self._recursive,
        )
        return cast("FrameDataset[Any]", new_dataset)

    def save(self, output_folder: str, filename_prefix: str = "") -> None:
        """Save processed frames to files."""
        raise NotImplementedError("The save method is not currently implemented.")

    def sample(
        self,
        n: int | None = None,
        ratio: float | None = None,
        seed: int | None = None,
    ) -> "FrameDataset[F]":
        """Get a sample from the dataset."""
        if seed is not None:
            random.seed(seed)

        total = len(self._lazy_frames)
        if total == 0:
            return type(self)(
                str(self.folder_path),
                sampling_rate=self.sampling_rate,
                signal_length=self.signal_length,
                file_extensions=self.file_extensions,
                lazy_loading=self._lazy_loading,
                recursive=self._recursive,
            )

        # Determine sample size
        if n is None and ratio is None:
            n = max(1, min(10, int(total * 0.1)))
        elif n is None and ratio is not None:
            n = max(1, int(total * ratio))
        elif n is not None:
            n = max(1, n)
        else:
            n = 1

        n = min(n, total)

        # Randomly select indices
        sampled_indices = sorted(random.sample(range(total), n))

        return _SampledFrameDataset(self, sampled_indices)

    def get_metadata(self) -> dict[str, Any]:
        """Get metadata for the dataset."""
        actual_sr: int | float | None = self.sampling_rate
        frame_type_name = "Unknown"

        # Count loaded frames
        loaded_count = sum(1 for lazy_frame in self._lazy_frames if lazy_frame.is_loaded)

        # Get metadata from the first frame (if possible)
        first_frame: F | None = None
        if len(self._lazy_frames) > 0:
            try:
                if self._lazy_frames[0].is_loaded:
                    first_frame = self._lazy_frames[0].frame

                if first_frame:
                    actual_sr = getattr(first_frame, "sampling_rate", self.sampling_rate)
                    frame_type_name = type(first_frame).__name__
            except Exception as e:
                logger.warning(f"Error accessing the first frame during metadata retrieval: {e}")

        return {
            "folder_path": str(self.folder_path),
            "file_count": len(self._lazy_frames),
            "loaded_count": loaded_count,
            "target_sampling_rate": self.sampling_rate,
            "actual_sampling_rate": actual_sr,
            "signal_length": self.signal_length,
            "file_extensions": self.file_extensions,
            "lazy_loading": self._lazy_loading,
            "recursive": self._recursive,
            "frame_type": frame_type_name,
            "has_transform": self._transform is not None,
            "is_sampled": isinstance(self, _SampledFrameDataset),
        }
Attributes
folder_path = Path(folder_path) instance-attribute
sampling_rate = sampling_rate instance-attribute
signal_length = signal_length instance-attribute
file_extensions = file_extensions or ['.wav'] instance-attribute
Functions
__init__(folder_path, sampling_rate=None, signal_length=None, file_extensions=None, lazy_loading=True, recursive=False, source_dataset=None, transform=None)
Source code in wandas/utils/frame_dataset.py
def __init__(
    self,
    folder_path: str,
    sampling_rate: int | None = None,
    signal_length: int | None = None,
    file_extensions: list[str] | None = None,
    lazy_loading: bool = True,
    recursive: bool = False,
    source_dataset: Optional["FrameDataset[Any]"] = None,
    transform: Callable[[Any], F | None] | None = None,
):
    self.folder_path = Path(folder_path)
    if source_dataset is None and not self.folder_path.exists():
        raise FileNotFoundError(f"Folder does not exist: {self.folder_path}")

    self.sampling_rate = sampling_rate
    self.signal_length = signal_length
    self.file_extensions = file_extensions or [".wav"]
    self._recursive = recursive
    self._lazy_loading = lazy_loading

    # Changed to a list of LazyFrame
    self._lazy_frames: list[LazyFrame[F]] = []

    self._source_dataset = source_dataset
    self._transform = transform

    if self._source_dataset:
        self._initialize_from_source()
    else:
        self._initialize_from_folder()
__len__()

Return the number of files in the dataset.

Source code in wandas/utils/frame_dataset.py
def __len__(self) -> int:
    """Return the number of files in the dataset."""
    return len(self._lazy_frames)
get_by_label(label)

Get a frame by its label (filename). Deprecated: only the first match is returned; use get_all_by_label() to obtain all matches.

Parameters

label : str The filename (label) to search for (e.g., 'sample_1.wav').

Returns

Optional[F] The frame if found, otherwise None.

Examples

>>> frame = dataset.get_by_label("sample_1.wav")
>>> if frame:
...     print(frame.label)

Source code in wandas/utils/frame_dataset.py
def get_by_label(self, label: str) -> F | None:
    """
    Get a frame by its label (filename).

    Parameters
    ----------
    label : str
        The filename (label) to search for (e.g., 'sample_1.wav').

    Returns
    -------
    Optional[F]
        The frame if found, otherwise None.

    Examples
    --------
    >>> frame = dataset.get_by_label("sample_1.wav")
    >>> if frame:
    ...     print(frame.label)
    """
    # Keep for backward compatibility: return the first match but emit
    # a DeprecationWarning recommending `get_all_by_label`.
    all_matches = self.get_all_by_label(label)
    if len(all_matches) > 0:
        warnings.warn(
            "get_by_label() returns the first matching frame and is deprecated; "
            "use get_all_by_label() to obtain all matches.",
            DeprecationWarning,
            stacklevel=2,
        )
        return all_matches[0]
    return None
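
As the listing above shows, the DeprecationWarning is emitted only when a match exists; a miss returns None silently. The sketch below reproduces that control flow with stand-in functions so that it can run without Wandas. `get_all` and `get_first` are hypothetical names that operate on a plain list of filenames.

```python
import warnings


def get_all(labels: list[str], label: str) -> list[int]:
    """Return the positions of every entry equal to the label."""
    return [i for i, name in enumerate(labels) if name == label]


def get_first(labels: list[str], label: str):
    """Deprecated-style accessor: first match only, with a warning."""
    matches = get_all(labels, label)
    if matches:
        warnings.warn(
            "get_first() is deprecated; use get_all().",
            DeprecationWarning,
            stacklevel=2,  # attribute the warning to the caller
        )
        return matches[0]
    return None


with warnings.catch_warnings(record=True) as caught:
    # DeprecationWarning is often filtered out by default, so enable it here
    warnings.simplefilter("always")
    hit = get_first(["a.wav", "b.wav", "a.wav"], "a.wav")
    miss = get_first(["a.wav"], "zzz.wav")

print(hit, miss, [w.category.__name__ for w in caught])
```

Recording warnings this way is a convenient check when migrating callers to the list-returning method.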
get_all_by_label(label)

Get all frames matching the given label (filename).

Parameters

label : str The filename (label) to search for (e.g., 'sample_1.wav').

Returns

list[F] A list of frames matching the label. If none are found, returns an empty list.

Notes
  • Search is performed against the filename portion only (i.e. Path.name).
  • Each matched frame will be loaded (triggering lazy load) via _ensure_loaded.
Source code in wandas/utils/frame_dataset.py
def get_all_by_label(self, label: str) -> list[F]:
    """
    Get all frames matching the given label (filename).

    Parameters
    ----------
    label : str
        The filename (label) to search for (e.g., 'sample_1.wav').

    Returns
    -------
    list[F]
        A list of frames matching the label.
        If none are found, returns an empty list.

    Notes
    -----
    - Search is performed against the filename portion only (i.e. Path.name).
    - Each matched frame will be loaded (triggering lazy load) via `_ensure_loaded`.
    """
    matches: list[F] = []
    for i, lazy_frame in enumerate(self._lazy_frames):
        if lazy_frame.file_path.name == label:
            loaded = self._ensure_loaded(i)
            if loaded is not None:
                matches.append(loaded)
    return matches
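
Because matching uses only `Path.name`, files with the same name in different subfolders all match, and a label that includes a directory part matches nothing. The following sketch shows the comparison on its own. The paths are invented and `indices_for_label` is a hypothetical helper.

```python
from pathlib import Path


def indices_for_label(paths: list[Path], label: str) -> list[int]:
    """Return the positions whose final path component equals the label."""
    return [i for i, p in enumerate(paths) if p.name == label]


paths = [
    Path("data/a/take.wav"),
    Path("data/b/take.wav"),
    Path("data/b/other.wav"),
]

same_name = indices_for_label(paths, "take.wav")      # both folders match
with_folder = indices_for_label(paths, "b/take.wav")  # directory part never matches
without_ext = indices_for_label(paths, "take")        # extension is part of the name
print(same_name, with_folder, without_ext)
```

This is why the method returns a list: with `recursive=True`, duplicate filenames are a realistic case.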
__getitem__(key)
__getitem__(key: int) -> F | None
__getitem__(key: str) -> list[F]

Get the frame by index (int) or label (str).

Parameters

key : int or str Index (int) or filename/label (str).

Returns

Optional[F] or list[F] If key is an int, returns the frame or None. If key is a str, returns a list of matching frames (may be empty).

Examples

>>> frame = dataset[0]  # by index
>>> frames = dataset["sample_1.wav"]  # list of matches by filename

Source code in wandas/utils/frame_dataset.py
def __getitem__(self, key: int | str) -> F | None | list[F]:
    """
    Get the frame by index (int) or label (str).

    Parameters
    ----------
    key : int or str
        Index (int) or filename/label (str).

    Returns
    -------
    Optional[F] or list[F]
        If `key` is an int, returns the frame or None. If `key` is a str,
        returns a list of matching frames (may be empty).

    Examples
    --------
    >>> frame = dataset[0]  # by index
    >>> frames = dataset["sample_1.wav"]  # list of matches by filename
    """
    if isinstance(key, int):
        return self._ensure_loaded(key)
    if isinstance(key, str):
        # pandas-like behaviour: return all matches for the label as a list
        return self.get_all_by_label(key)
    raise TypeError(f"Invalid key type: {type(key)}. Must be int or str.")
apply(func)
apply(func: Callable[[F], F_out | None]) -> FrameDataset[F_out]
apply(func: Callable[[F], Any | None]) -> FrameDataset[Any]

Apply a function to the entire dataset to create a new dataset.

Source code in wandas/utils/frame_dataset.py
def apply(self, func: Callable[[F], Any | None]) -> "FrameDataset[Any]":
    """Apply a function to the entire dataset to create a new dataset."""
    new_dataset = type(self)(
        folder_path=str(self.folder_path),
        lazy_loading=True,
        source_dataset=self,
        transform=func,
        sampling_rate=self.sampling_rate,
        signal_length=self.signal_length,
        file_extensions=self.file_extensions,
        recursive=self._recursive,
    )
    return cast("FrameDataset[Any]", new_dataset)
save(output_folder, filename_prefix='')

Save processed frames to files.

Source code in wandas/utils/frame_dataset.py
def save(self, output_folder: str, filename_prefix: str = "") -> None:
    """Save processed frames to files."""
    raise NotImplementedError("The save method is not currently implemented.")
sample(n=None, ratio=None, seed=None)

Get a sample from the dataset.

Source code in wandas/utils/frame_dataset.py
def sample(
    self,
    n: int | None = None,
    ratio: float | None = None,
    seed: int | None = None,
) -> "FrameDataset[F]":
    """Get a sample from the dataset."""
    if seed is not None:
        random.seed(seed)

    total = len(self._lazy_frames)
    if total == 0:
        return type(self)(
            str(self.folder_path),
            sampling_rate=self.sampling_rate,
            signal_length=self.signal_length,
            file_extensions=self.file_extensions,
            lazy_loading=self._lazy_loading,
            recursive=self._recursive,
        )

    # Determine sample size
    if n is None and ratio is None:
        n = max(1, min(10, int(total * 0.1)))
    elif n is None and ratio is not None:
        n = max(1, int(total * ratio))
    elif n is not None:
        n = max(1, n)
    else:
        n = 1

    n = min(n, total)

    # Randomly select indices
    sampled_indices = sorted(random.sample(range(total), n))

    return _SampledFrameDataset(self, sampled_indices)
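
The listing above seeds the module-level generator with `random.seed(seed)`, which also changes the random state seen by any other code in the process. If that side effect matters, one alternative is a local `random.Random` instance. The sketch below shows this swapped-in technique; it is not what the Wandas source does, and `sample_indices` is a hypothetical helper.

```python
from __future__ import annotations

import random


def sample_indices(total: int, n: int, seed: int | None = None) -> list[int]:
    """Pick n distinct indices reproducibly without touching global state."""
    rng = random.Random(seed)  # private generator, independent of random.seed()
    return sorted(rng.sample(range(total), min(n, total)))


first_draw = sample_indices(100, 5, seed=42)
second_draw = sample_indices(100, 5, seed=42)  # same seed, same indices

# The module-level generator is unaffected by the calls above
random.seed(1)
expected_next = random.random()
random.seed(1)
sample_indices(100, 5, seed=42)
actual_next = random.random()
```

Sorting the indices, as the original does, keeps the sampled files in their original dataset order.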
get_metadata()

Get metadata for the dataset.

Source code in wandas/utils/frame_dataset.py
def get_metadata(self) -> dict[str, Any]:
    """Get metadata for the dataset."""
    actual_sr: int | float | None = self.sampling_rate
    frame_type_name = "Unknown"

    # Count loaded frames
    loaded_count = sum(1 for lazy_frame in self._lazy_frames if lazy_frame.is_loaded)

    # Get metadata from the first frame (if possible)
    first_frame: F | None = None
    if len(self._lazy_frames) > 0:
        try:
            if self._lazy_frames[0].is_loaded:
                first_frame = self._lazy_frames[0].frame

            if first_frame:
                actual_sr = getattr(first_frame, "sampling_rate", self.sampling_rate)
                frame_type_name = type(first_frame).__name__
        except Exception as e:
            logger.warning(f"Error accessing the first frame during metadata retrieval: {e}")

    return {
        "folder_path": str(self.folder_path),
        "file_count": len(self._lazy_frames),
        "loaded_count": loaded_count,
        "target_sampling_rate": self.sampling_rate,
        "actual_sampling_rate": actual_sr,
        "signal_length": self.signal_length,
        "file_extensions": self.file_extensions,
        "lazy_loading": self._lazy_loading,
        "recursive": self._recursive,
        "frame_type": frame_type_name,
        "has_transform": self._transform is not None,
        "is_sampled": isinstance(self, _SampledFrameDataset),
    }

ChannelFrameDataset

Bases: FrameDataset[ChannelFrame]

Dataset class for handling audio files as ChannelFrames in a folder.

Source code in wandas/utils/frame_dataset.py
class ChannelFrameDataset(FrameDataset[ChannelFrame]):
    """
    Dataset class for handling audio files as ChannelFrames in a folder.
    """

    def __init__(
        self,
        folder_path: str,
        sampling_rate: int | None = None,
        signal_length: int | None = None,
        file_extensions: list[str] | None = None,
        lazy_loading: bool = True,
        recursive: bool = False,
        source_dataset: Optional["FrameDataset[Any]"] = None,
        transform: Callable[[Any], ChannelFrame | None] | None = None,
    ):
        _file_extensions = file_extensions or [
            ".wav",
            ".mp3",
            ".flac",
            ".csv",
        ]

        super().__init__(
            folder_path=folder_path,
            sampling_rate=sampling_rate,
            signal_length=signal_length,
            file_extensions=_file_extensions,
            lazy_loading=lazy_loading,
            recursive=recursive,
            source_dataset=source_dataset,
            transform=transform,
        )

    def _load_file(self, file_path: Path) -> ChannelFrame | None:
        """Load an audio file and return a ChannelFrame."""
        try:
            frame = ChannelFrame.from_file(file_path)
            if self.sampling_rate and frame.sampling_rate != self.sampling_rate:
                logger.info(
                    f"Resampling file {file_path.name} ({frame.sampling_rate} Hz) to "
                    f"dataset rate ({self.sampling_rate} Hz)."
                )
                frame = frame.resampling(target_sr=self.sampling_rate)
            return frame
        except Exception as e:
            logger.error(f"Failed to load or initialize file {file_path}: {str(e)}")
            return None

    def resample(self, target_sr: int) -> "ChannelFrameDataset":
        """Resample all frames in the dataset."""

        def _resample_func(frame: ChannelFrame) -> ChannelFrame | None:
            if frame is None:
                return None
            try:
                return frame.resampling(target_sr=target_sr)
            except Exception as e:
                logger.warning(f"Resampling error (target_sr={target_sr}): {e}")
                return None

        new_dataset = self.apply(_resample_func)
        return cast(ChannelFrameDataset, new_dataset)

    def trim(self, start: float, end: float) -> "ChannelFrameDataset":
        """Trim all frames in the dataset."""

        def _trim_func(frame: ChannelFrame) -> ChannelFrame | None:
            if frame is None:
                return None
            try:
                return frame.trim(start=start, end=end)
            except Exception as e:
                logger.warning(f"Trimming error (start={start}, end={end}): {e}")
                return None

        new_dataset = self.apply(_trim_func)
        return cast(ChannelFrameDataset, new_dataset)

    def normalize(self, **kwargs: Any) -> "ChannelFrameDataset":
        """Normalize all frames in the dataset."""

        def _normalize_func(frame: ChannelFrame) -> ChannelFrame | None:
            if frame is None:
                return None
            try:
                return frame.normalize(**kwargs)
            except Exception as e:
                logger.warning(f"Normalization error ({kwargs}): {e}")
                return None

        new_dataset = self.apply(_normalize_func)
        return cast(ChannelFrameDataset, new_dataset)

    def stft(
        self,
        n_fft: int = 2048,
        hop_length: int | None = None,
        win_length: int | None = None,
        window: str = "hann",
    ) -> "SpectrogramFrameDataset":
        """Apply STFT to all frames in the dataset."""
        _hop = hop_length or n_fft // 4

        def _stft_func(frame: ChannelFrame) -> SpectrogramFrame | None:
            if frame is None:
                return None
            try:
                return frame.stft(
                    n_fft=n_fft,
                    hop_length=_hop,
                    win_length=win_length,
                    window=window,
                )
            except Exception as e:
                logger.warning(f"STFT error (n_fft={n_fft}, hop={_hop}): {e}")
                return None

        new_dataset = SpectrogramFrameDataset(
            folder_path=str(self.folder_path),
            lazy_loading=True,
            source_dataset=self,
            transform=_stft_func,
            sampling_rate=self.sampling_rate,
        )
        return new_dataset

    @classmethod
    def from_folder(
        cls,
        folder_path: str,
        sampling_rate: int | None = None,
        file_extensions: list[str] | None = None,
        recursive: bool = False,
        lazy_loading: bool = True,
    ) -> "ChannelFrameDataset":
        """Class method to create a ChannelFrameDataset from a folder."""
        extensions = file_extensions if file_extensions is not None else [".wav", ".mp3", ".flac", ".csv"]

        return cls(
            folder_path,
            sampling_rate=sampling_rate,
            file_extensions=extensions,
            lazy_loading=lazy_loading,
            recursive=recursive,
        )
Functions
__init__(folder_path, sampling_rate=None, signal_length=None, file_extensions=None, lazy_loading=True, recursive=False, source_dataset=None, transform=None)
Source code in wandas/utils/frame_dataset.py
def __init__(
    self,
    folder_path: str,
    sampling_rate: int | None = None,
    signal_length: int | None = None,
    file_extensions: list[str] | None = None,
    lazy_loading: bool = True,
    recursive: bool = False,
    source_dataset: Optional["FrameDataset[Any]"] = None,
    transform: Callable[[Any], ChannelFrame | None] | None = None,
):
    _file_extensions = file_extensions or [
        ".wav",
        ".mp3",
        ".flac",
        ".csv",
    ]

    super().__init__(
        folder_path=folder_path,
        sampling_rate=sampling_rate,
        signal_length=signal_length,
        file_extensions=_file_extensions,
        lazy_loading=lazy_loading,
        recursive=recursive,
        source_dataset=source_dataset,
        transform=transform,
    )
resample(target_sr)

Resample all frames in the dataset.

Source code in wandas/utils/frame_dataset.py
def resample(self, target_sr: int) -> "ChannelFrameDataset":
    """Resample all frames in the dataset."""

    def _resample_func(frame: ChannelFrame) -> ChannelFrame | None:
        if frame is None:
            return None
        try:
            return frame.resampling(target_sr=target_sr)
        except Exception as e:
            logger.warning(f"Resampling error (target_sr={target_sr}): {e}")
            return None

    new_dataset = self.apply(_resample_func)
    return cast(ChannelFrameDataset, new_dataset)
trim(start, end)

Trim all frames in the dataset.

Source code in wandas/utils/frame_dataset.py
def trim(self, start: float, end: float) -> "ChannelFrameDataset":
    """Trim all frames in the dataset."""

    def _trim_func(frame: ChannelFrame) -> ChannelFrame | None:
        if frame is None:
            return None
        try:
            return frame.trim(start=start, end=end)
        except Exception as e:
            logger.warning(f"Trimming error (start={start}, end={end}): {e}")
            return None

    new_dataset = self.apply(_trim_func)
    return cast(ChannelFrameDataset, new_dataset)
normalize(**kwargs)

Normalize all frames in the dataset.

Source code in wandas/utils/frame_dataset.py
def normalize(self, **kwargs: Any) -> "ChannelFrameDataset":
    """Normalize all frames in the dataset."""

    def _normalize_func(frame: ChannelFrame) -> ChannelFrame | None:
        if frame is None:
            return None
        try:
            return frame.normalize(**kwargs)
        except Exception as e:
            logger.warning(f"Normalization error ({kwargs}): {e}")
            return None

    new_dataset = self.apply(_normalize_func)
    return cast(ChannelFrameDataset, new_dataset)
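
`resample`, `trim`, and `normalize` share one pattern: the per-frame function passes `None` through, and any exception is logged as a warning and turned into `None` rather than raised. A minimal standalone sketch of that pattern follows; `tolerant` is a hypothetical helper name, not a wandas API, and plain numbers stand in for frames.

```python
import logging
from typing import Callable, Optional, TypeVar

logger = logging.getLogger(__name__)

T = TypeVar("T")
R = TypeVar("R")


def tolerant(func: Callable[[T], R], what: str) -> Callable[[Optional[T]], Optional[R]]:
    """Wrap func so that None input or any exception yields None (hypothetical helper)."""

    def _wrapped(frame: Optional[T]) -> Optional[R]:
        if frame is None:
            return None  # an earlier step already failed for this item
        try:
            return func(frame)
        except Exception as e:
            logger.warning(f"{what} error: {e}")
            return None

    return _wrapped


halve = tolerant(lambda x: x / 2, "Halving")
invert = tolerant(lambda x: 1 / x, "Inversion")

print(halve(8))     # 4.0
print(invert(0))    # None, the ZeroDivisionError is logged instead of raised
print(halve(None))  # None is passed through
```

The consequence for callers is that a failed file shows up as `None` in the resulting dataset, so downstream code should check for `None` rather than expect an exception.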
stft(n_fft=2048, hop_length=None, win_length=None, window='hann')

Apply STFT to all frames in the dataset.

Source code in wandas/utils/frame_dataset.py
def stft(
    self,
    n_fft: int = 2048,
    hop_length: int | None = None,
    win_length: int | None = None,
    window: str = "hann",
) -> "SpectrogramFrameDataset":
    """Apply STFT to all frames in the dataset."""
    _hop = hop_length or n_fft // 4

    def _stft_func(frame: ChannelFrame) -> SpectrogramFrame | None:
        if frame is None:
            return None
        try:
            return frame.stft(
                n_fft=n_fft,
                hop_length=_hop,
                win_length=win_length,
                window=window,
            )
        except Exception as e:
            logger.warning(f"STFT error (n_fft={n_fft}, hop={_hop}): {e}")
            return None

    new_dataset = SpectrogramFrameDataset(
        folder_path=str(self.folder_path),
        lazy_loading=True,
        source_dataset=self,
        transform=_stft_func,
        sampling_rate=self.sampling_rate,
    )
    return new_dataset
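
The default hop size follows from the expression `hop_length or n_fft // 4` in the listing above. A small sketch of that rule, using a hypothetical helper name `resolve_hop` that is not part of wandas:

```python
from typing import Optional


def resolve_hop(n_fft: int, hop_length: Optional[int] = None) -> int:
    """Same expression as in stft(): fall back to n_fft // 4 when hop_length is falsy."""
    return hop_length or n_fft // 4


n_fft = 2048
hop = resolve_hop(n_fft)
overlap = 1 - hop / n_fft  # fraction of each window shared with the next one

print(hop)      # 512
print(overlap)  # 0.75
```

Because the fallback uses `or`, `hop_length=0` is treated the same as `None` and also resolves to `n_fft // 4`.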
from_folder(folder_path, sampling_rate=None, file_extensions=None, recursive=False, lazy_loading=True) classmethod

Class method to create a ChannelFrameDataset from a folder.

Source code in wandas/utils/frame_dataset.py
@classmethod
def from_folder(
    cls,
    folder_path: str,
    sampling_rate: int | None = None,
    file_extensions: list[str] | None = None,
    recursive: bool = False,
    lazy_loading: bool = True,
) -> "ChannelFrameDataset":
    """Class method to create a ChannelFrameDataset from a folder."""
    extensions = file_extensions if file_extensions is not None else [".wav", ".mp3", ".flac", ".csv"]

    return cls(
        folder_path,
        sampling_rate=sampling_rate,
        file_extensions=extensions,
        lazy_loading=lazy_loading,
        recursive=recursive,
    )
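
`from_folder` and `__init__` resolve the default extensions slightly differently (`is not None` versus `or`). The sketch below reproduces just those two expressions to show the combined effect; the function names are hypothetical and nothing here scans a folder.

```python
from typing import List, Optional

DEFAULT_EXTENSIONS = [".wav", ".mp3", ".flac", ".csv"]


def init_extensions(file_extensions: Optional[List[str]]) -> List[str]:
    # Mirrors __init__: `or` replaces both None and an empty list
    return file_extensions or list(DEFAULT_EXTENSIONS)


def from_folder_extensions(file_extensions: Optional[List[str]]) -> List[str]:
    # Mirrors from_folder: only None is replaced here, then __init__ runs
    extensions = file_extensions if file_extensions is not None else list(DEFAULT_EXTENSIONS)
    return init_extensions(extensions)


print(from_folder_extensions(None))      # the four defaults
print(from_folder_extensions([".wav"]))  # ['.wav']
print(from_folder_extensions([]))        # the four defaults again
```

In other words, an empty list cannot be used to mean "match no files": it is replaced by the defaults once `__init__` runs.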

SpectrogramFrameDataset

Bases: FrameDataset[SpectrogramFrame]

Dataset class for handling spectrogram data as SpectrogramFrames. Expected to be generated mainly as a result of ChannelFrameDataset.stft().

Source code in wandas/utils/frame_dataset.py
class SpectrogramFrameDataset(FrameDataset[SpectrogramFrame]):
    """
    Dataset class for handling spectrogram data as SpectrogramFrames.
    Expected to be generated mainly as a result of ChannelFrameDataset.stft().
    """

    def __init__(
        self,
        folder_path: str,
        sampling_rate: int | None = None,
        signal_length: int | None = None,
        file_extensions: list[str] | None = None,
        lazy_loading: bool = True,
        recursive: bool = False,
        source_dataset: Optional["FrameDataset[Any]"] = None,
        transform: Callable[[Any], SpectrogramFrame | None] | None = None,
    ):
        super().__init__(
            folder_path=folder_path,
            sampling_rate=sampling_rate,
            signal_length=signal_length,
            file_extensions=file_extensions,
            lazy_loading=lazy_loading,
            recursive=recursive,
            source_dataset=source_dataset,
            transform=transform,
        )

    def _load_file(self, file_path: Path) -> SpectrogramFrame | None:
        """Direct loading from files is not currently supported."""
        logger.warning(
            "No method defined for directly loading SpectrogramFrames. Normally "
            "created from ChannelFrameDataset.stft()."
        )
        raise NotImplementedError("No method defined for directly loading SpectrogramFrames")

    def plot(self, index: int, **kwargs: Any) -> None:
        """Plot the spectrogram at the specified index."""
        try:
            frame = self._ensure_loaded(index)

            if frame is None:
                logger.warning(f"Cannot plot index {index} as it failed to load/transform.")
                return

            plot_method = getattr(frame, "plot", None)
            if callable(plot_method):
                plot_method(**kwargs)
            else:
                logger.warning(
                    f"Frame (index {index}, type {type(frame).__name__}) does not have a plot method implemented."
                )
        except Exception as e:
            logger.error(f"An error occurred while plotting index {index}: {e}")
Functions
__init__(folder_path, sampling_rate=None, signal_length=None, file_extensions=None, lazy_loading=True, recursive=False, source_dataset=None, transform=None)
Source code in wandas/utils/frame_dataset.py
def __init__(
    self,
    folder_path: str,
    sampling_rate: int | None = None,
    signal_length: int | None = None,
    file_extensions: list[str] | None = None,
    lazy_loading: bool = True,
    recursive: bool = False,
    source_dataset: Optional["FrameDataset[Any]"] = None,
    transform: Callable[[Any], SpectrogramFrame | None] | None = None,
):
    super().__init__(
        folder_path=folder_path,
        sampling_rate=sampling_rate,
        signal_length=signal_length,
        file_extensions=file_extensions,
        lazy_loading=lazy_loading,
        recursive=recursive,
        source_dataset=source_dataset,
        transform=transform,
    )
plot(index, **kwargs)

Plot the spectrogram at the specified index.

Source code in wandas/utils/frame_dataset.py
def plot(self, index: int, **kwargs: Any) -> None:
    """Plot the spectrogram at the specified index."""
    try:
        frame = self._ensure_loaded(index)

        if frame is None:
            logger.warning(f"Cannot plot index {index} as it failed to load/transform.")
            return

        plot_method = getattr(frame, "plot", None)
        if callable(plot_method):
            plot_method(**kwargs)
        else:
            logger.warning(
                f"Frame (index {index}, type {type(frame).__name__}) does not have a plot method implemented."
            )
    except Exception as e:
        logger.error(f"An error occurred while plotting index {index}: {e}")

Sample Generation / サンプル生成

Provides functions for generating sample data for testing. テスト用のサンプルデータを生成する機能を提供します。

wandas.utils.generate_sample

Functions

generate_sin(freqs=1000, sampling_rate=16000, duration=1.0, label=None)

Generate sample sine wave signals.

Parameters

freqs : float or list of float, default=1000
    Frequency of the sine wave(s) in Hz. If multiple frequencies are specified, multiple channels will be created.
sampling_rate : int, default=16000
    Sampling rate in Hz.
duration : float, default=1.0
    Duration of the signal in seconds.
label : str, optional
    Label for the entire signal.

Returns

ChannelFrame
    ChannelFrame object containing the sine wave(s).

Source code in wandas/utils/generate_sample.py
def generate_sin(
    freqs: float | list[float] = 1000,
    sampling_rate: int = 16000,
    duration: float = 1.0,
    label: str | None = None,
) -> "ChannelFrame":
    """
    Generate sample sine wave signals.

    Parameters
    ----------
    freqs : float or list of float, default=1000
        Frequency of the sine wave(s) in Hz.
        If multiple frequencies are specified, multiple channels will be created.
    sampling_rate : int, default=16000
        Sampling rate in Hz.
    duration : float, default=1.0
        Duration of the signal in seconds.
    label : str, optional
        Label for the entire signal.

    Returns
    -------
    ChannelFrame
        ChannelFrame object containing the sine wave(s).
    """
    # Delegate directly to generate_sin_lazy
    return generate_sin_lazy(freqs=freqs, sampling_rate=sampling_rate, duration=duration, label=label)

generate_sin_lazy(freqs=1000, sampling_rate=16000, duration=1.0, label=None)

Generate sample sine wave signals using lazy computation.

Parameters

freqs : float or list of float, default=1000
    Frequency of the sine wave(s) in Hz. If multiple frequencies are specified, multiple channels will be created.
sampling_rate : int, default=16000
    Sampling rate in Hz.
duration : float, default=1.0
    Duration of the signal in seconds.
label : str, optional
    Label for the entire signal.

Returns

ChannelFrame
    Lazy ChannelFrame object containing the sine wave(s).

Source code in wandas/utils/generate_sample.py
def generate_sin_lazy(
    freqs: float | list[float] = 1000,
    sampling_rate: int = 16000,
    duration: float = 1.0,
    label: str | None = None,
) -> "ChannelFrame":
    """
    Generate sample sine wave signals using lazy computation.

    Parameters
    ----------
    freqs : float or list of float, default=1000
        Frequency of the sine wave(s) in Hz.
        If multiple frequencies are specified, multiple channels will be created.
    sampling_rate : int, default=16000
        Sampling rate in Hz.
    duration : float, default=1.0
        Duration of the signal in seconds.
    label : str, optional
        Label for the entire signal.

    Returns
    -------
    ChannelFrame
        Lazy ChannelFrame object containing the sine wave(s).
    """
    from wandas.frames.channel import ChannelFrame

    label = label or "Generated Sin"
    t = np.linspace(0, duration, int(sampling_rate * duration), endpoint=False)

    _freqs: list[float]
    # Accept ints too, so that the default freqs=1000 passes the type check
    if isinstance(freqs, (int, float)):
        _freqs = [freqs]
    elif isinstance(freqs, list):
        _freqs = freqs
    else:
        raise ValueError("freqs must be a float or a list of floats.")

    channels = []
    labels = []
    for idx, freq in enumerate(_freqs):
        data = np.sin(2 * np.pi * freq * t)
        labels.append(f"Channel {idx + 1}")
        channels.append(data)
    return ChannelFrame.from_numpy(
        data=np.array(channels),
        label=label,
        sampling_rate=sampling_rate,
        ch_labels=labels,
    )
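
Each channel is the sampled function sin(2πft) on a time grid with step `duration / n_samples`. The following pure-Python sketch reproduces one channel without NumPy or wandas; `sine_samples` is a hypothetical stand-in name, and it returns a plain list rather than a ChannelFrame.

```python
import math
from typing import List


def sine_samples(freq: float, sampling_rate: int = 16000, duration: float = 1.0) -> List[float]:
    """Pure-Python stand-in for one channel of generate_sin (hypothetical helper)."""
    n_samples = int(sampling_rate * duration)
    if n_samples == 0:
        return []
    # np.linspace(0, duration, n_samples, endpoint=False) has step duration / n_samples
    step = duration / n_samples
    return [math.sin(2 * math.pi * freq * k * step) for k in range(n_samples)]


x = sine_samples(freq=4, sampling_rate=16, duration=1.0)
print(len(x))          # 16
print(round(x[1], 6))  # 1.0, a quarter period of a 4 Hz tone sampled at 16 Hz
```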

Type Definitions / 型定義

Provides type definitions used in Wandas. Wandasで使用される型定義を提供します。

wandas.utils.types

Attributes

Real = np.number[Any] module-attribute

Complex = np.complexfloating[Any, Any] module-attribute

NDArrayReal = npt.NDArray[Real] module-attribute

NDArrayComplex = npt.NDArray[Complex] module-attribute

General Utilities / 一般ユーティリティ

Provides other general utility functions. その他の一般的なユーティリティ機能を提供します。

wandas.utils.util

Functions

validate_sampling_rate(sampling_rate, param_name='sampling_rate')

Validate that sampling rate is positive.

Parameters

sampling_rate : float
    Sampling rate in Hz to validate.
param_name : str, default="sampling_rate"
    Name of the parameter being validated (for error messages).

Raises

ValueError
    If sampling_rate is not positive (i.e., <= 0).

Examples

>>> validate_sampling_rate(44100)  # No error
>>> validate_sampling_rate(0)  # Raises ValueError
>>> validate_sampling_rate(-100)  # Raises ValueError

Source code in wandas/utils/util.py
def validate_sampling_rate(sampling_rate: float, param_name: str = "sampling_rate") -> None:
    """
    Validate that sampling rate is positive.

    Parameters
    ----------
    sampling_rate : float
        Sampling rate in Hz to validate.
    param_name : str, default="sampling_rate"
        Name of the parameter being validated (for error messages).

    Raises
    ------
    ValueError
        If sampling_rate is not positive (i.e., <= 0).

    Examples
    --------
    >>> validate_sampling_rate(44100)  # No error
    >>> validate_sampling_rate(0)  # Raises ValueError
    >>> validate_sampling_rate(-100)  # Raises ValueError
    """
    if sampling_rate <= 0:
        raise ValueError(
            f"Invalid {param_name}\n"
            f"  Got: {sampling_rate} Hz\n"
            f"  Expected: Positive value > 0\n"
            f"Sampling rate represents samples per second and must be positive.\n"
            f"Common values: 8000, 16000, 22050, 44100, 48000 Hz"
        )

unit_to_ref(unit)

Convert unit to reference value.

Parameters

unit : str
    Unit string.

Returns

float
    Reference value for the unit. For 'Pa', returns 2e-5 (20 μPa). For other units, returns 1.0.

Source code in wandas/utils/util.py
def unit_to_ref(unit: str) -> float:
    """
    Convert unit to reference value.

    Parameters
    ----------
    unit : str
        Unit string.

    Returns
    -------
    float
        Reference value for the unit. For 'Pa', returns 2e-5 (20 μPa).
        For other units, returns 1.0.
    """
    if unit == "Pa":
        return 2e-5

    else:
        return 1.0

calculate_rms(wave)

Calculate the root mean square of the wave.

Parameters

wave : NDArrayReal
    Input waveform data. Can be multi-channel (shape: [channels, samples]) or single channel (shape: [samples]).

Returns

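NDArrayReal
    RMS value(s), returned as an array because the mean is taken with keepdims=True. For multi-channel input of shape (channels, samples), the result has shape (channels, 1), one value per channel. For single-channel input of shape (samples,), the result has shape (1,).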

Source code in wandas/utils/util.py
def calculate_rms(wave: "NDArrayReal") -> "NDArrayReal":
    """
    Calculate the root mean square of the wave.

    Parameters
    ----------
    wave : NDArrayReal
        Input waveform data. Can be multi-channel (shape: [channels, samples])
        or single channel (shape: [samples]).

    Returns
    -------
    NDArrayReal
        RMS value(s), returned as an array because keepdims=True is used.
        Multi-channel input gives shape (channels, 1), one value per channel;
        single-channel input gives shape (1,).
    """
    # Calculate RMS considering axis (over the last dimension)
    axis_to_use = -1 if wave.ndim > 1 else None
    rms_values: NDArrayReal = np.sqrt(np.mean(np.square(wave), axis=axis_to_use, keepdims=True))
    return rms_values
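
The quantity being computed is the square root of the mean of the squared samples, per channel. A pure-Python sketch of the same arithmetic follows; `rms` and `rms_per_channel` are hypothetical helper names, and unlike the library function (which uses NumPy with `keepdims=True`) they return plain floats.

```python
import math
from typing import List, Sequence


def rms(samples: Sequence[float]) -> float:
    """Root mean square of one channel."""
    return math.sqrt(sum(s * s for s in samples) / len(samples))


def rms_per_channel(channels: Sequence[Sequence[float]]) -> List[float]:
    """One RMS value per channel, i.e. the mean is taken over the last axis."""
    return [rms(ch) for ch in channels]


square = [1.0, -1.0, 1.0, -1.0]
sine = [math.sin(2 * math.pi * k / 64) for k in range(64)]  # one full period

print(rms(square))          # 1.0
print(round(rms(sine), 4))  # 0.7071, i.e. amplitude / sqrt(2)
```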

calculate_desired_noise_rms(clean_rms, snr)

Calculate the desired noise RMS based on clean signal RMS and target SNR.

Parameters

clean_rms : NDArrayReal
    RMS value(s) of the clean signal. Can be a single value or an array for multi-channel.
snr : float
    Target Signal-to-Noise Ratio in dB.

Returns

NDArrayReal
    Desired noise RMS value(s) to achieve the target SNR.

Source code in wandas/utils/util.py
def calculate_desired_noise_rms(clean_rms: "NDArrayReal", snr: float) -> "NDArrayReal":
    """
    Calculate the desired noise RMS based on clean signal RMS and target SNR.

    Parameters
    ----------
    clean_rms : "NDArrayReal"
        RMS value(s) of the clean signal.
        Can be a single value or an array for multi-channel.
    snr : float
        Target Signal-to-Noise Ratio in dB.

    Returns
    -------
    "NDArrayReal"
        Desired noise RMS value(s) to achieve the target SNR.
    """
    a = snr / 20
    noise_rms = clean_rms / (10**a)
    return noise_rms
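
The formula inverts the amplitude definition of SNR, SNR_dB = 20·log10(clean_rms / noise_rms), which gives noise_rms = clean_rms / 10^(SNR_dB / 20). A scalar sketch with a round-trip check follows; both function names are hypothetical and only the standard library is used.

```python
import math


def desired_noise_rms(clean_rms: float, snr_db: float) -> float:
    """Scalar version of the formula above: clean_rms / 10 ** (snr / 20)."""
    return clean_rms / (10 ** (snr_db / 20))


def snr_from_rms(clean_rms: float, noise_rms: float) -> float:
    """Amplitude-based SNR in dB, used here only to verify the round trip."""
    return 20 * math.log10(clean_rms / noise_rms)


noise = desired_noise_rms(1.0, 20)
print(noise)  # 0.1, a 20 dB SNR means the noise RMS is one tenth of the signal RMS
print(round(snr_from_rms(0.5, desired_noise_rms(0.5, 6)), 6))  # 6.0
```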

amplitude_to_db(amplitude, ref)

Convert amplitude to decibel.

Parameters

amplitude : NDArrayReal
    Input amplitude data.
ref : float
    Reference value for conversion.

Returns

NDArrayReal
    Amplitude data converted to decibels.

Source code in wandas/utils/util.py
def amplitude_to_db(amplitude: "NDArrayReal", ref: float) -> "NDArrayReal":
    """
    Convert amplitude to decibel.

    Parameters
    ----------
    amplitude : NDArrayReal
        Input amplitude data.
    ref : float
        Reference value for conversion.

    Returns
    -------
    NDArrayReal
        Amplitude data converted to decibels.
    """
    db: NDArrayReal = librosa.amplitude_to_db(np.abs(amplitude), ref=ref, amin=1e-15, top_db=None)
    return db
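
For a single value, the conversion can be sketched without librosa. The sketch assumes that `librosa.amplitude_to_db` with `top_db=None` reduces to 20·log10(max(|a|, amin)) - 20·log10(max(ref, amin)); that reduction is an assumption about librosa's behavior, not something stated on this page, and `amplitude_to_db_scalar` is a hypothetical name.

```python
import math


def amplitude_to_db_scalar(amplitude: float, ref: float, amin: float = 1e-15) -> float:
    """Scalar dB conversion with a floor of amin (assumed to match librosa, see lead-in)."""
    magnitude = max(abs(amplitude), amin)  # the sign is discarded, as with np.abs above
    return 20 * math.log10(magnitude) - 20 * math.log10(max(ref, amin))


# 1 Pa relative to the 2e-5 Pa reference that unit_to_ref returns for "Pa"
print(round(amplitude_to_db_scalar(1.0, 2e-5), 2))  # 93.98
```

The `amin` floor is what keeps an amplitude of exactly zero from producing minus infinity.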

level_trigger(data, level, offset=0, hold=1)

Find points where the signal crosses the specified level from below.

Parameters

data : NDArrayReal
    Input signal data.
level : float
    Threshold level for triggering.
offset : int, default=0
    Offset to add to trigger points.
hold : int, default=1
    Minimum number of samples between successive trigger points.

Returns

list of int
    List of sample indices where the signal crosses the level.

Source code in wandas/utils/util.py
def level_trigger(data: "NDArrayReal", level: float, offset: int = 0, hold: int = 1) -> list[int]:
    """
    Find points where the signal crosses the specified level from below.

    Parameters
    ----------
    data : NDArrayReal
        Input signal data.
    level : float
        Threshold level for triggering.
    offset : int, default=0
        Offset to add to trigger points.
    hold : int, default=1
        Minimum number of samples between successive trigger points.

    Returns
    -------
    list of int
        List of sample indices where the signal crosses the level.
    """
    trig_point: list[int] = []

    sig_len = len(data)
    diff = np.diff(np.sign(data - level))
    level_point = np.where(diff > 0)[0]
    level_point = level_point[(level_point + hold) < sig_len]

    if len(level_point) == 0:
        return list()

    last_point = level_point[0]
    trig_point.append(last_point + offset)
    for i in level_point:
        if (last_point + hold) < i:
            trig_point.append(i + offset)
            last_point = i

    return trig_point
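
The trigger logic can be followed step by step in a pure-Python sketch. `rising_crossings` is a hypothetical re-implementation written for illustration (lists instead of NumPy arrays); it follows the same steps as the listing above: sign change detection, end-of-signal filtering, then the hold rule.

```python
from typing import List, Sequence


def _sign(x: float) -> int:
    return (x > 0) - (x < 0)


def rising_crossings(data: Sequence[float], level: float, offset: int = 0, hold: int = 1) -> List[int]:
    signs = [_sign(v - level) for v in data]
    # Index i is a candidate when the sign increases between sample i and i + 1
    candidates = [i for i in range(len(signs) - 1) if signs[i + 1] > signs[i]]
    # Drop candidates that are too close to the end of the signal
    candidates = [i for i in candidates if i + hold < len(data)]
    if not candidates:
        return []
    last = candidates[0]
    triggers = [last + offset]
    for i in candidates:
        # Accept a new trigger only if it lies more than `hold` samples after the last one
        if last + hold < i:
            triggers.append(i + offset)
            last = i
    return triggers


pulse = [0, 0, 1, 1, 0, 0, 1, 1, 0, 0]
print(rising_crossings(pulse, level=0.5))            # [1, 5]
print(rising_crossings(pulse, level=0.5, hold=4))    # [1]
print(rising_crossings(pulse, level=0.5, offset=2))  # [3, 7]
```

With `offset=0`, the reported index is the last sample at or below the level, not the first sample above it, because the sign difference is indexed by its left-hand sample.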

cut_sig(data, point_list, cut_len, taper_rate=0, dc_cut=False)

Cut segments from signal at specified points.

Parameters

data : NDArrayReal
    Input signal data.
point_list : list of int
    List of starting points for cutting.
cut_len : int
    Length of each segment to cut.
taper_rate : float, default=0
    Taper rate for Tukey window applied to segments. A value of 0 means no tapering, 1 means full tapering.
dc_cut : bool, default=False
    Whether to remove DC component (mean) from segments.

Returns

NDArrayReal
    Array containing cut segments with shape (n_segments, cut_len).

Source code in wandas/utils/util.py
def cut_sig(
    data: "NDArrayReal",
    point_list: list[int],
    cut_len: int,
    taper_rate: float = 0,
    dc_cut: bool = False,
) -> "NDArrayReal":
    """
    Cut segments from signal at specified points.

    Parameters
    ----------
    data : NDArrayReal
        Input signal data.
    point_list : list of int
        List of starting points for cutting.
    cut_len : int
        Length of each segment to cut.
    taper_rate : float, default=0
        Taper rate for Tukey window applied to segments.
        A value of 0 means no tapering, 1 means full tapering.
    dc_cut : bool, default=False
        Whether to remove DC component (mean) from segments.

    Returns
    -------
    NDArrayReal
        Array containing cut segments with shape (n_segments, cut_len).
    """
    length = len(data)
    point_list_ = [p for p in point_list if p >= 0 and p + cut_len <= length]
    trial: NDArrayReal = np.zeros((len(point_list_), cut_len))

    for i, v in enumerate(point_list_):
        trial[i] = data[v : v + cut_len]
        if dc_cut:
            trial[i] = trial[i] - trial[i].mean()

    win: NDArrayReal = tukey(cut_len, taper_rate).astype(trial.dtype)[np.newaxis, :]
    trial = trial * win
    return trial
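
The bounds filtering and DC removal can be illustrated with a pure-Python sketch. `cut_segments` is a hypothetical stand-in that covers only the `taper_rate=0` case (no windowing), assumes `cut_len > 0`, and returns nested lists instead of a NumPy array.

```python
from typing import List, Sequence


def cut_segments(
    data: Sequence[float],
    point_list: Sequence[int],
    cut_len: int,
    dc_cut: bool = False,
) -> List[List[float]]:
    length = len(data)
    # Keep only start points whose segment lies fully inside the signal
    valid = [p for p in point_list if p >= 0 and p + cut_len <= length]
    segments = []
    for p in valid:
        seg = [float(v) for v in data[p : p + cut_len]]
        if dc_cut:
            mean = sum(seg) / len(seg)
            seg = [v - mean for v in seg]  # remove the per-segment mean
        segments.append(seg)
    return segments


ramp = list(range(10))
print(cut_segments(ramp, [0, 4, 8, -1], 3))               # [[0.0, 1.0, 2.0], [4.0, 5.0, 6.0]]
print(cut_segments(ramp, [0, 4, 8, -1], 3, dc_cut=True))  # [[-1.0, 0.0, 1.0], [-1.0, 0.0, 1.0]]
```

Start points 8 and -1 are silently dropped, so the number of returned segments can be smaller than `len(point_list)`.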