Skip to content

Processing Module / 処理モジュール

The wandas.processing module provides various processing capabilities for audio data. wandas.processing モジュールは、オーディオデータに対する様々な処理機能を提供します。

Base Processing / 基本処理

Provides basic processing operations. 基本的な処理操作を提供します。

wandas.processing.base

Attributes

logger = logging.getLogger(__name__) module-attribute

InputArrayType = TypeVar('InputArrayType', NDArrayReal, NDArrayComplex) module-attribute

OutputArrayType = TypeVar('OutputArrayType', NDArrayReal, NDArrayComplex) module-attribute

Classes

AudioOperation

Bases: Generic[InputArrayType, OutputArrayType]

Abstract base class for audio processing operations.

Source code in wandas/processing/base.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
class AudioOperation(Generic[InputArrayType, OutputArrayType]):
    """Abstract base class for audio processing operations."""

    # Class variable: operation name
    name: ClassVar[str]

    def __init__(self, sampling_rate: float, *, pure: bool = True, **params: Any):
        """
        Initialize AudioOperation.

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        pure : bool, default=True
            Whether the operation is pure (deterministic with no side effects).
            When True, Dask can cache results for identical inputs.
            Set to False only if the operation has side effects or is non-deterministic.
        **params : Any
            Operation-specific parameters
        """
        self.sampling_rate = sampling_rate
        self.pure = pure
        self.params = params

        # Validate parameters during initialization
        self.validate_params()

        # Create processor function (lazy initialization possible)
        self._setup_processor()

        logger.debug(f"Initialized {self.__class__.__name__} operation with params: {params}")

    def validate_params(self) -> None:
        """Validate parameters (raises exception if invalid)"""
        pass

    def _setup_processor(self) -> None:
        """Set up processor function (implemented by subclasses)"""
        pass

    def get_metadata_updates(self) -> dict[str, Any]:
        """
        Get metadata updates to apply after processing.

        This method allows operations to specify how metadata should be
        updated after processing. By default, no metadata is updated.

        Returns
        -------
        dict
            Dictionary of metadata updates. Can include:
            - 'sampling_rate': New sampling rate (float)
            - Other metadata keys as needed

        Examples
        --------
        Return empty dict for operations that don't change metadata:

        >>> return {}

        Return new sampling rate for operations that resample:

        >>> return {"sampling_rate": self.target_sr}

        Notes
        -----
        This method is called by the framework after processing to update
        the frame metadata. Subclasses should override this method if they
        need to update metadata (e.g., changing sampling rate).

        Design principle: Operations should use parameters provided at
        initialization (via __init__). All necessary information should be
        available as instance variables.
        """
        return {}

    def get_display_name(self) -> str | None:
        """
        Get display name for the operation for use in channel labels.

        This method allows operations to customize how they appear in
        channel labels. By default, returns None, which means the
        operation name will be used.

        Returns
        -------
        str or None
            Display name for the operation. If None, the operation name
            (from the `name` class variable) is used.

        Examples
        --------
        Default behavior (returns None, uses operation name):

        >>> class NormalizeOp(AudioOperation):
        ...     name = "normalize"
        >>> op = NormalizeOp(44100)
        >>> op.get_display_name()  # Returns None
        >>> # Channel label: "normalize(ch0)"

        Custom display name:

        >>> class LowPassFilter(AudioOperation):
        ...     name = "lowpass_filter"
        ...
        ...     def __init__(self, sr, cutoff):
        ...         self.cutoff = cutoff
        ...         super().__init__(sr, cutoff=cutoff)
        ...
        ...     def get_display_name(self):
        ...         return f"lpf_{self.cutoff}Hz"
        >>> op = LowPassFilter(44100, cutoff=1000)
        >>> op.get_display_name()  # Returns "lpf_1000Hz"
        >>> # Channel label: "lpf_1000Hz(ch0)"

        Notes
        -----
        Subclasses can override this method to provide operation-specific
        display names that include parameter information, making labels
        more informative.
        """
        return None

    def _process_array(self, x: InputArrayType) -> OutputArrayType:
        """Processing function (implemented by subclasses)"""
        # Default is no-op function
        raise NotImplementedError("Subclasses must implement this method.")

    def _create_named_wrapper(self) -> Any:
        """
        Create a named wrapper function for better Dask graph visualization.

        Returns
        -------
        callable
            A wrapper function with the operation name set as __name__.
        """

        def operation_wrapper(x: InputArrayType) -> OutputArrayType:
            return self._process_array(x)

        # Set the function name to the operation name for better visualization
        operation_wrapper.__name__ = self.name
        return operation_wrapper

    def process_array(self, x: InputArrayType) -> Any:
        """
        Processing function wrapped with @dask.delayed.

        This method returns a Delayed object that can be computed later.
        The operation name is used in the Dask task graph for better visualization.

        Parameters
        ----------
        x : InputArrayType
            Input array to process.

        Returns
        -------
        dask.delayed.Delayed
            A Delayed object representing the computation.
        """
        logger.debug(f"Creating delayed operation on data with shape: {x.shape}")
        # Create wrapper with operation name and wrap it with dask.delayed
        wrapper = self._create_named_wrapper()
        delayed_func = delayed(wrapper, pure=self.pure)
        return delayed_func(x)

    def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
        """
        Calculate output data shape after operation.

        This method can be overridden by subclasses for efficiency.
        If not overridden, it will execute _process_array on a small test array
        to determine the output shape.

        Parameters
        ----------
        input_shape : tuple
            Input data shape

        Returns
        -------
        tuple
            Output data shape

        Notes
        -----
        The default implementation creates a minimal test array and processes it
        to determine output shape. For performance-critical code, subclasses should
        override this method with a direct calculation.
        """
        # Try to infer shape by executing _process_array on test data
        import numpy as np

        try:
            # Create minimal test array with input shape
            if len(input_shape) == 0:
                return input_shape

            # Create test input with correct dtype
            # Try complex first, fall back to float if needed
            test_input: Any = np.zeros(input_shape, dtype=np.complex128)

            # Process test input
            test_output: Any = self._process_array(test_input)

            # Return the shape of the output
            if isinstance(test_output, np.ndarray):
                return tuple(int(s) for s in test_output.shape)
            return input_shape
        except Exception as e:
            logger.warning(
                f"Failed to infer output shape for {self.__class__.__name__}: {e}. "
                "Please implement calculate_output_shape method."
            )
            raise NotImplementedError(
                f"Subclass {self.__class__.__name__} must implement "
                f"calculate_output_shape or ensure _process_array can be "
                f"called with test data."
            ) from e

    def process(self, data: DaArray) -> DaArray:
        """
        Execute operation and return result
        data shape is (channels, samples)
        """
        # Add task as delayed processing with custom name for visualization
        logger.debug("Adding delayed operation to computation graph")

        # Create a wrapper function with the operation name
        # This allows Dask to use the operation name in the task graph
        wrapper = self._create_named_wrapper()
        delayed_func = delayed(wrapper, pure=self.pure)
        delayed_result = delayed_func(data)

        # Convert delayed result to dask array and return
        output_shape = self.calculate_output_shape(data.shape)
        return _da_from_delayed(delayed_result, shape=output_shape, dtype=data.dtype)
Attributes
name class-attribute
sampling_rate = sampling_rate instance-attribute
pure = pure instance-attribute
params = params instance-attribute
Functions
__init__(sampling_rate, *, pure=True, **params)

Initialize AudioOperation.

Parameters

sampling_rate : float Sampling rate (Hz) pure : bool, default=True Whether the operation is pure (deterministic with no side effects). When True, Dask can cache results for identical inputs. Set to False only if the operation has side effects or is non-deterministic. **params : Any Operation-specific parameters

Source code in wandas/processing/base.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def __init__(self, sampling_rate: float, *, pure: bool = True, **params: Any):
    """
    Initialize AudioOperation.

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    pure : bool, default=True
        Whether the operation is pure (deterministic with no side effects).
        When True, Dask can cache results for identical inputs.
        Set to False only if the operation has side effects or is non-deterministic.
    **params : Any
        Operation-specific parameters
    """
    self.sampling_rate = sampling_rate
    self.pure = pure
    self.params = params

    # Validate parameters during initialization
    self.validate_params()

    # Create processor function (lazy initialization possible)
    self._setup_processor()

    logger.debug(f"Initialized {self.__class__.__name__} operation with params: {params}")
validate_params()

Validate parameters (raises exception if invalid)

Source code in wandas/processing/base.py
53
54
55
def validate_params(self) -> None:
    """Validate parameters (raises exception if invalid)"""
    pass
get_metadata_updates()

Get metadata updates to apply after processing.

This method allows operations to specify how metadata should be updated after processing. By default, no metadata is updated.

Returns

dict Dictionary of metadata updates. Can include: - 'sampling_rate': New sampling rate (float) - Other metadata keys as needed

Examples

Return empty dict for operations that don't change metadata:

return {}

Return new sampling rate for operations that resample:

return {"sampling_rate": self.target_sr}

Notes

This method is called by the framework after processing to update the frame metadata. Subclasses should override this method if they need to update metadata (e.g., changing sampling rate).

Design principle: Operations should use parameters provided at initialization (via init). All necessary information should be available as instance variables.

Source code in wandas/processing/base.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def get_metadata_updates(self) -> dict[str, Any]:
    """
    Get metadata updates to apply after processing.

    This method allows operations to specify how metadata should be
    updated after processing. By default, no metadata is updated.

    Returns
    -------
    dict
        Dictionary of metadata updates. Can include:
        - 'sampling_rate': New sampling rate (float)
        - Other metadata keys as needed

    Examples
    --------
    Return empty dict for operations that don't change metadata:

    >>> return {}

    Return new sampling rate for operations that resample:

    >>> return {"sampling_rate": self.target_sr}

    Notes
    -----
    This method is called by the framework after processing to update
    the frame metadata. Subclasses should override this method if they
    need to update metadata (e.g., changing sampling rate).

    Design principle: Operations should use parameters provided at
    initialization (via __init__). All necessary information should be
    available as instance variables.
    """
    return {}
get_display_name()

Get display name for the operation for use in channel labels.

This method allows operations to customize how they appear in channel labels. By default, returns None, which means the operation name will be used.

Returns

str or None Display name for the operation. If None, the operation name (from the name class variable) is used.

Examples

Default behavior (returns None, uses operation name):

class NormalizeOp(AudioOperation): ... name = "normalize" op = NormalizeOp(44100) op.get_display_name() # Returns None

Channel label: "normalize(ch0)"

Custom display name:

class LowPassFilter(AudioOperation): ... name = "lowpass_filter" ... ... def init(self, sr, cutoff): ... self.cutoff = cutoff ... super().init(sr, cutoff=cutoff) ... ... def get_display_name(self): ... return f"lpf_{self.cutoff}Hz" op = LowPassFilter(44100, cutoff=1000) op.get_display_name() # Returns "lpf_1000Hz"

Channel label: "lpf_1000Hz(ch0)"
Notes

Subclasses can override this method to provide operation-specific display names that include parameter information, making labels more informative.

Source code in wandas/processing/base.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
def get_display_name(self) -> str | None:
    """
    Get display name for the operation for use in channel labels.

    This method allows operations to customize how they appear in
    channel labels. By default, returns None, which means the
    operation name will be used.

    Returns
    -------
    str or None
        Display name for the operation. If None, the operation name
        (from the `name` class variable) is used.

    Examples
    --------
    Default behavior (returns None, uses operation name):

    >>> class NormalizeOp(AudioOperation):
    ...     name = "normalize"
    >>> op = NormalizeOp(44100)
    >>> op.get_display_name()  # Returns None
    >>> # Channel label: "normalize(ch0)"

    Custom display name:

    >>> class LowPassFilter(AudioOperation):
    ...     name = "lowpass_filter"
    ...
    ...     def __init__(self, sr, cutoff):
    ...         self.cutoff = cutoff
    ...         super().__init__(sr, cutoff=cutoff)
    ...
    ...     def get_display_name(self):
    ...         return f"lpf_{self.cutoff}Hz"
    >>> op = LowPassFilter(44100, cutoff=1000)
    >>> op.get_display_name()  # Returns "lpf_1000Hz"
    >>> # Channel label: "lpf_1000Hz(ch0)"

    Notes
    -----
    Subclasses can override this method to provide operation-specific
    display names that include parameter information, making labels
    more informative.
    """
    return None
process_array(x)

Processing function wrapped with @dask.delayed.

This method returns a Delayed object that can be computed later. The operation name is used in the Dask task graph for better visualization.

Parameters

x : InputArrayType Input array to process.

Returns

dask.delayed.Delayed A Delayed object representing the computation.

Source code in wandas/processing/base.py
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
def process_array(self, x: InputArrayType) -> Any:
    """
    Processing function wrapped with @dask.delayed.

    This method returns a Delayed object that can be computed later.
    The operation name is used in the Dask task graph for better visualization.

    Parameters
    ----------
    x : InputArrayType
        Input array to process.

    Returns
    -------
    dask.delayed.Delayed
        A Delayed object representing the computation.
    """
    logger.debug(f"Creating delayed operation on data with shape: {x.shape}")
    # Create wrapper with operation name and wrap it with dask.delayed
    wrapper = self._create_named_wrapper()
    delayed_func = delayed(wrapper, pure=self.pure)
    return delayed_func(x)
calculate_output_shape(input_shape)

Calculate output data shape after operation.

This method can be overridden by subclasses for efficiency. If not overridden, it will execute _process_array on a small test array to determine the output shape.

Parameters

input_shape : tuple Input data shape

Returns

tuple Output data shape

Notes

The default implementation creates a minimal test array and processes it to determine output shape. For performance-critical code, subclasses should override this method with a direct calculation.

Source code in wandas/processing/base.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
    """
    Calculate output data shape after operation.

    This method can be overridden by subclasses for efficiency.
    If not overridden, it will execute _process_array on a small test array
    to determine the output shape.

    Parameters
    ----------
    input_shape : tuple
        Input data shape

    Returns
    -------
    tuple
        Output data shape

    Notes
    -----
    The default implementation creates a minimal test array and processes it
    to determine output shape. For performance-critical code, subclasses should
    override this method with a direct calculation.
    """
    # Try to infer shape by executing _process_array on test data
    import numpy as np

    try:
        # Create minimal test array with input shape
        if len(input_shape) == 0:
            return input_shape

        # Create test input with correct dtype
        # Try complex first, fall back to float if needed
        test_input: Any = np.zeros(input_shape, dtype=np.complex128)

        # Process test input
        test_output: Any = self._process_array(test_input)

        # Return the shape of the output
        if isinstance(test_output, np.ndarray):
            return tuple(int(s) for s in test_output.shape)
        return input_shape
    except Exception as e:
        logger.warning(
            f"Failed to infer output shape for {self.__class__.__name__}: {e}. "
            "Please implement calculate_output_shape method."
        )
        raise NotImplementedError(
            f"Subclass {self.__class__.__name__} must implement "
            f"calculate_output_shape or ensure _process_array can be "
            f"called with test data."
        ) from e
process(data)

Execute operation and return result data shape is (channels, samples)

Source code in wandas/processing/base.py
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
def process(self, data: DaArray) -> DaArray:
    """
    Execute operation and return result
    data shape is (channels, samples)
    """
    # Add task as delayed processing with custom name for visualization
    logger.debug("Adding delayed operation to computation graph")

    # Create a wrapper function with the operation name
    # This allows Dask to use the operation name in the task graph
    wrapper = self._create_named_wrapper()
    delayed_func = delayed(wrapper, pure=self.pure)
    delayed_result = delayed_func(data)

    # Convert delayed result to dask array and return
    output_shape = self.calculate_output_shape(data.shape)
    return _da_from_delayed(delayed_result, shape=output_shape, dtype=data.dtype)

Functions

register_operation(operation_class)

Register a new operation type

Source code in wandas/processing/base.py
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
def register_operation(operation_class: type) -> None:
    """Register a new operation type"""

    if not issubclass(operation_class, AudioOperation):
        raise TypeError("Strategy class must inherit from AudioOperation.")
    if inspect.isabstract(operation_class):
        raise TypeError("Cannot register abstract AudioOperation class.")

    existing = _OPERATION_REGISTRY.get(operation_class.name)
    if (
        existing is not None
        and existing.__module__ == operation_class.__module__
        and existing.__qualname__ == operation_class.__qualname__
    ):
        return

    _OPERATION_REGISTRY[operation_class.name] = operation_class

get_operation(name)

Get operation class by name

Source code in wandas/processing/base.py
285
286
287
288
289
def get_operation(name: str) -> type[AudioOperation[Any, Any]]:
    """Get operation class by name"""
    if name not in _OPERATION_REGISTRY:
        raise ValueError(f"Unknown operation type: {name}")
    return _OPERATION_REGISTRY[name]

create_operation(name, sampling_rate, **params)

Create operation instance from name and parameters

Source code in wandas/processing/base.py
292
293
294
295
def create_operation(name: str, sampling_rate: float, **params: Any) -> AudioOperation[Any, Any]:
    """Create operation instance from name and parameters"""
    operation_class = get_operation(name)
    return operation_class(sampling_rate, **params)

Effects / エフェクト

Provides audio effect processing. オーディオエフェクト処理を提供します。

wandas.processing.effects

Attributes

logger = logging.getLogger(__name__) module-attribute

Classes

HpssHarmonic

Bases: AudioOperation[NDArrayReal, NDArrayReal]

HPSS Harmonic operation

Source code in wandas/processing/effects.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class HpssHarmonic(AudioOperation[NDArrayReal, NDArrayReal]):
    """HPSS Harmonic operation"""

    name = "hpss_harmonic"

    def __init__(
        self,
        sampling_rate: float,
        **kwargs: Any,
    ):
        """
        Initialize HPSS Harmonic

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        """
        self.kwargs = kwargs
        super().__init__(sampling_rate, **kwargs)

    def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
        return input_shape

    def get_display_name(self) -> str:
        """Get display name for the operation for use in channel labels."""
        return "Hrm"

    def _process_array(self, x: NDArrayReal) -> NDArrayReal:
        """Create processor function for HPSS Harmonic"""
        logger.debug(f"Applying HPSS Harmonic to array with shape: {x.shape}")
        result: NDArrayReal = effects.harmonic(x, **self.kwargs)
        logger.debug(f"HPSS Harmonic applied, returning result with shape: {result.shape}")
        return result
Attributes
name = 'hpss_harmonic' class-attribute instance-attribute
kwargs = kwargs instance-attribute
Functions
__init__(sampling_rate, **kwargs)

Initialize HPSS Harmonic

Parameters

sampling_rate : float Sampling rate (Hz)

Source code in wandas/processing/effects.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def __init__(
    self,
    sampling_rate: float,
    **kwargs: Any,
):
    """
    Initialize HPSS Harmonic

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    """
    self.kwargs = kwargs
    super().__init__(sampling_rate, **kwargs)
calculate_output_shape(input_shape)
Source code in wandas/processing/effects.py
38
39
def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
    return input_shape
get_display_name()

Get display name for the operation for use in channel labels.

Source code in wandas/processing/effects.py
41
42
43
def get_display_name(self) -> str:
    """Get display name for the operation for use in channel labels."""
    return "Hrm"

HpssPercussive

Bases: AudioOperation[NDArrayReal, NDArrayReal]

HPSS Percussive operation

Source code in wandas/processing/effects.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
class HpssPercussive(AudioOperation[NDArrayReal, NDArrayReal]):
    """HPSS Percussive operation"""

    name = "hpss_percussive"

    def __init__(
        self,
        sampling_rate: float,
        **kwargs: Any,
    ):
        """
        Initialize HPSS Percussive

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        """
        self.kwargs = kwargs
        super().__init__(sampling_rate, **kwargs)

    def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
        return input_shape

    def get_display_name(self) -> str:
        """Get display name for the operation for use in channel labels."""
        return "Prc"

    def _process_array(self, x: NDArrayReal) -> NDArrayReal:
        """Create processor function for HPSS Percussive"""
        logger.debug(f"Applying HPSS Percussive to array with shape: {x.shape}")
        result: NDArrayReal = effects.percussive(x, **self.kwargs)
        logger.debug(f"HPSS Percussive applied, returning result with shape: {result.shape}")
        return result
Attributes
name = 'hpss_percussive' class-attribute instance-attribute
kwargs = kwargs instance-attribute
Functions
__init__(sampling_rate, **kwargs)

Initialize HPSS Percussive

Parameters

sampling_rate : float Sampling rate (Hz)

Source code in wandas/processing/effects.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def __init__(
    self,
    sampling_rate: float,
    **kwargs: Any,
):
    """
    Initialize HPSS Percussive

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    """
    self.kwargs = kwargs
    super().__init__(sampling_rate, **kwargs)
calculate_output_shape(input_shape)
Source code in wandas/processing/effects.py
74
75
def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
    return input_shape
get_display_name()

Get display name for the operation for use in channel labels.

Source code in wandas/processing/effects.py
77
78
79
def get_display_name(self) -> str:
    """Get display name for the operation for use in channel labels."""
    return "Prc"

Normalize

Bases: AudioOperation[NDArrayReal, NDArrayReal]

Signal normalization operation using librosa.util.normalize

Source code in wandas/processing/effects.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
class Normalize(AudioOperation[NDArrayReal, NDArrayReal]):
    """Signal normalization operation using librosa.util.normalize"""

    name = "normalize"

    def __init__(
        self,
        sampling_rate: float,
        norm: float | None = np.inf,
        axis: int | None = -1,
        threshold: float | None = None,
        fill: bool | None = None,
    ):
        """
        Initialize normalization operation

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        norm : float or np.inf, default=np.inf
            Norm type. Supported values:
            - np.inf: Maximum absolute value normalization
            - -np.inf: Minimum absolute value normalization
            - 0: Pseudo L0 normalization (divide by number of non-zero elements)
            - float: Lp norm
            - None: No normalization
        axis : int or None, default=-1
            Axis along which to normalize.
            - -1: Normalize along time axis (each channel independently)
            - None: Global normalization across all axes
            - int: Normalize along specified axis
        threshold : float or None, optional
            Threshold below which values are considered zero.
            If None, no threshold is applied.
        fill : bool or None, optional
            Value to fill when the norm is zero.
            If None, the zero vector remains zero.

        Raises
        ------
        ValueError
            If norm parameter is invalid or threshold is negative
        """
        # Validate norm parameter
        if norm is not None and not isinstance(norm, int | float):
            raise ValueError(
                f"Invalid normalization method\n"
                f"  Got: {type(norm).__name__} ({norm})\n"
                f"  Expected: float, int, np.inf, -np.inf, or None\n"
                f"Norm parameter must be a numeric value or None.\n"
                f"Common values: np.inf (max norm), 2 (L2 norm),\n"
                f"1 (L1 norm), 0 (pseudo L0)"
            )

        # Validate that norm is non-negative (except for -np.inf which is valid)
        if norm is not None and norm < 0 and not np.isneginf(norm):
            raise ValueError(
                f"Invalid normalization method\n"
                f"  Got: {norm}\n"
                f"  Expected: Non-negative value, np.inf, -np.inf, or None\n"
                f"Norm parameter must be non-negative (except -np.inf for min norm).\n"
                f"Common values: np.inf (max norm), 2 (L2 norm),\n"
                f"1 (L1 norm), 0 (pseudo L0)"
            )

        # Validate threshold
        if threshold is not None and threshold < 0:
            raise ValueError(
                f"Invalid threshold for normalization\n"
                f"  Got: {threshold}\n"
                f"  Expected: Non-negative value or None\n"
                f"Threshold must be non-negative.\n"
                f"Typical values: 0.0 (no threshold), 1e-10 (small threshold)"
            )

        super().__init__(sampling_rate, norm=norm, axis=axis, threshold=threshold, fill=fill)
        self.norm = norm
        self.axis = axis
        self.threshold = threshold
        self.fill = fill
        logger.debug(
            f"Initialized Normalize operation with norm={norm}, axis={axis}, threshold={threshold}, fill={fill}"
        )

    def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
        """
        Calculate output data shape after operation

        Parameters
        ----------
        input_shape : tuple
            Input data shape

        Returns
        -------
        tuple
            Output data shape (same as input)
        """
        return input_shape

    def get_display_name(self) -> str:
        """Get display name for the operation for use in channel labels."""
        return "norm"

    def _process_array(self, x: NDArrayReal) -> NDArrayReal:
        """Perform normalization processing"""
        logger.debug(f"Applying normalization to array with shape: {x.shape}, norm={self.norm}, axis={self.axis}")

        # Apply librosa.util.normalize
        result: NDArrayReal = librosa_util.normalize(
            x, norm=self.norm, axis=self.axis, threshold=self.threshold, fill=self.fill
        )

        logger.debug(f"Normalization applied, returning result with shape: {result.shape}")
        return result
Attributes
name = 'normalize' class-attribute instance-attribute
norm = norm instance-attribute
axis = axis instance-attribute
threshold = threshold instance-attribute
fill = fill instance-attribute
Functions
__init__(sampling_rate, norm=np.inf, axis=-1, threshold=None, fill=None)

Initialize normalization operation

Parameters

sampling_rate : float Sampling rate (Hz) norm : float or np.inf, default=np.inf Norm type. Supported values: - np.inf: Maximum absolute value normalization - -np.inf: Minimum absolute value normalization - 0: Pseudo L0 normalization (divide by number of non-zero elements) - float: Lp norm - None: No normalization axis : int or None, default=-1 Axis along which to normalize. - -1: Normalize along time axis (each channel independently) - None: Global normalization across all axes - int: Normalize along specified axis threshold : float or None, optional Threshold below which values are considered zero. If None, no threshold is applied. fill : bool or None, optional Value to fill when the norm is zero. If None, the zero vector remains zero.

Raises

ValueError If norm parameter is invalid or threshold is negative

Source code in wandas/processing/effects.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
def __init__(
    self,
    sampling_rate: float,
    norm: float | None = np.inf,
    axis: int | None = -1,
    threshold: float | None = None,
    fill: bool | None = None,
):
    """
    Initialize normalization operation

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    norm : float or np.inf, default=np.inf
        Norm type. Supported values:
        - np.inf: Maximum absolute value normalization
        - -np.inf: Minimum absolute value normalization
        - 0: Pseudo L0 normalization (divide by number of non-zero elements)
        - float: Lp norm
        - None: No normalization
    axis : int or None, default=-1
        Axis along which to normalize.
        - -1: Normalize along time axis (each channel independently)
        - None: Global normalization across all axes
        - int: Normalize along specified axis
    threshold : float or None, optional
        Threshold below which values are considered zero.
        If None, no threshold is applied.
    fill : bool or None, optional
        Value to fill when the norm is zero.
        If None, the zero vector remains zero.

    Raises
    ------
    ValueError
        If norm parameter is invalid or threshold is negative
    """
    # Validate norm parameter
    if norm is not None and not isinstance(norm, int | float):
        raise ValueError(
            f"Invalid normalization method\n"
            f"  Got: {type(norm).__name__} ({norm})\n"
            f"  Expected: float, int, np.inf, -np.inf, or None\n"
            f"Norm parameter must be a numeric value or None.\n"
            f"Common values: np.inf (max norm), 2 (L2 norm),\n"
            f"1 (L1 norm), 0 (pseudo L0)"
        )

    # Validate that norm is non-negative (except for -np.inf which is valid)
    if norm is not None and norm < 0 and not np.isneginf(norm):
        raise ValueError(
            f"Invalid normalization method\n"
            f"  Got: {norm}\n"
            f"  Expected: Non-negative value, np.inf, -np.inf, or None\n"
            f"Norm parameter must be non-negative (except -np.inf for min norm).\n"
            f"Common values: np.inf (max norm), 2 (L2 norm),\n"
            f"1 (L1 norm), 0 (pseudo L0)"
        )

    # Validate threshold
    if threshold is not None and threshold < 0:
        raise ValueError(
            f"Invalid threshold for normalization\n"
            f"  Got: {threshold}\n"
            f"  Expected: Non-negative value or None\n"
            f"Threshold must be non-negative.\n"
            f"Typical values: 0.0 (no threshold), 1e-10 (small threshold)"
        )

    super().__init__(sampling_rate, norm=norm, axis=axis, threshold=threshold, fill=fill)
    self.norm = norm
    self.axis = axis
    self.threshold = threshold
    self.fill = fill
    logger.debug(
        f"Initialized Normalize operation with norm={norm}, axis={axis}, threshold={threshold}, fill={fill}"
    )
calculate_output_shape(input_shape)

Calculate output data shape after operation

Parameters

input_shape : tuple Input data shape

Returns

tuple Output data shape (same as input)

Source code in wandas/processing/effects.py
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
    """
    Calculate output data shape after operation

    Parameters
    ----------
    input_shape : tuple
        Input data shape

    Returns
    -------
    tuple
        Output data shape (same as input)
    """
    return input_shape
get_display_name()

Get display name for the operation for use in channel labels.

Source code in wandas/processing/effects.py
190
191
192
def get_display_name(self) -> str:
    """Get display name for the operation for use in channel labels."""
    return "norm"

RemoveDC

Bases: AudioOperation[NDArrayReal, NDArrayReal]

Remove DC component (DC offset) from the signal.

This operation removes the DC component by subtracting the mean value from each channel, centering the signal around zero.

Source code in wandas/processing/effects.py
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
class RemoveDC(AudioOperation[NDArrayReal, NDArrayReal]):
    """Remove DC component (DC offset) from the signal.

    This operation removes the DC component by subtracting the mean value
    from each channel, centering the signal around zero.
    """

    name = "remove_dc"

    def __init__(self, sampling_rate: float):
        """Initialize DC removal operation.

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        """
        super().__init__(sampling_rate)
        logger.debug("Initialized RemoveDC operation")

    def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
        """Calculate output data shape after operation.

        Parameters
        ----------
        input_shape : tuple
            Input data shape

        Returns
        -------
        tuple
            Output data shape (same as input)
        """
        return input_shape

    def get_display_name(self) -> str:
        """Get display name for the operation for use in channel labels."""
        return "dcRM"

    def _process_array(self, x: NDArrayReal) -> NDArrayReal:
        """Perform DC removal processing.

        Parameters
        ----------
        x : NDArrayReal
            Input signal array (channels, samples)

        Returns
        -------
        NDArrayReal
            Signal with DC component removed
        """
        logger.debug(f"Removing DC component from array with shape: {x.shape}")

        # Subtract mean along time axis (axis=1 for channel data)
        mean_values = x.mean(axis=-1, keepdims=True)
        result: NDArrayReal = x - mean_values

        logger.debug(f"DC removal applied, returning result with shape: {result.shape}")
        return result
Attributes
name = 'remove_dc' class-attribute instance-attribute
Functions
__init__(sampling_rate)

Initialize DC removal operation.

Parameters

sampling_rate : float Sampling rate (Hz)

Source code in wandas/processing/effects.py
216
217
218
219
220
221
222
223
224
225
def __init__(self, sampling_rate: float):
    """Initialize DC removal operation.

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    """
    super().__init__(sampling_rate)
    logger.debug("Initialized RemoveDC operation")
calculate_output_shape(input_shape)

Calculate output data shape after operation.

Parameters

input_shape : tuple Input data shape

Returns

tuple Output data shape (same as input)

Source code in wandas/processing/effects.py
227
228
229
230
231
232
233
234
235
236
237
238
239
240
def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
    """Calculate output data shape after operation.

    Parameters
    ----------
    input_shape : tuple
        Input data shape

    Returns
    -------
    tuple
        Output data shape (same as input)
    """
    return input_shape
get_display_name()

Get display name for the operation for use in channel labels.

Source code in wandas/processing/effects.py
242
243
244
def get_display_name(self) -> str:
    """Get display name for the operation for use in channel labels."""
    return "dcRM"

AddWithSNR

Bases: AudioOperation[NDArrayReal, NDArrayReal]

Addition operation considering SNR

Source code in wandas/processing/effects.py
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
class AddWithSNR(AudioOperation[NDArrayReal, NDArrayReal]):
    """Addition operation considering SNR"""

    name = "add_with_snr"

    def __init__(self, sampling_rate: float, other: DaArray, snr: float = 1.0):
        """
        Initialize addition operation considering SNR

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        other : DaArray
            Noise signal to add (channel-frame format)
        snr : float
            Signal-to-noise ratio (dB)
        """
        super().__init__(sampling_rate, other=other, snr=snr)

        self.other = other
        self.snr = snr
        logger.debug(f"Initialized AddWithSNR operation with SNR: {snr} dB")

    def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
        """
        Calculate output data shape after operation

        Parameters
        ----------
        input_shape : tuple
            Input data shape

        Returns
        -------
        tuple
            Output data shape (same as input)
        """
        return input_shape

    def get_display_name(self) -> str:
        """Get display name for the operation for use in channel labels."""
        return "+SNR"

    def _process_array(self, x: NDArrayReal) -> NDArrayReal:
        """Perform addition processing considering SNR"""
        logger.debug(f"Applying SNR-based addition with shape: {x.shape}")
        other: NDArrayReal = self.other.compute()

        # Use multi-channel versions of calculate_rms and calculate_desired_noise_rms
        clean_rms = util.calculate_rms(x)
        other_rms = util.calculate_rms(other)

        # Adjust noise gain based on specified SNR (apply per channel)
        desired_noise_rms = util.calculate_desired_noise_rms(clean_rms, self.snr)

        # Apply gain per channel using broadcasting
        gain = desired_noise_rms / other_rms
        # Add adjusted noise to signal
        result: NDArrayReal = x + other * gain
        return result
Attributes
name = 'add_with_snr' class-attribute instance-attribute
other = other instance-attribute
snr = snr instance-attribute
Functions
__init__(sampling_rate, other, snr=1.0)

Initialize addition operation considering SNR

Parameters

sampling_rate : float Sampling rate (Hz) other : DaArray Noise signal to add (channel-frame format) snr : float Signal-to-noise ratio (dB)

Source code in wandas/processing/effects.py
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
def __init__(self, sampling_rate: float, other: DaArray, snr: float = 1.0):
    """
    Initialize addition operation considering SNR

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    other : DaArray
        Noise signal to add (channel-frame format)
    snr : float
        Signal-to-noise ratio (dB)
    """
    super().__init__(sampling_rate, other=other, snr=snr)

    self.other = other
    self.snr = snr
    logger.debug(f"Initialized AddWithSNR operation with SNR: {snr} dB")
calculate_output_shape(input_shape)

Calculate output data shape after operation

Parameters

input_shape : tuple Input data shape

Returns

tuple Output data shape (same as input)

Source code in wandas/processing/effects.py
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
    """
    Calculate output data shape after operation

    Parameters
    ----------
    input_shape : tuple
        Input data shape

    Returns
    -------
    tuple
        Output data shape (same as input)
    """
    return input_shape
get_display_name()

Get display name for the operation for use in channel labels.

Source code in wandas/processing/effects.py
309
310
311
def get_display_name(self) -> str:
    """Get display name for the operation for use in channel labels."""
    return "+SNR"

Fade

Bases: AudioOperation[NDArrayReal, NDArrayReal]

Fade operation using a Tukey (tapered cosine) window.

This operation applies symmetric fade-in and fade-out with the same duration. The Tukey window alpha parameter is computed from the fade duration so that the tapered portion equals the requested fade length at each end.

Source code in wandas/processing/effects.py
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
class Fade(AudioOperation[NDArrayReal, NDArrayReal]):
    """Fade operation using a Tukey (tapered cosine) window.

    This operation applies symmetric fade-in and fade-out with the same
    duration. The Tukey window alpha parameter is computed from the fade
    duration so that the tapered portion equals the requested fade length
    at each end.
    """

    name = "fade"

    def __init__(self, sampling_rate: float, fade_ms: float = 50) -> None:
        self.fade_ms = float(fade_ms)
        # Precompute fade length in samples at construction time
        self.fade_len = int(round(self.fade_ms * float(sampling_rate) / 1000.0))
        super().__init__(sampling_rate, fade_ms=fade_ms)

    def validate_params(self) -> None:
        if self.fade_ms < 0:
            raise ValueError("fade_ms must be non-negative")

    def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
        return input_shape

    def get_display_name(self) -> str:
        """Get display name for the operation for use in channel labels."""
        return "fade"

    @staticmethod
    def calculate_tukey_alpha(fade_len: int, n_samples: int) -> float:
        """Calculate Tukey window alpha parameter from fade length.

        The alpha parameter determines what fraction of the window is tapered.
        For symmetric fade-in/fade-out, alpha = 2 * fade_len / n_samples ensures
        that each side's taper has exactly fade_len samples.

        Parameters
        ----------
        fade_len : int
            Desired fade length in samples for each end (in and out).
        n_samples : int
            Total number of samples in the signal.

        Returns
        -------
        float
            Alpha parameter for scipy.signal.windows.tukey, clamped to [0, 1].

        Examples
        --------
        >>> Fade.calculate_tukey_alpha(fade_len=20, n_samples=200)
        0.2
        >>> Fade.calculate_tukey_alpha(fade_len=100, n_samples=100)
        1.0
        """
        alpha = float(2 * fade_len) / float(n_samples)
        return min(1.0, alpha)

    def _process_array(self, x: NDArrayReal) -> NDArrayReal:
        logger.debug(f"Applying Tukey Fade to array with shape: {x.shape}")

        arr = x
        if arr.ndim == 1:
            arr = arr.reshape(1, -1)

        n_samples = int(arr.shape[-1])

        # If no fade requested, return input
        if self.fade_len <= 0:
            return arr

        if 2 * self.fade_len >= n_samples:
            raise ValueError("Fade length too long: 2*fade_ms must be less than signal length")

        # Calculate Tukey window alpha parameter
        alpha = self.calculate_tukey_alpha(self.fade_len, n_samples)

        # Create tukey window (numpy) and apply
        env = sp_windows.tukey(n_samples, alpha=alpha)

        result: NDArrayReal = arr * env[None, :]
        logger.debug("Tukey fade applied")
        return result
Attributes
name = 'fade' class-attribute instance-attribute
fade_ms = float(fade_ms) instance-attribute
fade_len = int(round(self.fade_ms * float(sampling_rate) / 1000.0)) instance-attribute
Functions
__init__(sampling_rate, fade_ms=50)
Source code in wandas/processing/effects.py
343
344
345
346
347
def __init__(self, sampling_rate: float, fade_ms: float = 50) -> None:
    self.fade_ms = float(fade_ms)
    # Precompute fade length in samples at construction time
    self.fade_len = int(round(self.fade_ms * float(sampling_rate) / 1000.0))
    super().__init__(sampling_rate, fade_ms=fade_ms)
validate_params()
Source code in wandas/processing/effects.py
349
350
351
def validate_params(self) -> None:
    if self.fade_ms < 0:
        raise ValueError("fade_ms must be non-negative")
calculate_output_shape(input_shape)
Source code in wandas/processing/effects.py
353
354
def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
    return input_shape
get_display_name()

Get display name for the operation for use in channel labels.

Source code in wandas/processing/effects.py
356
357
358
def get_display_name(self) -> str:
    """Get display name for the operation for use in channel labels."""
    return "fade"
calculate_tukey_alpha(fade_len, n_samples) staticmethod

Calculate Tukey window alpha parameter from fade length.

The alpha parameter determines what fraction of the window is tapered. For symmetric fade-in/fade-out, alpha = 2 * fade_len / n_samples ensures that each side's taper has exactly fade_len samples.

Parameters

fade_len : int Desired fade length in samples for each end (in and out). n_samples : int Total number of samples in the signal.

Returns

float Alpha parameter for scipy.signal.windows.tukey, clamped to [0, 1].

Examples

Fade.calculate_tukey_alpha(fade_len=20, n_samples=200) 0.2 Fade.calculate_tukey_alpha(fade_len=100, n_samples=100) 1.0

Source code in wandas/processing/effects.py
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
@staticmethod
def calculate_tukey_alpha(fade_len: int, n_samples: int) -> float:
    """Calculate Tukey window alpha parameter from fade length.

    The alpha parameter determines what fraction of the window is tapered.
    For symmetric fade-in/fade-out, alpha = 2 * fade_len / n_samples ensures
    that each side's taper has exactly fade_len samples.

    Parameters
    ----------
    fade_len : int
        Desired fade length in samples for each end (in and out).
    n_samples : int
        Total number of samples in the signal.

    Returns
    -------
    float
        Alpha parameter for scipy.signal.windows.tukey, clamped to [0, 1].

    Examples
    --------
    >>> Fade.calculate_tukey_alpha(fade_len=20, n_samples=200)
    0.2
    >>> Fade.calculate_tukey_alpha(fade_len=100, n_samples=100)
    1.0
    """
    alpha = float(2 * fade_len) / float(n_samples)
    return min(1.0, alpha)

Functions

Modules

Filters / フィルター

Provides various audio filter processing. 様々なオーディオフィルター処理を提供します。

wandas.processing.filters

Attributes

logger = logging.getLogger(__name__) module-attribute

Classes

HighPassFilter

Bases: AudioOperation[NDArrayReal, NDArrayReal]

High-pass filter operation

Source code in wandas/processing/filters.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
class HighPassFilter(AudioOperation[NDArrayReal, NDArrayReal]):
    """High-pass filter operation"""

    name = "highpass_filter"
    a: NDArrayReal
    b: NDArrayReal

    def __init__(self, sampling_rate: float, cutoff: float, order: int = 4):
        """
        Initialize high-pass filter

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        cutoff : float
            Cutoff frequency (Hz). Must be between 0 and Nyquist frequency
            (sampling_rate / 2).
        order : int, optional
            Filter order, default is 4

        Raises
        ------
        ValueError
            If cutoff frequency is not within valid range (0 < cutoff < Nyquist)
        """
        self.cutoff = cutoff
        self.order = order
        super().__init__(sampling_rate, cutoff=cutoff, order=order)

    def validate_params(self) -> None:
        """Validate parameters"""
        nyquist = self.sampling_rate / 2
        if self.cutoff <= 0 or self.cutoff >= nyquist:
            raise ValueError(
                f"Cutoff frequency out of valid range\n"
                f"  Got: {self.cutoff} Hz\n"
                f"  Valid range: 0 < cutoff < {nyquist} Hz (Nyquist frequency)\n"
                f"The Nyquist frequency is half the sampling rate\n"
                f"  ({self.sampling_rate} Hz).\n"
                f"Filters cannot work above this limit due to aliasing.\n"
                f"Solutions:\n"
                f"  - Use a cutoff frequency below {nyquist} Hz\n"
                f"  - Or increase sampling rate above {self.cutoff * 2} Hz\n"
                f"    using resample()"
            )

    def _setup_processor(self) -> None:
        """Set up high-pass filter processor"""
        # Calculate filter coefficients (once) - safely retrieve from instance variables
        nyquist = 0.5 * self.sampling_rate
        normal_cutoff = self.cutoff / nyquist

        # Precompute and save filter coefficients
        self.b, self.a = signal.butter(self.order, normal_cutoff, btype="high")  # type: ignore [unused-ignore]
        logger.debug(f"Highpass filter coefficients calculated: b={self.b}, a={self.a}")

    def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
        return input_shape

    def get_display_name(self) -> str:
        """Get display name for the operation for use in channel labels."""
        return "hpf"

    def _process_array(self, x: NDArrayReal) -> NDArrayReal:
        """Filter processing wrapped with @dask.delayed"""
        logger.debug(f"Applying highpass filter to array with shape: {x.shape}")
        result: NDArrayReal = signal.filtfilt(self.b, self.a, x, axis=1)
        logger.debug(f"Filter applied, returning result with shape: {result.shape}")
        return result
Attributes
name = 'highpass_filter' class-attribute instance-attribute
a instance-attribute
b instance-attribute
cutoff = cutoff instance-attribute
order = order instance-attribute
Functions
__init__(sampling_rate, cutoff, order=4)

Initialize high-pass filter

Parameters

sampling_rate : float Sampling rate (Hz) cutoff : float Cutoff frequency (Hz). Must be between 0 and Nyquist frequency (sampling_rate / 2). order : int, optional Filter order, default is 4

Raises

ValueError If cutoff frequency is not within valid range (0 < cutoff < Nyquist)

Source code in wandas/processing/filters.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def __init__(self, sampling_rate: float, cutoff: float, order: int = 4):
    """
    Initialize high-pass filter

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    cutoff : float
        Cutoff frequency (Hz). Must be between 0 and Nyquist frequency
        (sampling_rate / 2).
    order : int, optional
        Filter order, default is 4

    Raises
    ------
    ValueError
        If cutoff frequency is not within valid range (0 < cutoff < Nyquist)
    """
    self.cutoff = cutoff
    self.order = order
    super().__init__(sampling_rate, cutoff=cutoff, order=order)
validate_params()

Validate parameters

Source code in wandas/processing/filters.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def validate_params(self) -> None:
    """Validate parameters"""
    nyquist = self.sampling_rate / 2
    if self.cutoff <= 0 or self.cutoff >= nyquist:
        raise ValueError(
            f"Cutoff frequency out of valid range\n"
            f"  Got: {self.cutoff} Hz\n"
            f"  Valid range: 0 < cutoff < {nyquist} Hz (Nyquist frequency)\n"
            f"The Nyquist frequency is half the sampling rate\n"
            f"  ({self.sampling_rate} Hz).\n"
            f"Filters cannot work above this limit due to aliasing.\n"
            f"Solutions:\n"
            f"  - Use a cutoff frequency below {nyquist} Hz\n"
            f"  - Or increase sampling rate above {self.cutoff * 2} Hz\n"
            f"    using resample()"
        )
calculate_output_shape(input_shape)
Source code in wandas/processing/filters.py
70
71
def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
    return input_shape
get_display_name()

Get display name for the operation for use in channel labels.

Source code in wandas/processing/filters.py
73
74
75
def get_display_name(self) -> str:
    """Get display name for the operation for use in channel labels."""
    return "hpf"

LowPassFilter

Bases: AudioOperation[NDArrayReal, NDArrayReal]

Low-pass filter operation

Source code in wandas/processing/filters.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
class LowPassFilter(AudioOperation[NDArrayReal, NDArrayReal]):
    """Low-pass filter operation"""

    name = "lowpass_filter"
    a: NDArrayReal
    b: NDArrayReal

    def __init__(self, sampling_rate: float, cutoff: float, order: int = 4):
        """
        Initialize low-pass filter

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        cutoff : float
            Cutoff frequency (Hz). Must be between 0 and Nyquist frequency
            (sampling_rate / 2).
        order : int, optional
            Filter order, default is 4

        Raises
        ------
        ValueError
            If cutoff frequency is not within valid range (0 < cutoff < Nyquist)
        """
        self.cutoff = cutoff
        self.order = order
        super().__init__(sampling_rate, cutoff=cutoff, order=order)

    def validate_params(self) -> None:
        """Validate parameters"""
        nyquist = self.sampling_rate / 2
        if self.cutoff <= 0 or self.cutoff >= nyquist:
            raise ValueError(
                f"Cutoff frequency out of valid range\n"
                f"  Got: {self.cutoff} Hz\n"
                f"  Valid range: 0 < cutoff < {nyquist} Hz (Nyquist frequency)\n"
                f"The Nyquist frequency is half the sampling rate\n"
                f"  ({self.sampling_rate} Hz).\n"
                f"Filters cannot work above this limit due to aliasing.\n"
                f"Solutions:\n"
                f"  - Use a cutoff frequency below {nyquist} Hz\n"
                f"  - Or increase sampling rate above {self.cutoff * 2} Hz\n"
                f"    using resample()"
            )

    def _setup_processor(self) -> None:
        """Set up low-pass filter processor"""
        nyquist = 0.5 * self.sampling_rate
        normal_cutoff = self.cutoff / nyquist

        # Precompute and save filter coefficients
        self.b, self.a = signal.butter(self.order, normal_cutoff, btype="low")  # type: ignore [unused-ignore]
        logger.debug(f"Lowpass filter coefficients calculated: b={self.b}, a={self.a}")

    def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
        return input_shape

    def get_display_name(self) -> str:
        """Get display name for the operation for use in channel labels."""
        return "lpf"

    def _process_array(self, x: NDArrayReal) -> NDArrayReal:
        """Filter processing wrapped with @dask.delayed"""
        logger.debug(f"Applying lowpass filter to array with shape: {x.shape}")
        result: NDArrayReal = signal.filtfilt(self.b, self.a, x, axis=1)

        logger.debug(f"Filter applied, returning result with shape: {result.shape}")
        return result
Attributes
name = 'lowpass_filter' class-attribute instance-attribute
a instance-attribute
b instance-attribute
cutoff = cutoff instance-attribute
order = order instance-attribute
Functions
__init__(sampling_rate, cutoff, order=4)

Initialize low-pass filter

Parameters

sampling_rate : float Sampling rate (Hz) cutoff : float Cutoff frequency (Hz). Must be between 0 and Nyquist frequency (sampling_rate / 2). order : int, optional Filter order, default is 4

Raises

ValueError If cutoff frequency is not within valid range (0 < cutoff < Nyquist)

Source code in wandas/processing/filters.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def __init__(self, sampling_rate: float, cutoff: float, order: int = 4):
    """
    Initialize low-pass filter

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    cutoff : float
        Cutoff frequency (Hz). Must be between 0 and Nyquist frequency
        (sampling_rate / 2).
    order : int, optional
        Filter order, default is 4

    Raises
    ------
    ValueError
        If cutoff frequency is not within valid range (0 < cutoff < Nyquist)
    """
    self.cutoff = cutoff
    self.order = order
    super().__init__(sampling_rate, cutoff=cutoff, order=order)
validate_params()

Validate parameters

Source code in wandas/processing/filters.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def validate_params(self) -> None:
    """Validate parameters"""
    nyquist = self.sampling_rate / 2
    if self.cutoff <= 0 or self.cutoff >= nyquist:
        raise ValueError(
            f"Cutoff frequency out of valid range\n"
            f"  Got: {self.cutoff} Hz\n"
            f"  Valid range: 0 < cutoff < {nyquist} Hz (Nyquist frequency)\n"
            f"The Nyquist frequency is half the sampling rate\n"
            f"  ({self.sampling_rate} Hz).\n"
            f"Filters cannot work above this limit due to aliasing.\n"
            f"Solutions:\n"
            f"  - Use a cutoff frequency below {nyquist} Hz\n"
            f"  - Or increase sampling rate above {self.cutoff * 2} Hz\n"
            f"    using resample()"
        )
calculate_output_shape(input_shape)
Source code in wandas/processing/filters.py
141
142
def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
    return input_shape
get_display_name()

Get display name for the operation for use in channel labels.

Source code in wandas/processing/filters.py
144
145
146
def get_display_name(self) -> str:
    """Get display name for the operation for use in channel labels."""
    return "lpf"

BandPassFilter

Bases: AudioOperation[NDArrayReal, NDArrayReal]

Band-pass filter operation

Source code in wandas/processing/filters.py
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
class BandPassFilter(AudioOperation[NDArrayReal, NDArrayReal]):
    """Band-pass filter operation"""

    name = "bandpass_filter"
    a: NDArrayReal
    b: NDArrayReal

    def __init__(
        self,
        sampling_rate: float,
        low_cutoff: float,
        high_cutoff: float,
        order: int = 4,
    ):
        """
        Initialize band-pass filter

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        low_cutoff : float
            Lower cutoff frequency (Hz). Must be between 0 and Nyquist frequency.
        high_cutoff : float
            Higher cutoff frequency (Hz). Must be between 0 and Nyquist frequency
            and greater than low_cutoff.
        order : int, optional
            Filter order, default is 4

        Raises
        ------
        ValueError
            If either cutoff frequency is not within valid range (0 < cutoff < Nyquist),
            or if low_cutoff >= high_cutoff
        """
        self.low_cutoff = low_cutoff
        self.high_cutoff = high_cutoff
        self.order = order
        super().__init__(sampling_rate, low_cutoff=low_cutoff, high_cutoff=high_cutoff, order=order)

    def validate_params(self) -> None:
        """Validate parameters"""
        nyquist = self.sampling_rate / 2
        if self.low_cutoff <= 0 or self.low_cutoff >= nyquist:
            raise ValueError(
                f"Lower cutoff frequency out of valid range\n"
                f"  Got: {self.low_cutoff} Hz\n"
                f"  Valid range: 0 < cutoff < {nyquist} Hz (Nyquist frequency)\n"
                f"The Nyquist frequency is half the sampling rate\n"
                f"  ({self.sampling_rate} Hz).\n"
                f"Filters cannot work above this limit due to aliasing.\n"
                f"Use a lower cutoff frequency below {nyquist} Hz"
            )
        if self.high_cutoff <= 0 or self.high_cutoff >= nyquist:
            raise ValueError(
                f"Higher cutoff frequency out of valid range\n"
                f"  Got: {self.high_cutoff} Hz\n"
                f"  Valid range: 0 < cutoff < {nyquist} Hz (Nyquist frequency)\n"
                f"The Nyquist frequency is half the sampling rate\n"
                f"  ({self.sampling_rate} Hz).\n"
                f"Filters cannot work above this limit due to aliasing.\n"
                f"Use a cutoff frequency below {nyquist} Hz"
            )
        if self.low_cutoff >= self.high_cutoff:
            raise ValueError(
                f"Invalid bandpass filter cutoff frequencies\n"
                f"  Lower cutoff: {self.low_cutoff} Hz\n"
                f"  Higher cutoff: {self.high_cutoff} Hz\n"
                f"  Problem: Lower cutoff must be less than higher cutoff\n"
                f"A bandpass filter passes frequencies between low and high\n"
                f"  cutoffs.\n"
                f"Ensure low_cutoff < high_cutoff\n"
                f"  (e.g., low_cutoff=100, high_cutoff=1000)"
            )

    def _setup_processor(self) -> None:
        """Set up band-pass filter processor"""
        nyquist = 0.5 * self.sampling_rate
        low_normal_cutoff = self.low_cutoff / nyquist
        high_normal_cutoff = self.high_cutoff / nyquist

        # Precompute and save filter coefficients
        self.b, self.a = signal.butter(self.order, [low_normal_cutoff, high_normal_cutoff], btype="band")  # type: ignore [unused-ignore]
        logger.debug(f"Bandpass filter coefficients calculated: b={self.b}, a={self.a}")

    def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
        return input_shape

    def get_display_name(self) -> str:
        """Get display name for the operation for use in channel labels."""
        return "bpf"

    def _process_array(self, x: NDArrayReal) -> NDArrayReal:
        """Filter processing wrapped with @dask.delayed"""
        logger.debug(f"Applying bandpass filter to array with shape: {x.shape}")
        result: NDArrayReal = signal.filtfilt(self.b, self.a, x, axis=1)
        logger.debug(f"Filter applied, returning result with shape: {result.shape}")
        return result
Attributes
name = 'bandpass_filter' class-attribute instance-attribute
a instance-attribute
b instance-attribute
low_cutoff = low_cutoff instance-attribute
high_cutoff = high_cutoff instance-attribute
order = order instance-attribute
Functions
__init__(sampling_rate, low_cutoff, high_cutoff, order=4)

Initialize band-pass filter

Parameters

sampling_rate : float Sampling rate (Hz) low_cutoff : float Lower cutoff frequency (Hz). Must be between 0 and Nyquist frequency. high_cutoff : float Higher cutoff frequency (Hz). Must be between 0 and Nyquist frequency and greater than low_cutoff. order : int, optional Filter order, default is 4

Raises

ValueError If either cutoff frequency is not within valid range (0 < cutoff < Nyquist), or if low_cutoff >= high_cutoff

Source code in wandas/processing/filters.py
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
def __init__(
    self,
    sampling_rate: float,
    low_cutoff: float,
    high_cutoff: float,
    order: int = 4,
):
    """
    Initialize band-pass filter

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    low_cutoff : float
        Lower cutoff frequency (Hz). Must be between 0 and Nyquist frequency.
    high_cutoff : float
        Higher cutoff frequency (Hz). Must be between 0 and Nyquist frequency
        and greater than low_cutoff.
    order : int, optional
        Filter order, default is 4

    Raises
    ------
    ValueError
        If either cutoff frequency is not within valid range (0 < cutoff < Nyquist),
        or if low_cutoff >= high_cutoff
    """
    self.low_cutoff = low_cutoff
    self.high_cutoff = high_cutoff
    self.order = order
    super().__init__(sampling_rate, low_cutoff=low_cutoff, high_cutoff=high_cutoff, order=order)
validate_params()

Validate parameters

Source code in wandas/processing/filters.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
def validate_params(self) -> None:
    """Validate parameters"""
    nyquist = self.sampling_rate / 2
    if self.low_cutoff <= 0 or self.low_cutoff >= nyquist:
        raise ValueError(
            f"Lower cutoff frequency out of valid range\n"
            f"  Got: {self.low_cutoff} Hz\n"
            f"  Valid range: 0 < cutoff < {nyquist} Hz (Nyquist frequency)\n"
            f"The Nyquist frequency is half the sampling rate\n"
            f"  ({self.sampling_rate} Hz).\n"
            f"Filters cannot work above this limit due to aliasing.\n"
            f"Use a lower cutoff frequency below {nyquist} Hz"
        )
    if self.high_cutoff <= 0 or self.high_cutoff >= nyquist:
        raise ValueError(
            f"Higher cutoff frequency out of valid range\n"
            f"  Got: {self.high_cutoff} Hz\n"
            f"  Valid range: 0 < cutoff < {nyquist} Hz (Nyquist frequency)\n"
            f"The Nyquist frequency is half the sampling rate\n"
            f"  ({self.sampling_rate} Hz).\n"
            f"Filters cannot work above this limit due to aliasing.\n"
            f"Use a cutoff frequency below {nyquist} Hz"
        )
    if self.low_cutoff >= self.high_cutoff:
        raise ValueError(
            f"Invalid bandpass filter cutoff frequencies\n"
            f"  Lower cutoff: {self.low_cutoff} Hz\n"
            f"  Higher cutoff: {self.high_cutoff} Hz\n"
            f"  Problem: Lower cutoff must be less than higher cutoff\n"
            f"A bandpass filter passes frequencies between low and high\n"
            f"  cutoffs.\n"
            f"Ensure low_cutoff < high_cutoff\n"
            f"  (e.g., low_cutoff=100, high_cutoff=1000)"
        )
calculate_output_shape(input_shape)
Source code in wandas/processing/filters.py
242
243
def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
    return input_shape
get_display_name()

Get display name for the operation for use in channel labels.

Source code in wandas/processing/filters.py
245
246
247
def get_display_name(self) -> str:
    """Get display name for the operation for use in channel labels."""
    return "bpf"

AWeighting

Bases: AudioOperation[NDArrayReal, NDArrayReal]

A-weighting filter operation

Source code in wandas/processing/filters.py
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
class AWeighting(AudioOperation[NDArrayReal, NDArrayReal]):
    """A-weighting filter operation"""

    name = "a_weighting"

    def __init__(self, sampling_rate: float):
        """
        Initialize A-weighting filter

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        """
        super().__init__(sampling_rate)

    def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
        return input_shape

    def get_display_name(self) -> str:
        """Get display name for the operation for use in channel labels."""
        return "Aw"

    def _process_array(self, x: NDArrayReal) -> NDArrayReal:
        """Create processor function for A-weighting filter"""
        logger.debug(f"Applying A-weighting to array with shape: {x.shape}")
        result = A_weight(x, self.sampling_rate)

        # Handle case where A_weight returns a tuple
        if isinstance(result, tuple):
            # Use the first element of the tuple
            result = result[0]

        logger.debug(f"A-weighting applied, returning result with shape: {result.shape}")
        return np.array(result)
Attributes
name = 'a_weighting' class-attribute instance-attribute
Functions
__init__(sampling_rate)

Initialize A-weighting filter

Parameters

sampling_rate : float Sampling rate (Hz)

Source code in wandas/processing/filters.py
262
263
264
265
266
267
268
269
270
271
def __init__(self, sampling_rate: float):
    """
    Initialize A-weighting filter

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    """
    super().__init__(sampling_rate)
calculate_output_shape(input_shape)
Source code in wandas/processing/filters.py
273
274
def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
    return input_shape
get_display_name()

Get display name for the operation for use in channel labels.

Source code in wandas/processing/filters.py
276
277
278
def get_display_name(self) -> str:
    """Get display name for the operation for use in channel labels."""
    return "Aw"

Functions

Spectral Processing / スペクトル処理

Provides spectral analysis and processing capabilities. スペクトル解析と処理機能を提供します。

wandas.processing.spectral

Attributes

logger = logging.getLogger(__name__) module-attribute

Classes

FFT

Bases: AudioOperation[NDArrayReal, NDArrayComplex]

FFT (Fast Fourier Transform) operation

Source code in wandas/processing/spectral.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
class FFT(AudioOperation[NDArrayReal, NDArrayComplex]):
    """FFT (Fast Fourier Transform) operation"""

    name = "fft"
    n_fft: int | None
    window: str

    def __init__(self, sampling_rate: float, n_fft: int | None = None, window: str = "hann"):
        """
        Initialize FFT operation

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        n_fft : int, optional
            FFT size, default is None (determined by input size)
        window : str, optional
            Window function type, default is 'hann'

        Raises
        ------
        ValueError
            If n_fft is not a positive integer
        """
        # Validate n_fft parameter
        if n_fft is not None and n_fft <= 0:
            raise ValueError(
                f"Invalid FFT size\n"
                f"  Got: {n_fft}\n"
                f"  Expected: Positive integer > 0\n"
                f"FFT size must be a positive integer.\n"
                f"Common values: 512, 1024, 2048, 4096,\n"
                f"8192 (powers of 2 are most efficient)"
            )

        self.n_fft = n_fft
        self.window = window
        super().__init__(sampling_rate, n_fft=n_fft, window=window)

    def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
        """
        操作後の出力データの形状を計算します

        Parameters
        ----------
        input_shape : tuple
            入力データの形状 (channels, samples)

        Returns
        -------
        tuple
            出力データの形状 (channels, freqs)
        """
        n_freqs = self.n_fft // 2 + 1 if self.n_fft else input_shape[-1] // 2 + 1
        return (*input_shape[:-1], n_freqs)

    def get_display_name(self) -> str:
        """Get display name for the operation for use in channel labels."""
        return "FFT"

    def _process_array(self, x: NDArrayReal) -> NDArrayComplex:
        """FFT操作のプロセッサ関数を作成"""
        from scipy.signal import get_window

        if self.n_fft is not None and x.shape[-1] > self.n_fft:
            # If n_fft is specified and input length exceeds it, truncate
            x = x[..., : self.n_fft]

        win = get_window(self.window, x.shape[-1])
        x = x * win
        result: NDArrayComplex = np.fft.rfft(x, n=self.n_fft, axis=-1)
        result[..., 1:-1] *= 2.0
        # 窓関数補正
        scaling_factor = np.sum(win)
        result = result / scaling_factor
        return result
Attributes
name = 'fft' class-attribute instance-attribute
n_fft = n_fft instance-attribute
window = window instance-attribute
Functions
__init__(sampling_rate, n_fft=None, window='hann')

Initialize FFT operation

Parameters

sampling_rate : float Sampling rate (Hz) n_fft : int, optional FFT size, default is None (determined by input size) window : str, optional Window function type, default is 'hann'

Raises

ValueError If n_fft is not a positive integer

Source code in wandas/processing/spectral.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
def __init__(self, sampling_rate: float, n_fft: int | None = None, window: str = "hann"):
    """
    Initialize FFT operation

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    n_fft : int, optional
        FFT size, default is None (determined by input size)
    window : str, optional
        Window function type, default is 'hann'

    Raises
    ------
    ValueError
        If n_fft is not a positive integer
    """
    # Validate n_fft parameter
    if n_fft is not None and n_fft <= 0:
        raise ValueError(
            f"Invalid FFT size\n"
            f"  Got: {n_fft}\n"
            f"  Expected: Positive integer > 0\n"
            f"FFT size must be a positive integer.\n"
            f"Common values: 512, 1024, 2048, 4096,\n"
            f"8192 (powers of 2 are most efficient)"
        )

    self.n_fft = n_fft
    self.window = window
    super().__init__(sampling_rate, n_fft=n_fft, window=window)
calculate_output_shape(input_shape)

操作後の出力データの形状を計算します

Parameters

input_shape : tuple 入力データの形状 (channels, samples)

Returns

tuple 出力データの形状 (channels, freqs)

Source code in wandas/processing/spectral.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
    """
    操作後の出力データの形状を計算します

    Parameters
    ----------
    input_shape : tuple
        入力データの形状 (channels, samples)

    Returns
    -------
    tuple
        出力データの形状 (channels, freqs)
    """
    n_freqs = self.n_fft // 2 + 1 if self.n_fft else input_shape[-1] // 2 + 1
    return (*input_shape[:-1], n_freqs)
get_display_name()

Get display name for the operation for use in channel labels.

Source code in wandas/processing/spectral.py
173
174
175
def get_display_name(self) -> str:
    """Get display name for the operation for use in channel labels."""
    return "FFT"

IFFT

Bases: AudioOperation[NDArrayComplex, NDArrayReal]

IFFT (Inverse Fast Fourier Transform) operation

Source code in wandas/processing/spectral.py
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
class IFFT(AudioOperation[NDArrayComplex, NDArrayReal]):
    """IFFT (Inverse Fast Fourier Transform) operation"""

    name = "ifft"
    n_fft: int | None
    window: str

    def __init__(self, sampling_rate: float, n_fft: int | None = None, window: str = "hann"):
        """
        Initialize IFFT operation

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        n_fft : Optional[int], optional
            IFFT size, default is None (determined based on input size)
        window : str, optional
            Window function type, default is 'hann'
        """
        self.n_fft = n_fft
        self.window = window
        super().__init__(sampling_rate, n_fft=n_fft, window=window)

    def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
        """
        Calculate output data shape after operation

        Parameters
        ----------
        input_shape : tuple
            Input data shape (channels, freqs)

        Returns
        -------
        tuple
            Output data shape (channels, samples)
        """
        n_samples = 2 * (input_shape[-1] - 1) if self.n_fft is None else self.n_fft
        return (*input_shape[:-1], n_samples)

    def get_display_name(self) -> str:
        """Get display name for the operation for use in channel labels."""
        return "iFFT"

    def _process_array(self, x: NDArrayComplex) -> NDArrayReal:
        """Create processor function for IFFT operation"""
        logger.debug(f"Applying IFFT to array with shape: {x.shape}")

        # Restore frequency component scaling (remove the 2.0 multiplier applied in FFT)
        _x = x.copy()
        _x[..., 1:-1] /= 2.0

        # Execute IFFT
        result: NDArrayReal = np.fft.irfft(_x, n=self.n_fft, axis=-1)

        # Window function correction (inverse of FFT operation)
        from scipy.signal import get_window

        win = get_window(self.window, result.shape[-1])

        # Correct the FFT window function scaling
        scaling_factor = np.sum(win) / result.shape[-1]
        result = result / scaling_factor

        logger.debug(f"IFFT applied, returning result with shape: {result.shape}")
        return result
Attributes
name = 'ifft' class-attribute instance-attribute
n_fft = n_fft instance-attribute
window = window instance-attribute
Functions
__init__(sampling_rate, n_fft=None, window='hann')

Initialize IFFT operation

Parameters

sampling_rate : float Sampling rate (Hz) n_fft : Optional[int], optional IFFT size, default is None (determined based on input size) window : str, optional Window function type, default is 'hann'

Source code in wandas/processing/spectral.py
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
def __init__(self, sampling_rate: float, n_fft: int | None = None, window: str = "hann"):
    """
    Initialize IFFT operation

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    n_fft : Optional[int], optional
        IFFT size, default is None (determined based on input size)
    window : str, optional
        Window function type, default is 'hann'
    """
    self.n_fft = n_fft
    self.window = window
    super().__init__(sampling_rate, n_fft=n_fft, window=window)
calculate_output_shape(input_shape)

Calculate output data shape after operation

Parameters

input_shape : tuple Input data shape (channels, freqs)

Returns

tuple Output data shape (channels, samples)

Source code in wandas/processing/spectral.py
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
    """
    Calculate output data shape after operation

    Parameters
    ----------
    input_shape : tuple
        Input data shape (channels, freqs)

    Returns
    -------
    tuple
        Output data shape (channels, samples)
    """
    n_samples = 2 * (input_shape[-1] - 1) if self.n_fft is None else self.n_fft
    return (*input_shape[:-1], n_samples)
get_display_name()

Get display name for the operation for use in channel labels.

Source code in wandas/processing/spectral.py
236
237
238
def get_display_name(self) -> str:
    """Get display name for the operation for use in channel labels."""
    return "iFFT"

STFT

Bases: AudioOperation[NDArrayReal, NDArrayComplex]

Short-Time Fourier Transform operation

Source code in wandas/processing/spectral.py
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
class STFT(AudioOperation[NDArrayReal, NDArrayComplex]):
    """Short-Time Fourier Transform operation"""

    name = "stft"

    def __init__(
        self,
        sampling_rate: float,
        n_fft: int = 2048,
        hop_length: int | None = None,
        win_length: int | None = None,
        window: str = "hann",
    ):
        """
        Initialize STFT operation

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        n_fft : int
            FFT size, default is 2048
        hop_length : int, optional
            Number of samples between frames. Default is win_length // 4
        win_length : int, optional
            Window length. Default is n_fft
        window : str
            Window type, default is 'hann'

        Raises
        ------
        ValueError
            If n_fft is not positive, win_length > n_fft, or hop_length is invalid
        """
        # Validate and compute parameters
        actual_win_length, actual_hop_length = _validate_spectral_params(n_fft, win_length, hop_length, "STFT")

        self.n_fft = n_fft
        self.win_length = actual_win_length
        self.hop_length = actual_hop_length
        self.noverlap = self.win_length - self.hop_length if hop_length is not None else None
        self.window = window

        self.SFT = ShortTimeFFT(
            win=get_window(window, self.win_length),
            hop=self.hop_length,
            fs=sampling_rate,
            mfft=self.n_fft,
            scale_to="magnitude",
        )
        super().__init__(
            sampling_rate,
            n_fft=n_fft,
            win_length=self.win_length,
            hop_length=self.hop_length,
            window=window,
        )

    def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
        """
        Calculate output data shape after operation

        Parameters
        ----------
        input_shape : tuple
            Input data shape

        Returns
        -------
        tuple
            Output data shape
        """
        n_samples = input_shape[-1]
        n_f = len(self.SFT.f)
        n_t = len(self.SFT.t(n_samples))
        return (input_shape[0], n_f, n_t)

    def get_display_name(self) -> str:
        """Get display name for the operation for use in channel labels."""
        return "STFT"

    def _process_array(self, x: NDArrayReal) -> NDArrayComplex:
        """Apply SciPy STFT processing to multiple channels at once"""
        logger.debug(f"Applying SciPy STFT to array with shape: {x.shape}")

        # Convert 1D input to 2D
        if x.ndim == 1:
            x = x.reshape(1, -1)

        # Apply STFT to all channels at once
        result: NDArrayComplex = self.SFT.stft(x)
        result[..., 1:-1, :] *= 2.0
        logger.debug(f"SciPy STFT applied, returning result with shape: {result.shape}")
        return result
Attributes
name = 'stft' class-attribute instance-attribute
n_fft = n_fft instance-attribute
win_length = actual_win_length instance-attribute
hop_length = actual_hop_length instance-attribute
noverlap = self.win_length - self.hop_length if hop_length is not None else None instance-attribute
window = window instance-attribute
SFT = ShortTimeFFT(win=(get_window(window, self.win_length)), hop=(self.hop_length), fs=sampling_rate, mfft=(self.n_fft), scale_to='magnitude') instance-attribute
Functions
__init__(sampling_rate, n_fft=2048, hop_length=None, win_length=None, window='hann')

Initialize STFT operation

Parameters

sampling_rate : float Sampling rate (Hz) n_fft : int FFT size, default is 2048 hop_length : int, optional Number of samples between frames. Default is win_length // 4 win_length : int, optional Window length. Default is n_fft window : str Window type, default is 'hann'

Raises

ValueError If n_fft is not positive, win_length > n_fft, or hop_length is invalid

Source code in wandas/processing/spectral.py
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
def __init__(
    self,
    sampling_rate: float,
    n_fft: int = 2048,
    hop_length: int | None = None,
    win_length: int | None = None,
    window: str = "hann",
):
    """
    Initialize STFT operation

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    n_fft : int
        FFT size, default is 2048
    hop_length : int, optional
        Number of samples between frames. Default is win_length // 4
    win_length : int, optional
        Window length. Default is n_fft
    window : str
        Window type, default is 'hann'

    Raises
    ------
    ValueError
        If n_fft is not positive, win_length > n_fft, or hop_length is invalid
    """
    # Validate and compute parameters
    actual_win_length, actual_hop_length = _validate_spectral_params(n_fft, win_length, hop_length, "STFT")

    self.n_fft = n_fft
    self.win_length = actual_win_length
    self.hop_length = actual_hop_length
    self.noverlap = self.win_length - self.hop_length if hop_length is not None else None
    self.window = window

    self.SFT = ShortTimeFFT(
        win=get_window(window, self.win_length),
        hop=self.hop_length,
        fs=sampling_rate,
        mfft=self.n_fft,
        scale_to="magnitude",
    )
    super().__init__(
        sampling_rate,
        n_fft=n_fft,
        win_length=self.win_length,
        hop_length=self.hop_length,
        window=window,
    )
calculate_output_shape(input_shape)

Calculate output data shape after operation

Parameters

input_shape : tuple Input data shape

Returns

tuple Output data shape

Source code in wandas/processing/spectral.py
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
    """
    Calculate output data shape after operation

    Parameters
    ----------
    input_shape : tuple
        Input data shape

    Returns
    -------
    tuple
        Output data shape
    """
    n_samples = input_shape[-1]
    n_f = len(self.SFT.f)
    n_t = len(self.SFT.t(n_samples))
    return (input_shape[0], n_f, n_t)
get_display_name()

Get display name for the operation for use in channel labels.

Source code in wandas/processing/spectral.py
341
342
343
def get_display_name(self) -> str:
    """Get display name for the operation for use in channel labels."""
    return "STFT"

ISTFT

Bases: AudioOperation[NDArrayComplex, NDArrayReal]

Inverse Short-Time Fourier Transform operation

Source code in wandas/processing/spectral.py
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
class ISTFT(AudioOperation[NDArrayComplex, NDArrayReal]):
    """Inverse Short-Time Fourier Transform operation"""

    name = "istft"

    def __init__(
        self,
        sampling_rate: float,
        n_fft: int = 2048,
        hop_length: int | None = None,
        win_length: int | None = None,
        window: str = "hann",
        length: int | None = None,
    ):
        """
        Initialize ISTFT operation

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        n_fft : int
            FFT size, default is 2048
        hop_length : int, optional
            Number of samples between frames. Default is win_length // 4
        win_length : int, optional
            Window length. Default is n_fft
        window : str
            Window type, default is 'hann'
        length : int, optional
            Length of output signal. Default is None (determined from input)

        Raises
        ------
        ValueError
            If n_fft is not positive, win_length > n_fft, or hop_length is invalid
        """
        # Validate and compute parameters
        actual_win_length, actual_hop_length = _validate_spectral_params(n_fft, win_length, hop_length, "ISTFT")

        self.n_fft = n_fft
        self.win_length = actual_win_length
        self.hop_length = actual_hop_length
        self.window = window
        self.length = length

        # Instantiate ShortTimeFFT for ISTFT calculation
        self.SFT = ShortTimeFFT(
            win=get_window(window, self.win_length),
            hop=self.hop_length,
            fs=sampling_rate,
            mfft=self.n_fft,
            scale_to="magnitude",  # Consistent scaling with STFT
        )

        super().__init__(
            sampling_rate,
            n_fft=n_fft,
            win_length=self.win_length,
            hop_length=self.hop_length,
            window=window,
            length=length,
        )

    def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
        """
        Calculate output data shape after ISTFT operation.

        Uses the SciPy ShortTimeFFT calculation formula to compute the expected
        output length based on the input spectrogram dimensions and output range
        parameters (k0, k1).

        Parameters
        ----------
        input_shape : tuple
            Input spectrogram shape (channels, n_freqs, n_frames)
            where n_freqs = n_fft // 2 + 1 and n_frames is the number of time frames.

        Returns
        -------
        tuple
            Output shape (channels, output_samples) where output_samples is the
            reconstructed signal length determined by the output range [k0, k1).

        Notes
        -----
        The calculation follows SciPy's ShortTimeFFT.istft() implementation.
        When k1 is None (default), the maximum reconstructible signal length is
        computed as:

        .. math::

            q_{max} = n_{frames} + p_{min}

            k_{max} = (q_{max} - 1) \\cdot hop + m_{num} - m_{num\\_mid}

        The output length is then:

        .. math::

            output\\_samples = k_1 - k_0

        where k0 defaults to 0 and k1 defaults to k_max.

        Parameters that affect the calculation:
        - n_frames: number of time frames in the STFT
        - p_min: minimum frame index (ShortTimeFFT property)
        - hop: hop length (samples between frames)
        - m_num: window length
        - m_num_mid: window midpoint position
        - self.length: optional length override (if set, limits output)

        References
        ----------
        - SciPy ShortTimeFFT.istft:
          https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.ShortTimeFFT.istft.html
        - SciPy Source: https://github.com/scipy/scipy/blob/main/scipy/signal/_short_time_fft.py
        """
        n_channels = input_shape[0]
        n_frames = input_shape[-1]  # time_frames

        # SciPy ShortTimeFFT の計算式に従う
        # See: https://github.com/scipy/scipy/blob/main/scipy/signal/_short_time_fft.py
        q_max = n_frames + self.SFT.p_min
        k_max = (q_max - 1) * self.SFT.hop + self.SFT.m_num - self.SFT.m_num_mid

        # Default parameters: k0=0, k1=None (which becomes k_max)
        # The output length is k1 - k0 = k_max - 0 = k_max
        k0 = 0
        k1 = k_max

        # If self.length is specified, it acts as an override to limit the output
        if self.length is not None:
            k1 = min(self.length, k1)

        output_samples = k1 - k0

        return (n_channels, output_samples)

    def get_display_name(self) -> str:
        """Get display name for the operation for use in channel labels."""
        return "iSTFT"

    def _process_array(self, x: NDArrayComplex) -> NDArrayReal:
        """
        Apply SciPy ISTFT processing to multiple channels at once using ShortTimeFFT"""
        logger.debug(f"Applying SciPy ISTFT (ShortTimeFFT) to array with shape: {x.shape}")

        # Convert 2D input to 3D (assume single channel)
        if x.ndim == 2:
            x = x.reshape(1, *x.shape)

        # Adjust scaling back if STFT applied factor of 2
        _x = np.copy(x)
        _x[..., 1:-1, :] /= 2.0

        # Apply ISTFT using the ShortTimeFFT instance
        result: NDArrayReal = self.SFT.istft(_x)

        # Trim to desired length if specified
        if self.length is not None:
            result = result[..., : self.length]

        logger.debug(f"ShortTimeFFT applied, returning result with shape: {result.shape}")
        return result
Attributes
name = 'istft' class-attribute instance-attribute
n_fft = n_fft instance-attribute
win_length = actual_win_length instance-attribute
hop_length = actual_hop_length instance-attribute
window = window instance-attribute
length = length instance-attribute
SFT = ShortTimeFFT(win=(get_window(window, self.win_length)), hop=(self.hop_length), fs=sampling_rate, mfft=(self.n_fft), scale_to='magnitude') instance-attribute
Functions
__init__(sampling_rate, n_fft=2048, hop_length=None, win_length=None, window='hann', length=None)

Initialize ISTFT operation

Parameters

sampling_rate : float Sampling rate (Hz) n_fft : int FFT size, default is 2048 hop_length : int, optional Number of samples between frames. Default is win_length // 4 win_length : int, optional Window length. Default is n_fft window : str Window type, default is 'hann' length : int, optional Length of output signal. Default is None (determined from input)

Raises

ValueError If n_fft is not positive, win_length > n_fft, or hop_length is invalid

Source code in wandas/processing/spectral.py
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
def __init__(
    self,
    sampling_rate: float,
    n_fft: int = 2048,
    hop_length: int | None = None,
    win_length: int | None = None,
    window: str = "hann",
    length: int | None = None,
):
    """
    Initialize ISTFT operation

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    n_fft : int
        FFT size, default is 2048
    hop_length : int, optional
        Number of samples between frames. Default is win_length // 4
    win_length : int, optional
        Window length. Default is n_fft
    window : str
        Window type, default is 'hann'
    length : int, optional
        Length of output signal. Default is None (determined from input)

    Raises
    ------
    ValueError
        If n_fft is not positive, win_length > n_fft, or hop_length is invalid
    """
    # Validate and compute parameters
    actual_win_length, actual_hop_length = _validate_spectral_params(n_fft, win_length, hop_length, "ISTFT")

    self.n_fft = n_fft
    self.win_length = actual_win_length
    self.hop_length = actual_hop_length
    self.window = window
    self.length = length

    # Instantiate ShortTimeFFT for ISTFT calculation
    self.SFT = ShortTimeFFT(
        win=get_window(window, self.win_length),
        hop=self.hop_length,
        fs=sampling_rate,
        mfft=self.n_fft,
        scale_to="magnitude",  # Consistent scaling with STFT
    )

    super().__init__(
        sampling_rate,
        n_fft=n_fft,
        win_length=self.win_length,
        hop_length=self.hop_length,
        window=window,
        length=length,
    )
calculate_output_shape(input_shape)

Calculate output data shape after ISTFT operation.

Uses the SciPy ShortTimeFFT calculation formula to compute the expected output length based on the input spectrogram dimensions and output range parameters (k0, k1).

Parameters

input_shape : tuple Input spectrogram shape (channels, n_freqs, n_frames) where n_freqs = n_fft // 2 + 1 and n_frames is the number of time frames.

Returns

tuple Output shape (channels, output_samples) where output_samples is the reconstructed signal length determined by the output range [k0, k1).

Notes

The calculation follows SciPy's ShortTimeFFT.istft() implementation. When k1 is None (default), the maximum reconstructible signal length is computed as:

.. math::

q_{max} = n_{frames} + p_{min}

k_{max} = (q_{max} - 1) \cdot hop + m_{num} - m_{num\_mid}

The output length is then:

.. math::

output\_samples = k_1 - k_0

where k0 defaults to 0 and k1 defaults to k_max.

Parameters that affect the calculation: - n_frames: number of time frames in the STFT - p_min: minimum frame index (ShortTimeFFT property) - hop: hop length (samples between frames) - m_num: window length - m_num_mid: window midpoint position - self.length: optional length override (if set, limits output)

References
  • SciPy ShortTimeFFT.istft: https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.ShortTimeFFT.istft.html
  • SciPy Source: https://github.com/scipy/scipy/blob/main/scipy/signal/_short_time_fft.py
Source code in wandas/processing/spectral.py
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
    """
    Calculate output data shape after ISTFT operation.

    Uses the SciPy ShortTimeFFT calculation formula to compute the expected
    output length based on the input spectrogram dimensions and output range
    parameters (k0, k1).

    Parameters
    ----------
    input_shape : tuple
        Input spectrogram shape (channels, n_freqs, n_frames)
        where n_freqs = n_fft // 2 + 1 and n_frames is the number of time frames.

    Returns
    -------
    tuple
        Output shape (channels, output_samples) where output_samples is the
        reconstructed signal length determined by the output range [k0, k1).

    Notes
    -----
    The calculation follows SciPy's ShortTimeFFT.istft() implementation.
    When k1 is None (default), the maximum reconstructible signal length is
    computed as:

    .. math::

        q_{max} = n_{frames} + p_{min}

        k_{max} = (q_{max} - 1) \\cdot hop + m_{num} - m_{num\\_mid}

    The output length is then:

    .. math::

        output\\_samples = k_1 - k_0

    where k0 defaults to 0 and k1 defaults to k_max.

    Parameters that affect the calculation:
    - n_frames: number of time frames in the STFT
    - p_min: minimum frame index (ShortTimeFFT property)
    - hop: hop length (samples between frames)
    - m_num: window length
    - m_num_mid: window midpoint position
    - self.length: optional length override (if set, limits output)

    References
    ----------
    - SciPy ShortTimeFFT.istft:
      https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.ShortTimeFFT.istft.html
    - SciPy Source: https://github.com/scipy/scipy/blob/main/scipy/signal/_short_time_fft.py
    """
    n_channels = input_shape[0]
    n_frames = input_shape[-1]  # time_frames

    # SciPy ShortTimeFFT の計算式に従う
    # See: https://github.com/scipy/scipy/blob/main/scipy/signal/_short_time_fft.py
    q_max = n_frames + self.SFT.p_min
    k_max = (q_max - 1) * self.SFT.hop + self.SFT.m_num - self.SFT.m_num_mid

    # Default parameters: k0=0, k1=None (which becomes k_max)
    # The output length is k1 - k0 = k_max - 0 = k_max
    k0 = 0
    k1 = k_max

    # If self.length is specified, it acts as an override to limit the output
    if self.length is not None:
        k1 = min(self.length, k1)

    output_samples = k1 - k0

    return (n_channels, output_samples)
get_display_name()

Get display name for the operation for use in channel labels.

Source code in wandas/processing/spectral.py
499
500
501
def get_display_name(self) -> str:
    """Get display name for the operation for use in channel labels."""
    return "iSTFT"

Welch

Bases: AudioOperation[NDArrayReal, NDArrayReal]

Welch method for power spectral density estimation.

Computes the one-sided amplitude spectrum using Welch's method for consistency with FFT and STFT methods. For a sine wave with amplitude A, the peak value at its frequency will be approximately A.

Notes

Internally uses scipy.signal.welch with scaling='spectrum' and converts the power spectrum to amplitude spectrum: - DC component (f=0): A = sqrt(P) - AC components (f>0): A = sqrt(2*P)

Source code in wandas/processing/spectral.py
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
class Welch(AudioOperation[NDArrayReal, NDArrayReal]):
    """Welch method for power spectral density estimation.

    Computes the one-sided amplitude spectrum using Welch's method for
    consistency with FFT and STFT methods. For a sine wave with amplitude A,
    the peak value at its frequency will be approximately A.

    Notes
    -----
    Internally uses scipy.signal.welch with scaling='spectrum' and converts
    the power spectrum to amplitude spectrum:
    - DC component (f=0): A = sqrt(P)
    - AC components (f>0): A = sqrt(2*P)
    """

    name = "welch"
    n_fft: int
    window: str
    hop_length: int | None
    win_length: int | None
    average: str
    detrend: str

    def __init__(
        self,
        sampling_rate: float,
        n_fft: int = 2048,
        hop_length: int | None = None,
        win_length: int | None = None,
        window: str = "hann",
        average: str = "mean",
        detrend: str = "constant",
    ):
        """
        Initialize Welch operation

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        n_fft : int, optional
            FFT size, default is 2048
        hop_length : int, optional
            Number of samples between frames. Default is win_length // 4
        win_length : int, optional
            Window length. Default is n_fft
        window : str, optional
            Window function type, default is 'hann'
        average : str, optional
            Averaging method, default is 'mean'
        detrend : str, optional
            Detrend method, default is 'constant'

        Raises
        ------
        ValueError
            If n_fft, win_length, or hop_length are invalid
        """
        # Validate and compute parameters
        actual_win_length, actual_hop_length = _validate_spectral_params(n_fft, win_length, hop_length, "Welch method")

        self.n_fft = n_fft
        self.win_length = actual_win_length
        self.hop_length = actual_hop_length
        self.noverlap = self.win_length - self.hop_length if hop_length is not None else None
        self.window = window
        self.average = average
        self.detrend = detrend
        super().__init__(
            sampling_rate,
            n_fft=n_fft,
            win_length=self.win_length,
            hop_length=self.hop_length,
            window=window,
            average=average,
            detrend=detrend,
        )

    def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
        """
        Calculate output data shape after operation

        Parameters
        ----------
        input_shape : tuple
            Input data shape (channels, samples)

        Returns
        -------
        tuple
            Output data shape (channels, freqs)
        """
        n_freqs = self.n_fft // 2 + 1
        return (*input_shape[:-1], n_freqs)

    def get_display_name(self) -> str:
        """Get display name for the operation for use in channel labels."""
        return "Welch"

    def _process_array(self, x: NDArrayReal) -> NDArrayReal:
        """Create processor function for Welch operation.

        Converts power spectrum from scipy.signal.welch to one-sided
        amplitude spectrum for consistency with FFT/STFT.
        """
        from scipy import signal as ss

        if not isinstance(x, np.ndarray):
            raise ValueError("Welch operation requires a numpy ndarray, but received a non-ndarray.")

        _, result = ss.welch(
            x,
            nperseg=self.win_length,
            noverlap=self.noverlap,
            nfft=self.n_fft,
            window=self.window,
            average=self.average,
            detrend=self.detrend,
            scaling="spectrum",
        )

        # Convert power spectrum to amplitude spectrum for consistency with FFT/STFT.
        # scipy.signal.welch with scaling='spectrum' returns a one-sided power spectrum
        # where for a sine wave with amplitude A:
        #   - DC component (f=0): P = A^2 (no factor of 2 since DC is not mirrored)
        #   - AC components (f>0): P = A^2/2 (half power due to one-sided spectrum)
        # To recover amplitude A:
        #   - DC: A = sqrt(P)
        #   - AC: A = sqrt(2*P) = sqrt(2) * sqrt(P)
        result = np.sqrt(result)  # Convert to amplitude
        result[..., 1:-1] *= np.sqrt(2)  # Apply factor of sqrt(2) for AC components

        return np.array(result)
Attributes
name = 'welch' class-attribute instance-attribute
n_fft = n_fft instance-attribute
win_length = actual_win_length instance-attribute
hop_length = actual_hop_length instance-attribute
noverlap = self.win_length - self.hop_length if hop_length is not None else None instance-attribute
window = window instance-attribute
average = average instance-attribute
detrend = detrend instance-attribute
Functions
__init__(sampling_rate, n_fft=2048, hop_length=None, win_length=None, window='hann', average='mean', detrend='constant')

Initialize Welch operation

Parameters

sampling_rate : float Sampling rate (Hz) n_fft : int, optional FFT size, default is 2048 hop_length : int, optional Number of samples between frames. Default is win_length // 4 win_length : int, optional Window length. Default is n_fft window : str, optional Window function type, default is 'hann' average : str, optional Averaging method, default is 'mean' detrend : str, optional Detrend method, default is 'constant'

Raises

ValueError If n_fft, win_length, or hop_length are invalid

Source code in wandas/processing/spectral.py
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
def __init__(
    self,
    sampling_rate: float,
    n_fft: int = 2048,
    hop_length: int | None = None,
    win_length: int | None = None,
    window: str = "hann",
    average: str = "mean",
    detrend: str = "constant",
):
    """
    Initialize Welch operation

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    n_fft : int, optional
        FFT size, default is 2048
    hop_length : int, optional
        Number of samples between frames. Default is win_length // 4
    win_length : int, optional
        Window length. Default is n_fft
    window : str, optional
        Window function type, default is 'hann'
    average : str, optional
        Averaging method, default is 'mean'
    detrend : str, optional
        Detrend method, default is 'constant'

    Raises
    ------
    ValueError
        If n_fft, win_length, or hop_length are invalid
    """
    # Validate and compute parameters
    actual_win_length, actual_hop_length = _validate_spectral_params(n_fft, win_length, hop_length, "Welch method")

    self.n_fft = n_fft
    self.win_length = actual_win_length
    self.hop_length = actual_hop_length
    self.noverlap = self.win_length - self.hop_length if hop_length is not None else None
    self.window = window
    self.average = average
    self.detrend = detrend
    super().__init__(
        sampling_rate,
        n_fft=n_fft,
        win_length=self.win_length,
        hop_length=self.hop_length,
        window=window,
        average=average,
        detrend=detrend,
    )
calculate_output_shape(input_shape)

Calculate output data shape after operation

Parameters

input_shape : tuple Input data shape (channels, samples)

Returns

tuple Output data shape (channels, freqs)

Source code in wandas/processing/spectral.py
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
    """
    Calculate output data shape after operation

    Parameters
    ----------
    input_shape : tuple
        Input data shape (channels, samples)

    Returns
    -------
    tuple
        Output data shape (channels, freqs)
    """
    n_freqs = self.n_fft // 2 + 1
    return (*input_shape[:-1], n_freqs)
get_display_name()

Get display name for the operation for use in channel labels.

Source code in wandas/processing/spectral.py
622
623
624
def get_display_name(self) -> str:
    """Get display name for the operation for use in channel labels."""
    return "Welch"

NOctSpectrum

Bases: AudioOperation[NDArrayReal, NDArrayReal]

N-octave spectrum operation

Source code in wandas/processing/spectral.py
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
class NOctSpectrum(AudioOperation[NDArrayReal, NDArrayReal]):
    """N-octave spectrum operation"""

    name = "noct_spectrum"

    def __init__(
        self,
        sampling_rate: float,
        fmin: float,
        fmax: float,
        n: int = 3,
        G: int = 10,  # noqa: N803
        fr: int = 1000,
    ):
        """
        Initialize N-octave spectrum

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        fmin : float
            Minimum frequency (Hz)
        fmax : float
            Maximum frequency (Hz)
        n : int, optional
            Number of octave divisions, default is 3
        G : int, optional
            Reference level, default is 10
        fr : int, optional
            Reference frequency, default is 1000
        """
        super().__init__(sampling_rate, fmin=fmin, fmax=fmax, n=n, G=G, fr=fr)
        self.fmin = fmin
        self.fmax = fmax
        self.n = n
        self.G = G
        self.fr = fr

    def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
        """
        Calculate output data shape after operation

        Parameters
        ----------
        input_shape : tuple
            Input data shape

        Returns
        -------
        tuple
            Output data shape
        """
        # Calculate output shape for octave spectrum
        _, fpref = _center_freq(fmin=self.fmin, fmax=self.fmax, n=self.n, G=self.G, fr=self.fr)
        return (input_shape[0], fpref.shape[0])

    def get_display_name(self) -> str:
        """Get display name for the operation for use in channel labels."""
        return "Oct"

    def _process_array(self, x: NDArrayReal) -> NDArrayReal:
        """Create processor function for octave spectrum"""
        logger.debug(f"Applying NoctSpectrum to array with shape: {x.shape}")
        spec, _ = noct_spectrum(
            sig=x.T,
            fs=self.sampling_rate,
            fmin=self.fmin,
            fmax=self.fmax,
            n=self.n,
            G=self.G,
            fr=self.fr,
        )
        if spec.ndim == 1:
            # Add channel dimension for 1D
            spec = np.expand_dims(spec, axis=0)
        else:
            spec = spec.T
        logger.debug(f"NoctSpectrum applied, returning result with shape: {spec.shape}")
        return np.array(spec)
Attributes
name = 'noct_spectrum' class-attribute instance-attribute
fmin = fmin instance-attribute
fmax = fmax instance-attribute
n = n instance-attribute
G = G instance-attribute
fr = fr instance-attribute
Functions
__init__(sampling_rate, fmin, fmax, n=3, G=10, fr=1000)

Initialize N-octave spectrum

Parameters

sampling_rate : float Sampling rate (Hz) fmin : float Minimum frequency (Hz) fmax : float Maximum frequency (Hz) n : int, optional Number of octave divisions, default is 3 G : int, optional Reference level, default is 10 fr : int, optional Reference frequency, default is 1000

Source code in wandas/processing/spectral.py
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
def __init__(
    self,
    sampling_rate: float,
    fmin: float,
    fmax: float,
    n: int = 3,
    G: int = 10,  # noqa: N803
    fr: int = 1000,
):
    """
    Initialize N-octave spectrum

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    fmin : float
        Minimum frequency (Hz)
    fmax : float
        Maximum frequency (Hz)
    n : int, optional
        Number of octave divisions, default is 3
    G : int, optional
        Reference level, default is 10
    fr : int, optional
        Reference frequency, default is 1000
    """
    super().__init__(sampling_rate, fmin=fmin, fmax=fmax, n=n, G=G, fr=fr)
    self.fmin = fmin
    self.fmax = fmax
    self.n = n
    self.G = G
    self.fr = fr
calculate_output_shape(input_shape)

Calculate output data shape after operation

Parameters

input_shape : tuple Input data shape

Returns

tuple Output data shape

Source code in wandas/processing/spectral.py
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
    """
    Calculate output data shape after operation

    Parameters
    ----------
    input_shape : tuple
        Input data shape

    Returns
    -------
    tuple
        Output data shape
    """
    # Calculate output shape for octave spectrum
    _, fpref = _center_freq(fmin=self.fmin, fmax=self.fmax, n=self.n, G=self.G, fr=self.fr)
    return (input_shape[0], fpref.shape[0])
get_display_name()

Get display name for the operation for use in channel labels.

Source code in wandas/processing/spectral.py
719
720
721
def get_display_name(self) -> str:
    """Get display name for the operation for use in channel labels."""
    return "Oct"

NOctSynthesis

Bases: AudioOperation[NDArrayReal, NDArrayReal]

Octave synthesis operation

Source code in wandas/processing/spectral.py
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
class NOctSynthesis(AudioOperation[NDArrayReal, NDArrayReal]):
    """Octave synthesis operation"""

    name = "noct_synthesis"

    def __init__(
        self,
        sampling_rate: float,
        fmin: float,
        fmax: float,
        n: int = 3,
        G: int = 10,  # noqa: N803
        fr: int = 1000,
    ):
        """
        Initialize octave synthesis

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        fmin : float
            Minimum frequency (Hz)
        fmax : float
            Maximum frequency (Hz)
        n : int, optional
            Number of octave divisions, default is 3
        G : int, optional
            Reference level, default is 10
        fr : int, optional
            Reference frequency, default is 1000
        """
        super().__init__(sampling_rate, fmin=fmin, fmax=fmax, n=n, G=G, fr=fr)

        self.fmin = fmin
        self.fmax = fmax
        self.n = n
        self.G = G
        self.fr = fr

    def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
        """
        Calculate output data shape after operation

        Parameters
        ----------
        input_shape : tuple
            Input data shape

        Returns
        -------
        tuple
            Output data shape
        """
        # Calculate output shape for octave spectrum
        _, fpref = _center_freq(fmin=self.fmin, fmax=self.fmax, n=self.n, G=self.G, fr=self.fr)
        return (input_shape[0], fpref.shape[0])

    def get_display_name(self) -> str:
        """Get display name for the operation for use in channel labels."""
        return "Octs"

    def _process_array(self, x: NDArrayReal) -> NDArrayReal:
        """Create processor function for octave synthesis"""
        logger.debug(f"Applying NoctSynthesis to array with shape: {x.shape}")
        # Calculate n from shape[-1]
        n = x.shape[-1]  # Calculate n from shape[-1]
        if n % 2 == 0:
            n = n * 2 - 1
        else:
            n = (n - 1) * 2
        freqs = np.fft.rfftfreq(n, d=1 / self.sampling_rate)
        result, _ = noct_synthesis(
            spectrum=np.abs(x).T,
            freqs=freqs,
            fmin=self.fmin,
            fmax=self.fmax,
            n=self.n,
            G=self.G,
            fr=self.fr,
        )
        result = result.T
        logger.debug(f"NoctSynthesis applied, returning result with shape: {result.shape}")
        return np.array(result)
Attributes
name = 'noct_synthesis' class-attribute instance-attribute
fmin = fmin instance-attribute
fmax = fmax instance-attribute
n = n instance-attribute
G = G instance-attribute
fr = fr instance-attribute
Functions
__init__(sampling_rate, fmin, fmax, n=3, G=10, fr=1000)

Initialize octave synthesis

Parameters

sampling_rate : float Sampling rate (Hz) fmin : float Minimum frequency (Hz) fmax : float Maximum frequency (Hz) n : int, optional Number of octave divisions, default is 3 G : int, optional Reference level, default is 10 fr : int, optional Reference frequency, default is 1000

Source code in wandas/processing/spectral.py
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
def __init__(
    self,
    sampling_rate: float,
    fmin: float,
    fmax: float,
    n: int = 3,
    G: int = 10,  # noqa: N803
    fr: int = 1000,
):
    """
    Initialize octave synthesis

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    fmin : float
        Minimum frequency (Hz)
    fmax : float
        Maximum frequency (Hz)
    n : int, optional
        Number of octave divisions, default is 3
    G : int, optional
        Reference level, default is 10
    fr : int, optional
        Reference frequency, default is 1000
    """
    super().__init__(sampling_rate, fmin=fmin, fmax=fmax, n=n, G=G, fr=fr)

    self.fmin = fmin
    self.fmax = fmax
    self.n = n
    self.G = G
    self.fr = fr
calculate_output_shape(input_shape)

Calculate output data shape after operation

Parameters

input_shape : tuple Input data shape

Returns

tuple Output data shape

Source code in wandas/processing/spectral.py
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
    """
    Calculate output data shape after operation

    Parameters
    ----------
    input_shape : tuple
        Input data shape

    Returns
    -------
    tuple
        Output data shape
    """
    # Calculate output shape for octave spectrum
    _, fpref = _center_freq(fmin=self.fmin, fmax=self.fmax, n=self.n, G=self.G, fr=self.fr)
    return (input_shape[0], fpref.shape[0])
get_display_name()

Get display name for the operation for use in channel labels.

Source code in wandas/processing/spectral.py
802
803
804
def get_display_name(self) -> str:
    """Get display name for the operation for use in channel labels."""
    return "Octs"

Coherence

Bases: AudioOperation[NDArrayReal, NDArrayReal]

Coherence estimation operation

Source code in wandas/processing/spectral.py
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
class Coherence(AudioOperation[NDArrayReal, NDArrayReal]):
    """Coherence estimation operation"""

    name = "coherence"

    def __init__(
        self,
        sampling_rate: float,
        n_fft: int = 2048,
        hop_length: int | None = None,
        win_length: int | None = None,
        window: str = "hann",
        detrend: str = "constant",
    ):
        """
        Initialize coherence estimation operation

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        n_fft : int
            FFT size, default is 2048
        hop_length : int, optional
            Number of samples between frames. Default is win_length // 4
        win_length : int, optional
            Window length. Default is n_fft
        window : str
            Window function, default is 'hann'
        detrend : str
            Type of detrend, default is 'constant'

        Raises
        ------
        ValueError
            If n_fft is not positive, win_length > n_fft, or hop_length is invalid
        """
        # Validate and compute parameters
        actual_win_length, actual_hop_length = _validate_spectral_params(n_fft, win_length, hop_length, "Coherence")

        self.n_fft = n_fft
        self.win_length = actual_win_length
        self.hop_length = actual_hop_length
        self.window = window
        self.detrend = detrend
        super().__init__(
            sampling_rate,
            n_fft=n_fft,
            hop_length=self.hop_length,
            win_length=self.win_length,
            window=window,
            detrend=detrend,
        )

    def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
        """
        Calculate output data shape after operation

        Parameters
        ----------
        input_shape : tuple
            Input data shape (channels, samples)

        Returns
        -------
        tuple
            Output data shape (channels * channels, freqs)
        """
        n_channels = input_shape[0]
        n_freqs = self.n_fft // 2 + 1
        return (n_channels * n_channels, n_freqs)

    def get_display_name(self) -> str:
        """Get display name for the operation for use in channel labels."""
        return "Coh"

    def _process_array(self, x: NDArrayReal) -> NDArrayReal:
        """Processor function for coherence estimation operation"""
        logger.debug(f"Applying coherence estimation to array with shape: {x.shape}")
        from scipy import signal as ss

        _, coh = ss.coherence(
            x=x[:, np.newaxis],
            y=x[np.newaxis, :],
            fs=self.sampling_rate,
            nperseg=self.win_length,
            noverlap=self.win_length - self.hop_length,
            nfft=self.n_fft,
            window=self.window,
            detrend=self.detrend,
        )

        # Reshape result to (n_channels * n_channels, n_freqs)
        result: NDArrayReal = coh.transpose(1, 0, 2).reshape(-1, coh.shape[-1])

        logger.debug(f"Coherence estimation applied, result shape: {result.shape}")
        return result
Attributes
name = 'coherence' class-attribute instance-attribute
n_fft = n_fft instance-attribute
win_length = actual_win_length instance-attribute
hop_length = actual_hop_length instance-attribute
window = window instance-attribute
detrend = detrend instance-attribute
Functions
__init__(sampling_rate, n_fft=2048, hop_length=None, win_length=None, window='hann', detrend='constant')

Initialize coherence estimation operation

Parameters

sampling_rate : float Sampling rate (Hz) n_fft : int FFT size, default is 2048 hop_length : int, optional Number of samples between frames. Default is win_length // 4 win_length : int, optional Window length. Default is n_fft window : str Window function, default is 'hann' detrend : str Type of detrend, default is 'constant'

Raises

ValueError If n_fft is not positive, win_length > n_fft, or hop_length is invalid

Source code in wandas/processing/spectral.py
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
def __init__(
    self,
    sampling_rate: float,
    n_fft: int = 2048,
    hop_length: int | None = None,
    win_length: int | None = None,
    window: str = "hann",
    detrend: str = "constant",
):
    """
    Initialize coherence estimation operation

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    n_fft : int
        FFT size, default is 2048
    hop_length : int, optional
        Number of samples between frames. Default is win_length // 4
    win_length : int, optional
        Window length. Default is n_fft
    window : str
        Window function, default is 'hann'
    detrend : str
        Type of detrend, default is 'constant'

    Raises
    ------
    ValueError
        If n_fft is not positive, win_length > n_fft, or hop_length is invalid
    """
    # Validate and compute parameters
    actual_win_length, actual_hop_length = _validate_spectral_params(n_fft, win_length, hop_length, "Coherence")

    self.n_fft = n_fft
    self.win_length = actual_win_length
    self.hop_length = actual_hop_length
    self.window = window
    self.detrend = detrend
    super().__init__(
        sampling_rate,
        n_fft=n_fft,
        hop_length=self.hop_length,
        win_length=self.win_length,
        window=window,
        detrend=detrend,
    )
calculate_output_shape(input_shape)

Calculate output data shape after operation

Parameters

input_shape : tuple Input data shape (channels, samples)

Returns

tuple Output data shape (channels * channels, freqs)

Source code in wandas/processing/spectral.py
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
    """
    Calculate output data shape after operation

    Parameters
    ----------
    input_shape : tuple
        Input data shape (channels, samples)

    Returns
    -------
    tuple
        Output data shape (channels * channels, freqs)
    """
    n_channels = input_shape[0]
    n_freqs = self.n_fft // 2 + 1
    return (n_channels * n_channels, n_freqs)
get_display_name()

Get display name for the operation for use in channel labels.

Source code in wandas/processing/spectral.py
902
903
904
def get_display_name(self) -> str:
    """Get display name for the operation for use in channel labels."""
    return "Coh"

CSD

Bases: AudioOperation[NDArrayReal, NDArrayComplex]

Cross-spectral density estimation operation

Source code in wandas/processing/spectral.py
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
class CSD(AudioOperation[NDArrayReal, NDArrayComplex]):
    """Cross-spectral density estimation operation"""

    name = "csd"

    def __init__(
        self,
        sampling_rate: float,
        n_fft: int = 2048,
        hop_length: int | None = None,
        win_length: int | None = None,
        window: str = "hann",
        detrend: str = "constant",
        scaling: str = "spectrum",
        average: str = "mean",
    ):
        """
        Initialize cross-spectral density estimation operation

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        n_fft : int
            FFT size, default is 2048
        hop_length : int, optional
            Number of samples between frames. Default is win_length // 4
        win_length : int, optional
            Window length. Default is n_fft
        window : str
            Window function, default is 'hann'
        detrend : str
            Type of detrend, default is 'constant'
        scaling : str
            Type of scaling, default is 'spectrum'
        average : str
            Method of averaging, default is 'mean'

        Raises
        ------
        ValueError
            If n_fft is not positive, win_length > n_fft, or hop_length is invalid
        """
        # Validate and compute parameters
        actual_win_length, actual_hop_length = _validate_spectral_params(n_fft, win_length, hop_length, "CSD")

        self.n_fft = n_fft
        self.win_length = actual_win_length
        self.hop_length = actual_hop_length
        self.window = window
        self.detrend = detrend
        self.scaling = scaling
        self.average = average
        super().__init__(
            sampling_rate,
            n_fft=n_fft,
            hop_length=self.hop_length,
            win_length=self.win_length,
            window=window,
            detrend=detrend,
            scaling=scaling,
            average=average,
        )

    def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
        """
        Calculate output data shape after operation

        Parameters
        ----------
        input_shape : tuple
            Input data shape (channels, samples)

        Returns
        -------
        tuple
            Output data shape (channels * channels, freqs)
        """
        n_channels = input_shape[0]
        n_freqs = self.n_fft // 2 + 1
        return (n_channels * n_channels, n_freqs)

    def get_display_name(self) -> str:
        """Get display name for the operation for use in channel labels."""
        return "CSD"

    def _process_array(self, x: NDArrayReal) -> NDArrayComplex:
        """Processor function for cross-spectral density estimation operation"""
        logger.debug(f"Applying CSD estimation to array with shape: {x.shape}")
        from scipy import signal as ss

        # Calculate all combinations using scipy's csd function
        _, csd_result = ss.csd(
            x=x[:, np.newaxis],
            y=x[np.newaxis, :],
            fs=self.sampling_rate,
            nperseg=self.win_length,
            noverlap=self.win_length - self.hop_length,
            nfft=self.n_fft,
            window=self.window,
            detrend=self.detrend,
            scaling=self.scaling,
            average=self.average,
        )

        # Reshape result to (n_channels * n_channels, n_freqs)
        result: NDArrayComplex = csd_result.transpose(1, 0, 2).reshape(-1, csd_result.shape[-1])

        logger.debug(f"CSD estimation applied, result shape: {result.shape}")
        return result
Attributes
name = 'csd' class-attribute instance-attribute
n_fft = n_fft instance-attribute
win_length = actual_win_length instance-attribute
hop_length = actual_hop_length instance-attribute
window = window instance-attribute
detrend = detrend instance-attribute
scaling = scaling instance-attribute
average = average instance-attribute
Functions
__init__(sampling_rate, n_fft=2048, hop_length=None, win_length=None, window='hann', detrend='constant', scaling='spectrum', average='mean')

Initialize cross-spectral density estimation operation

Parameters

sampling_rate : float Sampling rate (Hz) n_fft : int FFT size, default is 2048 hop_length : int, optional Number of samples between frames. Default is win_length // 4 win_length : int, optional Window length. Default is n_fft window : str Window function, default is 'hann' detrend : str Type of detrend, default is 'constant' scaling : str Type of scaling, default is 'spectrum' average : str Method of averaging, default is 'mean'

Raises

ValueError If n_fft is not positive, win_length > n_fft, or hop_length is invalid

Source code in wandas/processing/spectral.py
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
def __init__(
    self,
    sampling_rate: float,
    n_fft: int = 2048,
    hop_length: int | None = None,
    win_length: int | None = None,
    window: str = "hann",
    detrend: str = "constant",
    scaling: str = "spectrum",
    average: str = "mean",
):
    """
    Initialize cross-spectral density estimation operation

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    n_fft : int
        FFT size, default is 2048
    hop_length : int, optional
        Number of samples between frames. Default is win_length // 4
    win_length : int, optional
        Window length. Default is n_fft
    window : str
        Window function, default is 'hann'
    detrend : str
        Type of detrend, default is 'constant'
    scaling : str
        Type of scaling, default is 'spectrum'
    average : str
        Method of averaging, default is 'mean'

    Raises
    ------
    ValueError
        If n_fft is not positive, win_length > n_fft, or hop_length is invalid
    """
    # Validate and compute parameters
    actual_win_length, actual_hop_length = _validate_spectral_params(n_fft, win_length, hop_length, "CSD")

    self.n_fft = n_fft
    self.win_length = actual_win_length
    self.hop_length = actual_hop_length
    self.window = window
    self.detrend = detrend
    self.scaling = scaling
    self.average = average
    super().__init__(
        sampling_rate,
        n_fft=n_fft,
        hop_length=self.hop_length,
        win_length=self.win_length,
        window=window,
        detrend=detrend,
        scaling=scaling,
        average=average,
    )
calculate_output_shape(input_shape)

Calculate output data shape after operation

Parameters

input_shape : tuple Input data shape (channels, samples)

Returns

tuple Output data shape (channels * channels, freqs)

Source code in wandas/processing/spectral.py
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
    """
    Calculate output data shape after operation

    Parameters
    ----------
    input_shape : tuple
        Input data shape (channels, samples)

    Returns
    -------
    tuple
        Output data shape (channels * channels, freqs)
    """
    n_channels = input_shape[0]
    n_freqs = self.n_fft // 2 + 1
    return (n_channels * n_channels, n_freqs)
get_display_name()

Get display name for the operation for use in channel labels.

Source code in wandas/processing/spectral.py
1011
1012
1013
def get_display_name(self) -> str:
    """Get display name for the operation for use in channel labels."""
    return "CSD"

TransferFunction

Bases: AudioOperation[NDArrayReal, NDArrayComplex]

Transfer function estimation operation

Source code in wandas/processing/spectral.py
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
class TransferFunction(AudioOperation[NDArrayReal, NDArrayComplex]):
    """Transfer function estimation operation"""

    name = "transfer_function"

    def __init__(
        self,
        sampling_rate: float,
        n_fft: int = 2048,
        hop_length: int | None = None,
        win_length: int | None = None,
        window: str = "hann",
        detrend: str = "constant",
        scaling: str = "spectrum",
        average: str = "mean",
    ):
        """
        Initialize transfer function estimation operation

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        n_fft : int
            FFT size, default is 2048
        hop_length : int, optional
            Number of samples between frames. Default is win_length // 4
        win_length : int, optional
            Window length. Default is n_fft
        window : str
            Window function, default is 'hann'
        detrend : str
            Type of detrend, default is 'constant'
        scaling : str
            Type of scaling, default is 'spectrum'
        average : str
            Method of averaging, default is 'mean'

        Raises
        ------
        ValueError
            If n_fft is not positive, win_length > n_fft, or hop_length is invalid
        """
        # Validate and compute parameters
        actual_win_length, actual_hop_length = _validate_spectral_params(
            n_fft, win_length, hop_length, "Transfer function"
        )

        self.n_fft = n_fft
        self.win_length = actual_win_length
        self.hop_length = actual_hop_length
        self.window = window
        self.detrend = detrend
        self.scaling = scaling
        self.average = average
        super().__init__(
            sampling_rate,
            n_fft=n_fft,
            hop_length=self.hop_length,
            win_length=self.win_length,
            window=window,
            detrend=detrend,
            scaling=scaling,
            average=average,
        )

    def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
        """
        Calculate output data shape after operation

        Parameters
        ----------
        input_shape : tuple
            Input data shape (channels, samples)

        Returns
        -------
        tuple
            Output data shape (channels * channels, freqs)
        """
        n_channels = input_shape[0]
        n_freqs = self.n_fft // 2 + 1
        return (n_channels * n_channels, n_freqs)

    def get_display_name(self) -> str:
        """Get display name for the operation for use in channel labels."""
        return "H"

    def _process_array(self, x: NDArrayReal) -> NDArrayComplex:
        """Processor function for transfer function estimation operation"""
        logger.debug(f"Applying transfer function estimation to array with shape: {x.shape}")
        from scipy import signal as ss

        # Calculate cross-spectral density between all channels
        f, p_yx = ss.csd(
            x=x[:, np.newaxis, :],
            y=x[np.newaxis, :, :],
            fs=self.sampling_rate,
            nperseg=self.win_length,
            noverlap=self.win_length - self.hop_length,
            nfft=self.n_fft,
            window=self.window,
            detrend=self.detrend,
            scaling=self.scaling,
            average=self.average,
            axis=-1,
        )
        # p_yx shape: (num_channels, num_channels, num_frequencies)

        # Calculate power spectral density for each channel
        f, p_xx = ss.welch(
            x=x,
            fs=self.sampling_rate,
            nperseg=self.win_length,
            noverlap=self.win_length - self.hop_length,
            nfft=self.n_fft,
            window=self.window,
            detrend=self.detrend,
            scaling=self.scaling,
            average=self.average,
            axis=-1,
        )
        # p_xx shape: (num_channels, num_frequencies)

        # Calculate transfer function H(f) = P_yx / P_xx
        h_f = p_yx / p_xx[np.newaxis, :, :]
        result: NDArrayComplex = h_f.transpose(1, 0, 2).reshape(-1, h_f.shape[-1])

        logger.debug(f"Transfer function estimation applied, result shape: {result.shape}")
        return result
Attributes
name = 'transfer_function' class-attribute instance-attribute
n_fft = n_fft instance-attribute
win_length = actual_win_length instance-attribute
hop_length = actual_hop_length instance-attribute
window = window instance-attribute
detrend = detrend instance-attribute
scaling = scaling instance-attribute
average = average instance-attribute
Functions
__init__(sampling_rate, n_fft=2048, hop_length=None, win_length=None, window='hann', detrend='constant', scaling='spectrum', average='mean')

Initialize transfer function estimation operation

Parameters

sampling_rate : float Sampling rate (Hz) n_fft : int FFT size, default is 2048 hop_length : int, optional Number of samples between frames. Default is win_length // 4 win_length : int, optional Window length. Default is n_fft window : str Window function, default is 'hann' detrend : str Type of detrend, default is 'constant' scaling : str Type of scaling, default is 'spectrum' average : str Method of averaging, default is 'mean'

Raises

ValueError If n_fft is not positive, win_length > n_fft, or hop_length is invalid

Source code in wandas/processing/spectral.py
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
def __init__(
    self,
    sampling_rate: float,
    n_fft: int = 2048,
    hop_length: int | None = None,
    win_length: int | None = None,
    window: str = "hann",
    detrend: str = "constant",
    scaling: str = "spectrum",
    average: str = "mean",
):
    """
    Initialize transfer function estimation operation

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    n_fft : int
        FFT size, default is 2048
    hop_length : int, optional
        Number of samples between frames. Default is win_length // 4
    win_length : int, optional
        Window length. Default is n_fft
    window : str
        Window function, default is 'hann'
    detrend : str
        Type of detrend, default is 'constant'
    scaling : str
        Type of scaling, default is 'spectrum'
    average : str
        Method of averaging, default is 'mean'

    Raises
    ------
    ValueError
        If n_fft is not positive, win_length > n_fft, or hop_length is invalid
    """
    # Validate and compute parameters
    actual_win_length, actual_hop_length = _validate_spectral_params(
        n_fft, win_length, hop_length, "Transfer function"
    )

    self.n_fft = n_fft
    self.win_length = actual_win_length
    self.hop_length = actual_hop_length
    self.window = window
    self.detrend = detrend
    self.scaling = scaling
    self.average = average
    super().__init__(
        sampling_rate,
        n_fft=n_fft,
        hop_length=self.hop_length,
        win_length=self.win_length,
        window=window,
        detrend=detrend,
        scaling=scaling,
        average=average,
    )
calculate_output_shape(input_shape)

Calculate output data shape after operation

Parameters

input_shape : tuple Input data shape (channels, samples)

Returns

tuple Output data shape (channels * channels, freqs)

Source code in wandas/processing/spectral.py
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
    """
    Calculate output data shape after operation

    Parameters
    ----------
    input_shape : tuple
        Input data shape (channels, samples)

    Returns
    -------
    tuple
        Output data shape (channels * channels, freqs)
    """
    n_channels = input_shape[0]
    n_freqs = self.n_fft // 2 + 1
    return (n_channels * n_channels, n_freqs)
get_display_name()

Get display name for the operation for use in channel labels.

Source code in wandas/processing/spectral.py
1125
1126
1127
def get_display_name(self) -> str:
    """Get display name for the operation for use in channel labels."""
    return "H"

Functions

Statistical Processing / 統計処理

Provides statistical analysis functions for audio data. オーディオデータの統計分析機能を提供します。

wandas.processing.stats

Attributes

logger = logging.getLogger(__name__) module-attribute

Classes

ABS

Bases: AudioOperation[NDArrayReal, NDArrayReal]

Absolute value operation

Source code in wandas/processing/stats.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class ABS(AudioOperation[NDArrayReal, NDArrayReal]):
    """Absolute value operation"""

    name = "abs"

    def __init__(self, sampling_rate: float):
        """
        Initialize absolute value operation

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        """
        super().__init__(sampling_rate)

    def get_display_name(self) -> str:
        """Get display name for the operation for use in channel labels."""
        return "abs"

    def process(self, data: DaArray) -> DaArray:
        # map_blocksを使わず、直接Daskの集約関数を使用
        return da.abs(data)  # type: ignore [unused-ignore]
Attributes
name = 'abs' class-attribute instance-attribute
Functions
__init__(sampling_rate)

Initialize absolute value operation

Parameters

sampling_rate : float Sampling rate (Hz)

Source code in wandas/processing/stats.py
17
18
19
20
21
22
23
24
25
26
def __init__(self, sampling_rate: float):
    """
    Initialize absolute value operation

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    """
    super().__init__(sampling_rate)
get_display_name()

Get display name for the operation for use in channel labels.

Source code in wandas/processing/stats.py
28
29
30
def get_display_name(self) -> str:
    """Get display name for the operation for use in channel labels."""
    return "abs"
process(data)
Source code in wandas/processing/stats.py
32
33
34
def process(self, data: DaArray) -> DaArray:
    # map_blocksを使わず、直接Daskの集約関数を使用
    return da.abs(data)  # type: ignore [unused-ignore]

Power

Bases: AudioOperation[NDArrayReal, NDArrayReal]

Power operation

Source code in wandas/processing/stats.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
class Power(AudioOperation[NDArrayReal, NDArrayReal]):
    """Power operation"""

    name = "power"

    def __init__(self, sampling_rate: float, exponent: float):
        """
        Initialize power operation

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        exponent : float
            Power exponent
        """
        super().__init__(sampling_rate)
        self.exp = exponent

    def get_display_name(self) -> str:
        """Get display name for the operation for use in channel labels."""
        return "pow"

    def process(self, data: DaArray) -> DaArray:
        # map_blocksを使わず、直接Daskの集約関数を使用
        return da.power(data, self.exp)  # type: ignore [unused-ignore]
Attributes
name = 'power' class-attribute instance-attribute
exp = exponent instance-attribute
Functions
__init__(sampling_rate, exponent)

Initialize power operation

Parameters

sampling_rate : float Sampling rate (Hz) exponent : float Power exponent

Source code in wandas/processing/stats.py
42
43
44
45
46
47
48
49
50
51
52
53
54
def __init__(self, sampling_rate: float, exponent: float):
    """
    Initialize power operation

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    exponent : float
        Power exponent
    """
    super().__init__(sampling_rate)
    self.exp = exponent
get_display_name()

Get display name for the operation for use in channel labels.

Source code in wandas/processing/stats.py
56
57
58
def get_display_name(self) -> str:
    """Get display name for the operation for use in channel labels."""
    return "pow"
process(data)
Source code in wandas/processing/stats.py
60
61
62
def process(self, data: DaArray) -> DaArray:
    # map_blocksを使わず、直接Daskの集約関数を使用
    return da.power(data, self.exp)  # type: ignore [unused-ignore]

Sum

Bases: AudioOperation[NDArrayReal, NDArrayReal]

Sum calculation

Source code in wandas/processing/stats.py
65
66
67
68
69
70
71
72
73
74
75
76
class Sum(AudioOperation[NDArrayReal, NDArrayReal]):
    """Sum calculation"""

    name = "sum"

    def get_display_name(self) -> str:
        """Get display name for the operation for use in channel labels."""
        return "sum"

    def process(self, data: DaArray) -> DaArray:
        # Use Dask's aggregate function directly without map_blocks
        return data.sum(axis=0, keepdims=True)
Attributes
name = 'sum' class-attribute instance-attribute
Functions
get_display_name()

Get display name for the operation for use in channel labels.

Source code in wandas/processing/stats.py
70
71
72
def get_display_name(self) -> str:
    """Get display name for the operation for use in channel labels."""
    return "sum"
process(data)
Source code in wandas/processing/stats.py
74
75
76
def process(self, data: DaArray) -> DaArray:
    # Use Dask's aggregate function directly without map_blocks
    return data.sum(axis=0, keepdims=True)

Mean

Bases: AudioOperation[NDArrayReal, NDArrayReal]

Mean calculation

Source code in wandas/processing/stats.py
79
80
81
82
83
84
85
86
87
88
89
90
class Mean(AudioOperation[NDArrayReal, NDArrayReal]):
    """Mean calculation"""

    name = "mean"

    def get_display_name(self) -> str:
        """Get display name for the operation for use in channel labels."""
        return "mean"

    def process(self, data: DaArray) -> DaArray:
        # Use Dask's aggregate function directly without map_blocks
        return data.mean(axis=0, keepdims=True)
Attributes
name = 'mean' class-attribute instance-attribute
Functions
get_display_name()

Get display name for the operation for use in channel labels.

Source code in wandas/processing/stats.py
84
85
86
def get_display_name(self) -> str:
    """Get display name for the operation for use in channel labels."""
    return "mean"
process(data)
Source code in wandas/processing/stats.py
88
89
90
def process(self, data: DaArray) -> DaArray:
    # Use Dask's aggregate function directly without map_blocks
    return data.mean(axis=0, keepdims=True)

ChannelDifference

Bases: AudioOperation[NDArrayReal, NDArrayReal]

Channel difference calculation operation

Source code in wandas/processing/stats.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
class ChannelDifference(AudioOperation[NDArrayReal, NDArrayReal]):
    """Channel difference calculation operation"""

    name = "channel_difference"
    other_channel: int

    def __init__(self, sampling_rate: float, other_channel: int = 0):
        """
        Initialize channel difference calculation

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        other_channel : int
            Channel to calculate difference with, default is 0
        """
        self.other_channel = other_channel
        super().__init__(sampling_rate, other_channel=other_channel)

    def get_display_name(self) -> str:
        """Get display name for the operation for use in channel labels."""
        return "diff"

    def process(self, data: DaArray) -> DaArray:
        # map_blocksを使わず、直接Daskの集約関数を使用
        result = data - data[self.other_channel]
        return result
Attributes
name = 'channel_difference' class-attribute instance-attribute
other_channel = other_channel instance-attribute
Functions
__init__(sampling_rate, other_channel=0)

Initialize channel difference calculation

Parameters

sampling_rate : float Sampling rate (Hz) other_channel : int Channel to calculate difference with, default is 0

Source code in wandas/processing/stats.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
def __init__(self, sampling_rate: float, other_channel: int = 0):
    """
    Initialize channel difference calculation

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    other_channel : int
        Channel to calculate difference with, default is 0
    """
    self.other_channel = other_channel
    super().__init__(sampling_rate, other_channel=other_channel)
get_display_name()

Get display name for the operation for use in channel labels.

Source code in wandas/processing/stats.py
113
114
115
def get_display_name(self) -> str:
    """Get display name for the operation for use in channel labels."""
    return "diff"
process(data)
Source code in wandas/processing/stats.py
117
118
119
120
def process(self, data: DaArray) -> DaArray:
    # map_blocksを使わず、直接Daskの集約関数を使用
    result = data - data[self.other_channel]
    return result

Functions

Temporal Processing / 時間領域処理

Provides time-domain processing capabilities. 時間領域の処理機能を提供します。

wandas.processing.temporal

Attributes

logger = logging.getLogger(__name__) module-attribute

MIN_SOUND_LEVEL_POWER_RATIO = 1e-20 module-attribute

Classes

ReSampling

Bases: AudioOperation[NDArrayReal, NDArrayReal]

Resampling operation

Source code in wandas/processing/temporal.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
class ReSampling(AudioOperation[NDArrayReal, NDArrayReal]):
    """Resampling operation"""

    name = "resampling"

    def __init__(self, sampling_rate: float, target_sr: float):
        """
        Initialize resampling operation

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        target_sampling_rate : float
            Target sampling rate (Hz)

        Raises
        ------
        ValueError
            If sampling_rate or target_sr is not positive
        """
        validate_sampling_rate(sampling_rate, "source sampling rate")
        validate_sampling_rate(target_sr, "target sampling rate")
        super().__init__(sampling_rate, target_sr=target_sr)
        self.target_sr = target_sr

    def get_metadata_updates(self) -> dict[str, Any]:
        """
        Update sampling rate to target sampling rate.

        Returns
        -------
        dict
            Metadata updates with new sampling rate

        Notes
        -----
        Resampling always produces output at target_sr, regardless of input
        sampling rate. All necessary parameters are provided at initialization.
        """
        return {"sampling_rate": self.target_sr}

    def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
        """
        Calculate output data shape after operation

        Parameters
        ----------
        input_shape : tuple
            Input data shape

        Returns
        -------
        tuple
            Output data shape
        """
        # Calculate length after resampling
        ratio = float(self.target_sr) / float(self.sampling_rate)
        n_samples = int(np.ceil(input_shape[-1] * ratio))
        return (*input_shape[:-1], n_samples)

    def get_display_name(self) -> str:
        """Get display name for the operation for use in channel labels."""
        return "rs"

    def _process_array(self, x: NDArrayReal) -> NDArrayReal:
        """Create processor function for resampling operation"""
        logger.debug(f"Applying resampling to array with shape: {x.shape}")
        result: NDArrayReal = librosa.resample(x, orig_sr=self.sampling_rate, target_sr=self.target_sr)
        logger.debug(f"Resampling applied, returning result with shape: {result.shape}")
        return result
Attributes
name = 'resampling' class-attribute instance-attribute
target_sr = target_sr instance-attribute
Functions
__init__(sampling_rate, target_sr)

Initialize resampling operation

Parameters

sampling_rate : float Sampling rate (Hz) target_sampling_rate : float Target sampling rate (Hz)

Raises

ValueError If sampling_rate or target_sr is not positive

Source code in wandas/processing/temporal.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def __init__(self, sampling_rate: float, target_sr: float):
    """
    Initialize resampling operation

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    target_sampling_rate : float
        Target sampling rate (Hz)

    Raises
    ------
    ValueError
        If sampling_rate or target_sr is not positive
    """
    validate_sampling_rate(sampling_rate, "source sampling rate")
    validate_sampling_rate(target_sr, "target sampling rate")
    super().__init__(sampling_rate, target_sr=target_sr)
    self.target_sr = target_sr
get_metadata_updates()

Update sampling rate to target sampling rate.

Returns

dict Metadata updates with new sampling rate

Notes

Resampling always produces output at target_sr, regardless of input sampling rate. All necessary parameters are provided at initialization.

Source code in wandas/processing/temporal.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def get_metadata_updates(self) -> dict[str, Any]:
    """
    Update sampling rate to target sampling rate.

    Returns
    -------
    dict
        Metadata updates with new sampling rate

    Notes
    -----
    Resampling always produces output at target_sr, regardless of input
    sampling rate. All necessary parameters are provided at initialization.
    """
    return {"sampling_rate": self.target_sr}
calculate_output_shape(input_shape)

Calculate output data shape after operation

Parameters

input_shape : tuple Input data shape

Returns

tuple Output data shape

Source code in wandas/processing/temporal.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
    """
    Calculate output data shape after operation

    Parameters
    ----------
    input_shape : tuple
        Input data shape

    Returns
    -------
    tuple
        Output data shape
    """
    # Calculate length after resampling
    ratio = float(self.target_sr) / float(self.sampling_rate)
    n_samples = int(np.ceil(input_shape[-1] * ratio))
    return (*input_shape[:-1], n_samples)
get_display_name()

Get display name for the operation for use in channel labels.

Source code in wandas/processing/temporal.py
81
82
83
def get_display_name(self) -> str:
    """Get display name for the operation for use in channel labels."""
    return "rs"

Trim

Bases: AudioOperation[NDArrayReal, NDArrayReal]

Trimming operation

Source code in wandas/processing/temporal.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
class Trim(AudioOperation[NDArrayReal, NDArrayReal]):
    """Trimming operation"""

    name = "trim"

    def __init__(
        self,
        sampling_rate: float,
        start: float,
        end: float,
    ):
        """
        Initialize trimming operation

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        start : float
            Start time for trimming (seconds)
        end : float
            End time for trimming (seconds)
        """
        super().__init__(sampling_rate, start=start, end=end)
        self.start = start
        self.end = end
        self.start_sample = int(start * sampling_rate)
        self.end_sample = int(end * sampling_rate)
        logger.debug(f"Initialized Trim operation with start: {self.start}, end: {self.end}")

    def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
        """
        Calculate output data shape after operation

        Parameters
        ----------
        input_shape : tuple
            Input data shape

        Returns
        -------
        tuple
            Output data shape
        """
        # Calculate length after trimming
        # Exclude parts where there is no signal
        end_sample = min(self.end_sample, input_shape[-1])
        n_samples = end_sample - self.start_sample
        return (*input_shape[:-1], n_samples)

    def get_display_name(self) -> str:
        """Get display name for the operation for use in channel labels."""
        return "trim"

    def _process_array(self, x: NDArrayReal) -> NDArrayReal:
        """Create processor function for trimming operation"""
        logger.debug(f"Applying trim to array with shape: {x.shape}")
        # Apply trimming
        result = x[..., self.start_sample : self.end_sample]
        logger.debug(f"Trim applied, returning result with shape: {result.shape}")
        return result
Attributes
name = 'trim' class-attribute instance-attribute
start = start instance-attribute
end = end instance-attribute
start_sample = int(start * sampling_rate) instance-attribute
end_sample = int(end * sampling_rate) instance-attribute
Functions
__init__(sampling_rate, start, end)

Initialize trimming operation

Parameters

sampling_rate : float Sampling rate (Hz) start : float Start time for trimming (seconds) end : float End time for trimming (seconds)

Source code in wandas/processing/temporal.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
def __init__(
    self,
    sampling_rate: float,
    start: float,
    end: float,
):
    """
    Initialize trimming operation

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    start : float
        Start time for trimming (seconds)
    end : float
        End time for trimming (seconds)
    """
    super().__init__(sampling_rate, start=start, end=end)
    self.start = start
    self.end = end
    self.start_sample = int(start * sampling_rate)
    self.end_sample = int(end * sampling_rate)
    logger.debug(f"Initialized Trim operation with start: {self.start}, end: {self.end}")
calculate_output_shape(input_shape)

Calculate output data shape after operation

Parameters

input_shape : tuple Input data shape

Returns

tuple Output data shape

Source code in wandas/processing/temporal.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
    """
    Calculate output data shape after operation

    Parameters
    ----------
    input_shape : tuple
        Input data shape

    Returns
    -------
    tuple
        Output data shape
    """
    # Calculate length after trimming
    # Exclude parts where there is no signal
    end_sample = min(self.end_sample, input_shape[-1])
    n_samples = end_sample - self.start_sample
    return (*input_shape[:-1], n_samples)
get_display_name()

Get display name for the operation for use in channel labels.

Source code in wandas/processing/temporal.py
143
144
145
def get_display_name(self) -> str:
    """Get display name for the operation for use in channel labels."""
    return "trim"

FixLength

Bases: AudioOperation[NDArrayReal, NDArrayReal]

信号の長さを指定された長さに調整する操作

Source code in wandas/processing/temporal.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
class FixLength(AudioOperation[NDArrayReal, NDArrayReal]):
    """信号の長さを指定された長さに調整する操作"""

    name = "fix_length"

    def __init__(
        self,
        sampling_rate: float,
        length: int | None = None,
        duration: float | None = None,
    ):
        """
        Initialize fix length operation

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        length : Optional[int]
            Target length for fixing
        duration : Optional[float]
            Target length for fixing
        """
        if length is None:
            if duration is None:
                raise ValueError("Either length or duration must be provided.")
            else:
                length = int(duration * sampling_rate)
        self.target_length = length

        super().__init__(sampling_rate, target_length=self.target_length)

    def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
        """
        Calculate output data shape after operation

        Parameters
        ----------
        input_shape : tuple
            Input data shape

        Returns
        -------
        tuple
            Output data shape
        """
        return (*input_shape[:-1], self.target_length)

    def get_display_name(self) -> str:
        """Get display name for the operation for use in channel labels."""
        return "fix"

    def _process_array(self, x: NDArrayReal) -> NDArrayReal:
        """Create processor function for padding operation"""
        logger.debug(f"Applying padding to array with shape: {x.shape}")
        # Apply padding
        pad_width = self.target_length - x.shape[-1]
        if pad_width > 0:
            result = np.pad(x, ((0, 0), (0, pad_width)), mode="constant")
        else:
            result = x[..., : self.target_length]
        logger.debug(f"Padding applied, returning result with shape: {result.shape}")
        return result
Attributes
name = 'fix_length' class-attribute instance-attribute
target_length = length instance-attribute
Functions
__init__(sampling_rate, length=None, duration=None)

Initialize fix length operation

Parameters

sampling_rate : float Sampling rate (Hz) length : Optional[int] Target length for fixing duration : Optional[float] Target length for fixing

Source code in wandas/processing/temporal.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
def __init__(
    self,
    sampling_rate: float,
    length: int | None = None,
    duration: float | None = None,
):
    """
    Initialize fix length operation

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    length : Optional[int]
        Target length for fixing
    duration : Optional[float]
        Target length for fixing
    """
    if length is None:
        if duration is None:
            raise ValueError("Either length or duration must be provided.")
        else:
            length = int(duration * sampling_rate)
    self.target_length = length

    super().__init__(sampling_rate, target_length=self.target_length)
calculate_output_shape(input_shape)

Calculate output data shape after operation

Parameters

input_shape : tuple Input data shape

Returns

tuple Output data shape

Source code in wandas/processing/temporal.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
    """
    Calculate output data shape after operation

    Parameters
    ----------
    input_shape : tuple
        Input data shape

    Returns
    -------
    tuple
        Output data shape
    """
    return (*input_shape[:-1], self.target_length)
get_display_name()

Get display name for the operation for use in channel labels.

Source code in wandas/processing/temporal.py
204
205
206
def get_display_name(self) -> str:
    """Get display name for the operation for use in channel labels."""
    return "fix"

RmsTrend

Bases: AudioOperation[NDArrayReal, NDArrayReal]

RMS calculation

Source code in wandas/processing/temporal.py
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
class RmsTrend(AudioOperation[NDArrayReal, NDArrayReal]):
    """RMS calculation"""

    name = "rms_trend"
    frame_length: int
    hop_length: int
    Aw: bool

    def __init__(
        self,
        sampling_rate: float,
        frame_length: int = 2048,
        hop_length: int = 512,
        ref: list[float] | float = 1.0,
        dB: bool = False,  # noqa: N803
        Aw: bool = False,  # noqa: N803
    ) -> None:
        """
        Initialize RMS calculation

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        frame_length : int
            Frame length, default is 2048
        hop_length : int
            Hop length, default is 512
        ref : Union[list[float], float]
            Reference value(s) for dB calculation
        dB : bool
            Whether to convert to decibels
        Aw : bool
            Whether to apply A-weighting before RMS calculation
        """
        self.frame_length = frame_length
        self.hop_length = hop_length
        self.dB = dB
        self.Aw = Aw
        self.ref = np.array(ref if isinstance(ref, list) else [ref])
        super().__init__(
            sampling_rate,
            frame_length=frame_length,
            hop_length=hop_length,
            dB=dB,
            Aw=Aw,
            ref=self.ref,
        )

    def get_metadata_updates(self) -> dict[str, Any]:
        """
        Update sampling rate based on hop length.

        Returns
        -------
        dict
            Metadata updates with new sampling rate based on hop length

        Notes
        -----
        The output sampling rate is determined by downsampling the input
        by hop_length. All necessary parameters are provided at initialization.
        """
        new_sr = self.sampling_rate / self.hop_length
        return {"sampling_rate": new_sr}

    def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
        """
        Calculate output data shape after operation

        Parameters
        ----------
        input_shape : tuple
            Input data shape (channels, samples)

        Returns
        -------
        tuple
            Output data shape (channels, frames)
        """
        n_frames = librosa.feature.rms(
            y=np.ones((1, input_shape[-1])),
            frame_length=self.frame_length,
            hop_length=self.hop_length,
        ).shape[-1]
        return (*input_shape[:-1], n_frames)

    def get_display_name(self) -> str:
        """Get display name for the operation for use in channel labels."""
        return "RMS"

    def _process_array(self, x: NDArrayReal) -> NDArrayReal:
        """Create processor function for RMS calculation"""
        logger.debug(f"Applying RMS to array with shape: {x.shape}")

        if self.Aw:
            # Apply A-weighting
            _x = A_weight(x, self.sampling_rate)
            if isinstance(_x, np.ndarray):
                # A_weightがタプルを返す場合、最初の要素を使用
                x = _x
            elif isinstance(_x, tuple):
                # Use the first element if A_weight returns a tuple
                x = _x[0]
            else:
                raise ValueError("A_weighting returned an unexpected type.")

        # Calculate RMS
        result: NDArrayReal = librosa.feature.rms(y=x, frame_length=self.frame_length, hop_length=self.hop_length)[
            ..., 0, :
        ]

        if self.dB:
            # Convert to dB
            result = 20 * np.log10(np.maximum(result / self.ref[..., np.newaxis], 1e-12))
        #
        logger.debug(f"RMS applied, returning result with shape: {result.shape}")
        return result
Attributes
name = 'rms_trend' class-attribute instance-attribute
frame_length = frame_length instance-attribute
hop_length = hop_length instance-attribute
dB = dB instance-attribute
Aw = Aw instance-attribute
ref = np.array(ref if isinstance(ref, list) else [ref]) instance-attribute
Functions
__init__(sampling_rate, frame_length=2048, hop_length=512, ref=1.0, dB=False, Aw=False)

Initialize RMS calculation

Parameters

sampling_rate : float Sampling rate (Hz) frame_length : int Frame length, default is 2048 hop_length : int Hop length, default is 512 ref : Union[list[float], float] Reference value(s) for dB calculation dB : bool Whether to convert to decibels Aw : bool Whether to apply A-weighting before RMS calculation

Source code in wandas/processing/temporal.py
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
def __init__(
    self,
    sampling_rate: float,
    frame_length: int = 2048,
    hop_length: int = 512,
    ref: list[float] | float = 1.0,
    dB: bool = False,  # noqa: N803
    Aw: bool = False,  # noqa: N803
) -> None:
    """
    Initialize RMS calculation

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    frame_length : int
        Frame length, default is 2048
    hop_length : int
        Hop length, default is 512
    ref : Union[list[float], float]
        Reference value(s) for dB calculation
    dB : bool
        Whether to convert to decibels
    Aw : bool
        Whether to apply A-weighting before RMS calculation
    """
    self.frame_length = frame_length
    self.hop_length = hop_length
    self.dB = dB
    self.Aw = Aw
    self.ref = np.array(ref if isinstance(ref, list) else [ref])
    super().__init__(
        sampling_rate,
        frame_length=frame_length,
        hop_length=hop_length,
        dB=dB,
        Aw=Aw,
        ref=self.ref,
    )
get_metadata_updates()

Update sampling rate based on hop length.

Returns

dict Metadata updates with new sampling rate based on hop length

Notes

The output sampling rate is determined by downsampling the input by hop_length. All necessary parameters are provided at initialization.

Source code in wandas/processing/temporal.py
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
def get_metadata_updates(self) -> dict[str, Any]:
    """
    Update sampling rate based on hop length.

    Returns
    -------
    dict
        Metadata updates with new sampling rate based on hop length

    Notes
    -----
    The output sampling rate is determined by downsampling the input
    by hop_length. All necessary parameters are provided at initialization.
    """
    new_sr = self.sampling_rate / self.hop_length
    return {"sampling_rate": new_sr}
calculate_output_shape(input_shape)

Calculate output data shape after operation

Parameters

input_shape : tuple Input data shape (channels, samples)

Returns

tuple Output data shape (channels, frames)

Source code in wandas/processing/temporal.py
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
    """
    Calculate output data shape after operation

    Parameters
    ----------
    input_shape : tuple
        Input data shape (channels, samples)

    Returns
    -------
    tuple
        Output data shape (channels, frames)
    """
    n_frames = librosa.feature.rms(
        y=np.ones((1, input_shape[-1])),
        frame_length=self.frame_length,
        hop_length=self.hop_length,
    ).shape[-1]
    return (*input_shape[:-1], n_frames)
get_display_name()

Get display name for the operation for use in channel labels.

Source code in wandas/processing/temporal.py
308
309
310
def get_display_name(self) -> str:
    """Get display name for the operation for use in channel labels."""
    return "RMS"

SoundLevel

Bases: AudioOperation[NDArrayReal, NDArrayReal]

Time-weighted RMS or sound level with frequency and time weighting.

Source code in wandas/processing/temporal.py
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
class SoundLevel(AudioOperation[NDArrayReal, NDArrayReal]):
    """Time-weighted RMS or sound level with frequency and time weighting."""

    name = "sound_level"

    def __init__(
        self,
        sampling_rate: float,
        ref: list[float] | float = 1.0,
        freq_weighting: str | None = "Z",
        time_weighting: str = "Fast",
        dB: bool = False,  # noqa: N803
    ) -> None:
        validate_sampling_rate(sampling_rate)
        self.ref = np.atleast_1d(np.asarray(ref, dtype=float))
        if np.any(self.ref <= 0):
            raise ValueError(
                "Invalid sound level reference\n"
                f"  Got: {self.ref.tolist()}\n"
                "  Expected: Positive reference values\n"
                "Sound pressure level requires a positive reference pressure."
            )
        self.freq_weighting = self._normalize_freq_weighting(freq_weighting)
        self.time_weighting = self._normalize_time_weighting(time_weighting)
        self.dB = dB
        super().__init__(
            sampling_rate,
            ref=self.ref,
            freq_weighting=self.freq_weighting,
            time_weighting=self.time_weighting,
            dB=dB,
        )

    @staticmethod
    def _normalize_freq_weighting(freq_weighting: str | None) -> str:
        normalized = "Z" if freq_weighting is None else str(freq_weighting).upper()
        if normalized not in {"A", "C", "Z"}:
            raise ValueError(
                "Invalid frequency weighting\n"
                f"  Got: {freq_weighting!r}\n"
                "  Expected: 'A', 'C', or 'Z'\n"
                "Choose a supported IEC-style weighting curve before calculating sound level."
            )
        return normalized

    @staticmethod
    def _normalize_time_weighting(time_weighting: str) -> str:
        normalized = str(time_weighting).strip().upper()
        if normalized in {"F", "FAST"}:
            return "Fast"
        if normalized in {"S", "SLOW"}:
            return "Slow"
        raise ValueError(
            "Invalid time weighting\n"
            f"  Got: {time_weighting!r}\n"
            "  Expected: 'Fast' or 'Slow'\n"
            "Choose a supported sound level meter time constant before calculating sound level."
        )

    @property
    def time_constant(self) -> float:
        """Return the RC time constant in seconds."""
        return 0.125 if self.time_weighting == "Fast" else 1.0

    def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
        """Sound level keeps the same channel and sample dimensions."""
        return input_shape

    @staticmethod
    def _output_dtype(
        input_dtype: np.dtype[Any],
    ) -> np.dtype[np.float32] | np.dtype[np.float64]:
        """Return the floating output dtype for the given input dtype."""
        if np.dtype(input_dtype) == np.dtype(np.float32):
            return np.dtype(np.float32)
        return np.dtype(np.float64)

    def get_display_name(self) -> str:
        """Get display name for the operation for use in channel labels."""
        if self.dB:
            return f"L{self.freq_weighting}{self.time_weighting[0]}"
        return f"{self.freq_weighting}{self.time_weighting[0]}RMS"

    def _reference_squared(self, n_channels: int) -> NDArrayReal:
        """Return squared reference pressure for each channel."""
        if self.ref.size == 1:
            ref = np.repeat(self.ref, n_channels)
        elif self.ref.size == n_channels:
            ref = self.ref
        else:
            raise ValueError(
                "Reference count mismatch\n"
                f"  Got: {self.ref.size} reference values for {n_channels} channels\n"
                "  Expected: One shared reference or one reference per channel\n"
                "Provide ref as a scalar or a list matching the number of channels."
            )
        return np.asarray(np.square(ref), dtype=np.float64)

    def _process_array(self, x: NDArrayReal) -> NDArrayReal:
        """Create processor function for sound level calculation."""
        logger.debug(
            "Applying sound level to array with shape %s using %s/%s weighting",
            x.shape,
            self.freq_weighting,
            self.time_weighting,
        )
        output_dtype = self._output_dtype(x.dtype)
        weighted_input = x if x.dtype == np.float64 else np.asarray(x, dtype=np.float64)
        if self.freq_weighting == "Z":
            weighted = weighted_input
        else:
            weighted = frequency_weight(weighted_input, self.sampling_rate, curve=self.freq_weighting)
        squared = np.square(weighted)
        alpha = np.asarray(np.exp(-1.0 / (self.sampling_rate * self.time_constant)), dtype=np.float64).item()
        smoothed = lfilter([1.0 - alpha], [1.0, -alpha], squared, axis=-1)
        if self.dB:
            ref_squared_broadcast = self._reference_squared(smoothed.shape[0])[:, np.newaxis]
            result = 10.0 * np.log10(np.maximum(smoothed / ref_squared_broadcast, MIN_SOUND_LEVEL_POWER_RATIO))
        else:
            result = np.sqrt(smoothed)
        logger.debug(f"Sound level applied, returning result with shape: {result.shape}")
        return np.asarray(result, dtype=output_dtype)

    def process(self, data: DaskArray) -> DaskArray:
        """Execute sound level with floating output dtype metadata."""
        logger.debug("Adding delayed sound level operation to computation graph")
        wrapper = self._create_named_wrapper()
        delayed_result = delayed(wrapper, pure=self.pure)(data)
        output_shape = self.calculate_output_shape(data.shape)
        return da.from_delayed(delayed_result, shape=output_shape, dtype=self._output_dtype(data.dtype))
Attributes
name = 'sound_level' class-attribute instance-attribute
ref = np.atleast_1d(np.asarray(ref, dtype=float)) instance-attribute
freq_weighting = self._normalize_freq_weighting(freq_weighting) instance-attribute
time_weighting = self._normalize_time_weighting(time_weighting) instance-attribute
dB = dB instance-attribute
time_constant property

Return the RC time constant in seconds.

Functions
__init__(sampling_rate, ref=1.0, freq_weighting='Z', time_weighting='Fast', dB=False)
Source code in wandas/processing/temporal.py
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
def __init__(
    self,
    sampling_rate: float,
    ref: list[float] | float = 1.0,
    freq_weighting: str | None = "Z",
    time_weighting: str = "Fast",
    dB: bool = False,  # noqa: N803
) -> None:
    validate_sampling_rate(sampling_rate)
    self.ref = np.atleast_1d(np.asarray(ref, dtype=float))
    if np.any(self.ref <= 0):
        raise ValueError(
            "Invalid sound level reference\n"
            f"  Got: {self.ref.tolist()}\n"
            "  Expected: Positive reference values\n"
            "Sound pressure level requires a positive reference pressure."
        )
    self.freq_weighting = self._normalize_freq_weighting(freq_weighting)
    self.time_weighting = self._normalize_time_weighting(time_weighting)
    self.dB = dB
    super().__init__(
        sampling_rate,
        ref=self.ref,
        freq_weighting=self.freq_weighting,
        time_weighting=self.time_weighting,
        dB=dB,
    )
calculate_output_shape(input_shape)

Sound level keeps the same channel and sample dimensions.

Source code in wandas/processing/temporal.py
405
406
407
def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
    """Sound level keeps the same channel and sample dimensions."""
    return input_shape
get_display_name()

Get display name for the operation for use in channel labels.

Source code in wandas/processing/temporal.py
418
419
420
421
422
def get_display_name(self) -> str:
    """Get display name for the operation for use in channel labels."""
    if self.dB:
        return f"L{self.freq_weighting}{self.time_weighting[0]}"
    return f"{self.freq_weighting}{self.time_weighting[0]}RMS"
process(data)

Execute sound level with floating output dtype metadata.

Source code in wandas/processing/temporal.py
464
465
466
467
468
469
470
def process(self, data: DaskArray) -> DaskArray:
    """Execute sound level with floating output dtype metadata."""
    logger.debug("Adding delayed sound level operation to computation graph")
    wrapper = self._create_named_wrapper()
    delayed_result = delayed(wrapper, pure=self.pure)(data)
    output_shape = self.calculate_output_shape(data.shape)
    return da.from_delayed(delayed_result, shape=output_shape, dtype=self._output_dtype(data.dtype))

Functions