Skip to content

Processing Module / 処理モジュール

The wandas.processing module provides various processing capabilities for audio data. wandas.processing モジュールは、オーディオデータに対する様々な処理機能を提供します。

Base Processing / 基本処理

Provides basic processing operations. 基本的な処理操作を提供します。

wandas.processing.base

Attributes

logger = logging.getLogger(__name__) module-attribute

InputArrayType = TypeVar('InputArrayType', NDArrayReal, NDArrayComplex) module-attribute

OutputArrayType = TypeVar('OutputArrayType', NDArrayReal, NDArrayComplex) module-attribute

Classes

AudioOperation

Bases: Generic[InputArrayType, OutputArrayType]

Abstract base class for audio processing operations.

Source code in wandas/processing/base.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
class AudioOperation(Generic[InputArrayType, OutputArrayType]):
    """Common base for audio processing operations.

    An operation is configured once in ``__init__`` and applied lazily:
    ``process`` and ``process_array`` wrap ``_process_array`` in a
    ``dask.delayed`` call instead of running it right away. Concrete
    subclasses implement ``_process_array`` and may override the
    ``validate_params``, ``_setup_processor``, ``get_metadata_updates``,
    ``get_display_name`` and ``calculate_output_shape`` hooks.
    """

    # Operation name; also assigned as ``__name__`` of the Dask wrapper function
    name: ClassVar[str]

    # Declared (not assigned) here for the subclasses that use them (e.g., FFT)
    n_fft: int | None
    window: str

    def __init__(self, sampling_rate: float, *, pure: bool = True, **params: Any):
        """
        Store the configuration and run the subclass hooks.

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        pure : bool, default=True
            Forwarded to ``dask.delayed`` as ``pure``. Leave it True for
            deterministic, side-effect-free operations so Dask can reuse
            results for identical inputs; pass False otherwise.
        **params : Any
            Operation-specific parameters, kept as a dict in ``self.params``
        """
        # State is assigned first so that both hooks below can read it.
        self.sampling_rate = sampling_rate
        self.pure = pure
        self.params = params

        # Hook order matters: validation runs before processor setup.
        self.validate_params()
        self._setup_processor()

        logger.debug(f"Initialized {self.__class__.__name__} operation with params: {params}")

    def validate_params(self) -> None:
        """Hook for parameter validation; the base implementation accepts everything."""

    def _setup_processor(self) -> None:
        """Hook for preparing the processor; the base implementation does nothing."""

    def get_metadata_updates(self) -> dict[str, Any]:
        """
        Report the frame metadata that changes once this operation ran.

        The base implementation reports no changes. Subclasses that alter
        metadata (for example the sampling rate) override this method and
        build the result from values stored on the instance at
        initialization.

        Returns
        -------
        dict
            Mapping of metadata keys to their new values, e.g.
            ``'sampling_rate'`` mapped to the new rate (float). Empty when
            nothing changes.

        Examples
        --------
        Body of an override for an operation that leaves metadata alone::

            return {}

        Body of an override for an operation that resamples::

            return {"sampling_rate": self.target_sr}
        """
        return {}

    def get_display_name(self) -> str | None:
        """
        Return the short label used for this operation in channel labels.

        The label is the instance's ``_display`` attribute when one exists.
        ``None`` is returned otherwise, which tells the framework to fall
        back to the ``name`` class variable. Subclasses needing a dynamic
        label can override this method.
        """
        display_name: str | None = getattr(self, "_display", None)
        return display_name

    def _process_array(self, x: InputArrayType) -> OutputArrayType:
        """Run the actual computation on an array (implemented by subclasses)."""
        # There is no default behaviour: the base class refuses to process.
        raise NotImplementedError("Subclasses must implement this method.")

    def _create_named_wrapper(self) -> Any:
        """
        Build a function that forwards to ``_process_array`` and carries the
        operation name, so Dask graph visualizations show a readable label.

        Returns
        -------
        callable
            One-argument function whose ``__name__`` equals ``self.name``.
        """

        def operation_wrapper(x: InputArrayType) -> OutputArrayType:
            return self._process_array(x)

        # Relabel the closure with the operation name.
        operation_wrapper.__name__ = self.name
        return operation_wrapper

    def _delayed(self, data: Any) -> Any:
        """Return the ``dask.delayed`` call of the named wrapper applied to *data*."""
        return delayed(self._create_named_wrapper(), pure=self.pure)(data)

    def process_array(self, x: Any) -> Any:
        """
        Schedule the operation on ``x`` without computing it.

        Parameters
        ----------
        x : Any
            Input array to process. Its ``shape`` attribute is read for the
            debug log message.

        Returns
        -------
        dask.delayed.Delayed
            A Delayed object representing the computation.
        """
        logger.debug(f"Creating delayed operation on data with shape: {x.shape}")
        return self._delayed(x)

    def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
        """
        Predict the shape of the data after the operation.

        The base implementation hands back *input_shape* untouched, which
        suits shape-preserving operations (filters, effects, weighting,
        etc.). Subclasses that alter the shape (e.g. FFT, STFT, resampling)
        **must** override this method.

        Parameters
        ----------
        input_shape : tuple
            Input data shape

        Returns
        -------
        tuple
            Output data shape
        """
        return input_shape

    def process(self, data: DaArray) -> DaArray:
        """
        Append this operation to the computation graph of ``data``.

        ``data`` has shape (channels, samples). The returned array takes its
        shape from ``calculate_output_shape`` and its dtype from ``data``.
        """
        logger.debug("Adding delayed operation to computation graph")
        return _da_from_delayed(
            self._delayed(data),
            shape=self.calculate_output_shape(data.shape),
            dtype=data.dtype,
        )
Attributes
name class-attribute
n_fft instance-attribute
window instance-attribute
sampling_rate = sampling_rate instance-attribute
pure = pure instance-attribute
params = params instance-attribute
Functions
__init__(sampling_rate, *, pure=True, **params)

Initialize AudioOperation.

Parameters

sampling_rate : float Sampling rate (Hz) pure : bool, default=True Whether the operation is pure (deterministic with no side effects). When True, Dask can cache results for identical inputs. Set to False only if the operation has side effects or is non-deterministic. **params : Any Operation-specific parameters

Source code in wandas/processing/base.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def __init__(self, sampling_rate: float, *, pure: bool = True, **params: Any):
    """
    Initialize AudioOperation.

    Stores the configuration on the instance, then runs the
    ``validate_params`` and ``_setup_processor`` hooks in that order.
    Exceptions raised by either hook propagate to the caller.

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    pure : bool, default=True
        Whether the operation is pure (deterministic with no side effects).
        When True, Dask can cache results for identical inputs.
        Set to False only if the operation has side effects or is non-deterministic.
    **params : Any
        Operation-specific parameters; kept as a dict in ``self.params``.
    """
    # Attributes are assigned before the hooks run so the hooks can read them.
    self.sampling_rate = sampling_rate
    self.pure = pure
    self.params = params

    # Validate parameters during initialization
    self.validate_params()

    # Create processor function (lazy initialization possible)
    self._setup_processor()

    logger.debug(f"Initialized {self.__class__.__name__} operation with params: {params}")
validate_params()

Validate parameters (raises exception if invalid)

Source code in wandas/processing/base.py
57
58
def validate_params(self) -> None:
    """Validate parameters (raises exception if invalid).

    The base implementation is intentionally empty and accepts any
    parameters. Subclasses override it to raise when their configuration is
    invalid. ``AudioOperation.__init__`` calls this hook after storing
    ``sampling_rate``, ``pure`` and ``params`` on the instance.
    """
get_metadata_updates()

Get metadata updates to apply after processing.

This method allows operations to specify how metadata should be updated after processing. By default, no metadata is updated.

Returns

dict Dictionary of metadata updates. Can include: - 'sampling_rate': New sampling rate (float) - Other metadata keys as needed

Examples

Return empty dict for operations that don't change metadata:

return {}

Return new sampling rate for operations that resample:

return {"sampling_rate": self.target_sr}

Notes

This method is called by the framework after processing to update the frame metadata. Subclasses should override this method if they need to update metadata (e.g., changing sampling rate).

Design principle: Operations should use parameters provided at initialization (via `__init__`). All necessary information should be available as instance variables.

Source code in wandas/processing/base.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def get_metadata_updates(self) -> dict[str, Any]:
    """
    Get metadata updates to apply after processing.

    This method allows operations to specify how metadata should be
    updated after processing. By default, no metadata is updated.

    Returns
    -------
    dict
        Dictionary of metadata updates. Can include:
        - 'sampling_rate': New sampling rate (float)
        - Other metadata keys as needed

    Examples
    --------
    The snippets below are bodies of overriding methods, not runnable
    doctests.

    Return empty dict for operations that don't change metadata::

        return {}

    Return new sampling rate for operations that resample::

        return {"sampling_rate": self.target_sr}

    Notes
    -----
    This method is called by the framework after processing to update
    the frame metadata. Subclasses should override this method if they
    need to update metadata (e.g., changing sampling rate).

    Design principle: Operations should use parameters provided at
    initialization (via __init__). All necessary information should be
    available as instance variables.
    """
    # Base behaviour: nothing changes.
    return {}
get_display_name()

Get display name for the operation for use in channel labels.

Returns _display if the subclass sets it, otherwise None (which tells the framework to fall back to the name class variable). Subclasses with dynamic display names can still override this method.

Source code in wandas/processing/base.py
 99
100
101
102
103
104
105
106
107
108
def get_display_name(self) -> str | None:
    """
    Get display name for the operation for use in channel labels.

    Returns ``_display`` if the subclass sets it, otherwise ``None``
    (which tells the framework to fall back to the ``name`` class
    variable).  Subclasses with dynamic display names can still
    override this method.
    """
    return getattr(self, "_display", None)
process_array(x)

Processing function wrapped with @dask.delayed.

This method returns a Delayed object that can be computed later. The operation name is used in the Dask task graph for better visualization.

Parameters

x : InputArrayType Input array to process.

Returns

dask.delayed.Delayed A Delayed object representing the computation.

Source code in wandas/processing/base.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
def process_array(self, x: Any) -> Any:
    """
    Processing function wrapped with @dask.delayed.

    This method returns a Delayed object that can be computed later.
    The operation name is used in the Dask task graph for better visualization.

    Parameters
    ----------
    x : Any
        Input array to process. It must expose a ``shape`` attribute, which
        is read when the debug log message is built.

    Returns
    -------
    dask.delayed.Delayed
        A Delayed object representing the computation.
    """
    logger.debug(f"Creating delayed operation on data with shape: {x.shape}")
    # Building the delayed call is delegated to ``_delayed``.
    return self._delayed(x)
calculate_output_shape(input_shape)

Calculate output data shape after operation.

The default returns input_shape unchanged, which is correct for the majority of operations (filters, effects, weighting, etc.). Subclasses that alter the shape (e.g. FFT, STFT, resampling) must override this method.

Parameters

input_shape : tuple Input data shape

Returns

tuple Output data shape

Source code in wandas/processing/base.py
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
    """
    Calculate output data shape after operation.

    The default returns *input_shape* unchanged, which is correct for the
    majority of operations (filters, effects, weighting, etc.).
    Subclasses that alter the shape (e.g. FFT, STFT, resampling) **must**
    override this method.

    Parameters
    ----------
    input_shape : tuple
        Input data shape

    Returns
    -------
    tuple
        Output data shape. The base implementation returns the very same
        tuple object it received.
    """
    # Shape-preserving default; no instance state is consulted.
    return input_shape
process(data)

Execute the operation and return the result. The data shape is (channels, samples).

Source code in wandas/processing/base.py
178
179
180
181
182
183
184
185
186
def process(self, data: DaArray) -> DaArray:
    """
    Append this operation to the computation graph of ``data``.

    ``data`` has shape (channels, samples). The returned array takes its
    shape from ``calculate_output_shape`` and its dtype from ``data``.
    """
    logger.debug("Adding delayed operation to computation graph")
    return _da_from_delayed(
        self._delayed(data),
        shape=self.calculate_output_shape(data.shape),
        dtype=data.dtype,
    )

Functions

register_operation(operation_class)

Register a new operation type

Source code in wandas/processing/base.py
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
def register_operation(operation_class: type) -> None:
    """Add an operation class to the module-level registry under its ``name``.

    Registering a class whose module and qualified name match the entry
    already stored under that name is a no-op. Any other class registered
    under an existing name replaces the previous entry.

    Parameters
    ----------
    operation_class : type
        Concrete subclass of ``AudioOperation`` to register.

    Raises
    ------
    TypeError
        If ``operation_class`` does not inherit from ``AudioOperation`` or
        is abstract according to ``inspect.isabstract``.
    """
    if not issubclass(operation_class, AudioOperation):
        raise TypeError("Strategy class must inherit from AudioOperation.")
    if inspect.isabstract(operation_class):
        raise TypeError("Cannot register abstract AudioOperation class.")

    op_name = operation_class.name
    registered = _OPERATION_REGISTRY.get(op_name)
    if registered is not None:
        # Same module and qualified name: treat it as the identical definition.
        already_present = (
            registered.__module__ == operation_class.__module__
            and registered.__qualname__ == operation_class.__qualname__
        )
        if already_present:
            return

    _OPERATION_REGISTRY[op_name] = operation_class

get_operation(name)

Get operation class by name

Source code in wandas/processing/base.py
212
213
214
215
216
def get_operation(name: str) -> type[AudioOperation[Any, Any]]:
    """Look up a registered operation class by name.

    Parameters
    ----------
    name : str
        Name the operation class was registered under.

    Returns
    -------
    type
        The registered operation class.

    Raises
    ------
    ValueError
        If nothing is registered under ``name``.
    """
    operation_class = _OPERATION_REGISTRY.get(name)
    if operation_class is None:
        raise ValueError(f"Unknown operation type: {name}")
    return operation_class

create_operation(name, sampling_rate, **params)

Create operation instance from name and parameters

Source code in wandas/processing/base.py
219
220
221
222
def create_operation(name: str, sampling_rate: float, **params: Any) -> AudioOperation[Any, Any]:
    """Instantiate the operation registered under ``name``.

    The class is resolved with ``get_operation`` and then called with
    ``sampling_rate`` as its only positional argument and ``params`` as
    keyword arguments.

    Parameters
    ----------
    name : str
        Registered operation name.
    sampling_rate : float
        Sampling rate passed to the operation constructor.
    **params : Any
        Keyword arguments forwarded to the operation constructor.

    Returns
    -------
    AudioOperation
        The newly created operation instance.
    """
    return get_operation(name)(sampling_rate, **params)

Effects / エフェクト

Provides audio effect processing. オーディオエフェクト処理を提供します。

wandas.processing.effects

Attributes

logger = logging.getLogger(__name__) module-attribute

Classes

HpssHarmonic

Bases: _HpssBase

HPSS Harmonic operation

Source code in wandas/processing/effects.py
35
36
37
38
39
40
class HpssHarmonic(_HpssBase):
    """HPSS Harmonic operation"""

    # Operation name for this class
    name = "hpss_harmonic"
    # NOTE(review): presumably read by _HpssBase to pick the harmonic component — confirm in _HpssBase
    _extract_func = "harmonic"
    # Short display label; presumably surfaced through get_display_name() — confirm _HpssBase derives from AudioOperation
    _display = "Hrm"
Attributes
name = 'hpss_harmonic' class-attribute instance-attribute

HpssPercussive

Bases: _HpssBase

HPSS Percussive operation

Source code in wandas/processing/effects.py
43
44
45
46
47
48
class HpssPercussive(_HpssBase):
    """HPSS Percussive operation"""

    # Operation name for this class
    name = "hpss_percussive"
    # NOTE(review): presumably read by _HpssBase to pick the percussive component — confirm in _HpssBase
    _extract_func = "percussive"
    # Short display label; presumably surfaced through get_display_name() — confirm _HpssBase derives from AudioOperation
    _display = "Prc"
Attributes
name = 'hpss_percussive' class-attribute instance-attribute

Normalize

Bases: AudioOperation[NDArrayReal, NDArrayReal]

Signal normalization operation using librosa.util.normalize

Source code in wandas/processing/effects.py
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
class Normalize(AudioOperation[NDArrayReal, NDArrayReal]):
    """Normalize a signal by delegating to ``librosa.util.normalize``."""

    name = "normalize"
    _display = "norm"

    def __init__(
        self,
        sampling_rate: float,
        norm: float | None = np.inf,
        axis: int | None = -1,
        threshold: float | None = None,
        fill: bool | None = None,
    ):
        """
        Check the arguments and store them for ``_process_array``.

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        norm : float or np.inf, default=np.inf
            Norm type. Supported values:
            - np.inf: Maximum absolute value normalization
            - -np.inf: Minimum absolute value normalization
            - 0: Pseudo L0 normalization (divide by number of non-zero elements)
            - float: Lp norm
            - None: No normalization
        axis : int or None, default=-1
            Axis along which to normalize.
            - -1: Normalize along time axis (each channel independently)
            - None: Global normalization across all axes
            - int: Normalize along specified axis
        threshold : float or None, optional
            Forwarded unchanged as ``threshold`` to ``librosa.util.normalize``.
            Negative values are rejected here.
        fill : bool or None, optional
            Forwarded unchanged as ``fill`` to ``librosa.util.normalize``,
            which uses it to decide how zero-norm data is handled.

        Raises
        ------
        ValueError
            If ``norm`` is neither numeric nor None, if ``norm`` is negative
            and not ``-np.inf``, or if ``threshold`` is negative
        """
        if norm is not None:
            # Type check first, so the sign comparison below is well defined.
            if not isinstance(norm, (int, float)):
                raise ValueError(
                    f"Invalid normalization method\n"
                    f"  Got: {type(norm).__name__} ({norm})\n"
                    f"  Expected: float, int, np.inf, -np.inf, or None\n"
                    f"Norm parameter must be a numeric value or None.\n"
                    f"Common values: np.inf (max norm), 2 (L2 norm),\n"
                    f"1 (L1 norm), 0 (pseudo L0)"
                )

            # -np.inf is the only negative value that is accepted.
            if norm < 0 and not np.isneginf(norm):
                raise ValueError(
                    f"Invalid normalization method\n"
                    f"  Got: {norm}\n"
                    f"  Expected: Non-negative value, np.inf, -np.inf, or None\n"
                    f"Norm parameter must be non-negative (except -np.inf for min norm).\n"
                    f"Common values: np.inf (max norm), 2 (L2 norm),\n"
                    f"1 (L1 norm), 0 (pseudo L0)"
                )

        if threshold is not None and threshold < 0:
            raise ValueError(
                f"Invalid threshold for normalization\n"
                f"  Got: {threshold}\n"
                f"  Expected: Non-negative value or None\n"
                f"Threshold must be non-negative.\n"
                f"Typical values: 0.0 (no threshold), 1e-10 (small threshold)"
            )

        super().__init__(sampling_rate, norm=norm, axis=axis, threshold=threshold, fill=fill)

        # Kept as plain attributes for direct use in _process_array.
        self.norm = norm
        self.axis = axis
        self.threshold = threshold
        self.fill = fill
        logger.debug(
            f"Initialized Normalize operation with norm={norm}, axis={axis}, threshold={threshold}, fill={fill}"
        )

    def _process_array(self, x: NDArrayReal) -> NDArrayReal:
        """Normalize ``x`` with the stored settings and return the result."""
        logger.debug(f"Applying normalization to array with shape: {x.shape}, norm={self.norm}, axis={self.axis}")

        # The numerical work is done entirely by librosa.
        result: NDArrayReal = librosa_util.normalize(
            x,
            norm=self.norm,
            axis=self.axis,
            threshold=self.threshold,
            fill=self.fill,
        )

        logger.debug(f"Normalization applied, returning result with shape: {result.shape}")
        return result
Attributes
name = 'normalize' class-attribute instance-attribute
norm = norm instance-attribute
axis = axis instance-attribute
threshold = threshold instance-attribute
fill = fill instance-attribute
Functions
__init__(sampling_rate, norm=np.inf, axis=-1, threshold=None, fill=None)

Initialize normalization operation

Parameters

sampling_rate : float Sampling rate (Hz) norm : float or np.inf, default=np.inf Norm type. Supported values: - np.inf: Maximum absolute value normalization - -np.inf: Minimum absolute value normalization - 0: Pseudo L0 normalization (divide by number of non-zero elements) - float: Lp norm - None: No normalization axis : int or None, default=-1 Axis along which to normalize. - -1: Normalize along time axis (each channel independently) - None: Global normalization across all axes - int: Normalize along specified axis threshold : float or None, optional Threshold below which values are considered zero. If None, no threshold is applied. fill : bool or None, optional Value to fill when the norm is zero. If None, the zero vector remains zero.

Raises

ValueError If norm parameter is invalid or threshold is negative

Source code in wandas/processing/effects.py
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
def __init__(
    self,
    sampling_rate: float,
    norm: float | None = np.inf,
    axis: int | None = -1,
    threshold: float | None = None,
    fill: bool | None = None,
):
    """
    Initialize normalization operation

    All checks run before the base-class initializer, so an invalid
    argument raises before any state is stored on the instance.

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    norm : float or np.inf, default=np.inf
        Norm type. Supported values:
        - np.inf: Maximum absolute value normalization
        - -np.inf: Minimum absolute value normalization
        - 0: Pseudo L0 normalization (divide by number of non-zero elements)
        - float: Lp norm
        - None: No normalization
    axis : int or None, default=-1
        Axis along which to normalize.
        - -1: Normalize along time axis (each channel independently)
        - None: Global normalization across all axes
        - int: Normalize along specified axis
    threshold : float or None, optional
        Threshold below which values are considered zero.
        If None, no threshold is applied.
    fill : bool or None, optional
        Value to fill when the norm is zero.
        If None, the zero vector remains zero.

    Raises
    ------
    ValueError
        If norm parameter is invalid or threshold is negative
    """
    # Validate norm parameter
    # NOTE(review): bool is a subclass of int, so True/False pass this type check — confirm that is intended.
    if norm is not None and not isinstance(norm, int | float):
        raise ValueError(
            f"Invalid normalization method\n"
            f"  Got: {type(norm).__name__} ({norm})\n"
            f"  Expected: float, int, np.inf, -np.inf, or None\n"
            f"Norm parameter must be a numeric value or None.\n"
            f"Common values: np.inf (max norm), 2 (L2 norm),\n"
            f"1 (L1 norm), 0 (pseudo L0)"
        )

    # Validate that norm is non-negative (except for -np.inf which is valid)
    if norm is not None and norm < 0 and not np.isneginf(norm):
        raise ValueError(
            f"Invalid normalization method\n"
            f"  Got: {norm}\n"
            f"  Expected: Non-negative value, np.inf, -np.inf, or None\n"
            f"Norm parameter must be non-negative (except -np.inf for min norm).\n"
            f"Common values: np.inf (max norm), 2 (L2 norm),\n"
            f"1 (L1 norm), 0 (pseudo L0)"
        )

    # Validate threshold
    # NOTE(review): 0 passes this check; librosa.util.normalize documents threshold as "number > 0" —
    # confirm that a zero threshold is accepted when the operation is eventually computed.
    if threshold is not None and threshold < 0:
        raise ValueError(
            f"Invalid threshold for normalization\n"
            f"  Got: {threshold}\n"
            f"  Expected: Non-negative value or None\n"
            f"Threshold must be non-negative.\n"
            f"Typical values: 0.0 (no threshold), 1e-10 (small threshold)"
        )

    # The same values are forwarded as keyword params to the base class and kept as attributes below.
    super().__init__(sampling_rate, norm=norm, axis=axis, threshold=threshold, fill=fill)
    self.norm = norm
    self.axis = axis
    self.threshold = threshold
    self.fill = fill
    logger.debug(
        f"Initialized Normalize operation with norm={norm}, axis={axis}, threshold={threshold}, fill={fill}"
    )

RemoveDC

Bases: AudioOperation[NDArrayReal, NDArrayReal]

Remove DC component (DC offset) from the signal.

This operation removes the DC component by subtracting the mean value from each channel, centering the signal around zero.

Source code in wandas/processing/effects.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
class RemoveDC(AudioOperation[NDArrayReal, NDArrayReal]):
    """Strip the DC offset from a signal.

    The mean taken along the last axis is subtracted from the input, which
    centers every channel of a (channels, samples) array around zero.
    """

    name = "remove_dc"
    _display = "dcRM"

    def __init__(self, sampling_rate: float):
        """Create the DC removal operation.

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        """
        super().__init__(sampling_rate)
        logger.debug("Initialized RemoveDC operation")

    def _process_array(self, x: NDArrayReal) -> NDArrayReal:
        """Subtract the per-row mean from ``x``.

        Parameters
        ----------
        x : NDArrayReal
            Input signal array (channels, samples)

        Returns
        -------
        NDArrayReal
            Signal with DC component removed
        """
        logger.debug(f"Removing DC component from array with shape: {x.shape}")

        # keepdims=True keeps the reduced axis so the mean broadcasts against x.
        result: NDArrayReal = x - x.mean(axis=-1, keepdims=True)

        logger.debug(f"DC removal applied, returning result with shape: {result.shape}")
        return result
Attributes
name = 'remove_dc' class-attribute instance-attribute
Functions
__init__(sampling_rate)

Initialize DC removal operation.

Parameters

sampling_rate : float Sampling rate (Hz)

Source code in wandas/processing/effects.py
160
161
162
163
164
165
166
167
168
169
def __init__(self, sampling_rate: float):
    """Initialize DC removal operation.

    The operation has no parameters of its own; only the sampling rate is
    forwarded to the base-class initializer.

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    """
    super().__init__(sampling_rate)
    logger.debug("Initialized RemoveDC operation")

AddWithSNR

Bases: AudioOperation[NDArrayReal, NDArrayReal]

Addition operation considering SNR: adds a noise signal to the input after scaling it to the requested signal-to-noise ratio.

Source code in wandas/processing/effects.py
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
class AddWithSNR(AudioOperation[NDArrayReal, NDArrayReal]):
    """Mix a second signal into the input at a requested SNR.

    The second signal is rescaled so that its RMS equals the level that
    ``util.calculate_desired_noise_rms`` derives from the input RMS and
    ``snr``. The rescaled signal is then added to the input.
    """

    name = "add_with_snr"
    _display = "+SNR"

    def __init__(self, sampling_rate: float, other: DaArray, snr: float = 1.0):
        """
        Create the SNR-controlled addition.

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        other : DaArray
            Noise signal to add (channel-frame format)
        snr : float
            Signal-to-noise ratio (dB)
        """
        super().__init__(sampling_rate, other=other, snr=snr)

        self.other = other
        self.snr = snr
        logger.debug(f"Initialized AddWithSNR operation with SNR: {snr} dB")

    def _process_array(self, x: NDArrayReal) -> NDArrayReal:
        """Scale the stored noise to the target level and add it to ``x``."""
        logger.debug(f"Applying SNR-based addition with shape: {x.shape}")

        # The lazily stored noise array is materialized here.
        noise: NDArrayReal = self.other.compute()

        # RMS of the clean input and of the unscaled noise.
        signal_rms = util.calculate_rms(x)
        noise_rms = util.calculate_rms(noise)

        # Noise level required for the requested SNR.
        target_noise_rms = util.calculate_desired_noise_rms(signal_rms, self.snr)

        # NOTE: there is no guard for noise_rms == 0 before the division.
        result: NDArrayReal = x + noise * (target_noise_rms / noise_rms)
        return result
Attributes
name = 'add_with_snr' class-attribute instance-attribute
other = other instance-attribute
snr = snr instance-attribute
Functions
__init__(sampling_rate, other, snr=1.0)

Initialize addition operation considering SNR

Parameters

sampling_rate : float Sampling rate (Hz) other : DaArray Noise signal to add (channel-frame format) snr : float Signal-to-noise ratio (dB)

Source code in wandas/processing/effects.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
def __init__(self, sampling_rate: float, other: DaArray, snr: float = 1.0):
    """
    Initialize addition operation considering SNR

    ``other`` and ``snr`` are forwarded to the base-class initializer as
    keyword params and also kept as instance attributes.

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    other : DaArray
        Noise signal to add (channel-frame format)
    snr : float, default=1.0
        Signal-to-noise ratio (dB)
    """
    super().__init__(sampling_rate, other=other, snr=snr)

    # Kept as attributes for later use by the operation.
    self.other = other
    self.snr = snr
    logger.debug(f"Initialized AddWithSNR operation with SNR: {snr} dB")

Fade

Bases: AudioOperation[NDArrayReal, NDArrayReal]

Fade operation using a Tukey (tapered cosine) window.

This operation applies symmetric fade-in and fade-out with the same duration. The Tukey window alpha parameter is computed from the fade duration so that the tapered portion equals the requested fade length at each end.

Source code in wandas/processing/effects.py
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
class Fade(AudioOperation[NDArrayReal, NDArrayReal]):
    """Symmetric fade-in/fade-out using a Tukey (tapered cosine) window.

    The same fade duration is applied at both ends of the signal. The Tukey
    window ``alpha`` parameter is derived from the fade length so that the
    tapered portion at each end corresponds to the requested fade.
    """

    name = "fade"
    _display = "fade"

    def __init__(self, sampling_rate: float, fade_ms: float = 50) -> None:
        """
        Initialize the fade operation.

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        fade_ms : float, default=50
            Fade duration in milliseconds, applied at each end of the signal.
        """
        self.fade_ms = float(fade_ms)
        # Precompute fade length in samples at construction time
        self.fade_len = round(self.fade_ms * float(sampling_rate) / 1000.0)
        super().__init__(sampling_rate, fade_ms=fade_ms)

    def validate_params(self) -> None:
        """Validate parameters.

        Raises
        ------
        ValueError
            If ``fade_ms`` is negative.
        """
        if self.fade_ms < 0:
            raise ValueError("fade_ms must be non-negative")

    @staticmethod
    def calculate_tukey_alpha(fade_len: int, n_samples: int) -> float:
        """Calculate Tukey window alpha parameter from fade length.

        The alpha parameter determines what fraction of the window is tapered.
        For symmetric fade-in/fade-out, ``alpha = 2 * fade_len / n_samples`` so
        that the two tapers together cover ``2 * fade_len`` samples.

        Parameters
        ----------
        fade_len : int
            Desired fade length in samples for each end (in and out).
        n_samples : int
            Total number of samples in the signal.

        Returns
        -------
        float
            Alpha parameter for scipy.signal.windows.tukey, clamped to [0, 1].

        Examples
        --------
        >>> Fade.calculate_tukey_alpha(fade_len=20, n_samples=200)
        0.2
        >>> Fade.calculate_tukey_alpha(fade_len=100, n_samples=100)
        1.0
        >>> Fade.calculate_tukey_alpha(fade_len=-5, n_samples=100)
        0.0
        """
        alpha = float(2 * fade_len) / float(n_samples)
        # Clamp on both sides so the documented [0, 1] range actually holds
        # (a negative fade length previously produced a negative alpha).
        return max(0.0, min(1.0, alpha))

    def _process_array(self, x: NDArrayReal) -> NDArrayReal:
        """Apply the Tukey fade envelope along the last axis.

        A 1-D input is reshaped to a single-row 2-D array before processing.
        When the precomputed fade length is not positive, the (possibly
        reshaped) input is returned without applying a window.

        Raises
        ------
        ValueError
            If twice the fade length is not smaller than the number of samples.
        """
        logger.debug(f"Applying Tukey Fade to array with shape: {x.shape}")

        arr = x
        if arr.ndim == 1:
            arr = arr.reshape(1, -1)

        n_samples = int(arr.shape[-1])

        # If no fade requested, return input
        if self.fade_len <= 0:
            return arr

        # Fade-in and fade-out must not overlap
        if 2 * self.fade_len >= n_samples:
            raise ValueError("Fade length too long: 2*fade_ms must be less than signal length")

        # Calculate Tukey window alpha parameter
        alpha = self.calculate_tukey_alpha(self.fade_len, n_samples)

        # Create tukey window (numpy) and apply
        env = sp_windows.tukey(n_samples, alpha=alpha)

        # Broadcast the envelope across the leading (row) axis
        result: NDArrayReal = arr * env[None, :]
        logger.debug("Tukey fade applied")
        return result
Attributes
name = 'fade' class-attribute instance-attribute
fade_ms = float(fade_ms) instance-attribute
fade_len = round(self.fade_ms * float(sampling_rate) / 1000.0) instance-attribute
Functions
__init__(sampling_rate, fade_ms=50)
Source code in wandas/processing/effects.py
250
251
252
253
254
def __init__(self, sampling_rate: float, fade_ms: float = 50) -> None:
    """
    Initialize the fade operation.

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    fade_ms : float, default=50
        Fade duration in milliseconds.
    """
    duration_ms = float(fade_ms)
    self.fade_ms = duration_ms
    # Fade length in samples, computed once here
    self.fade_len = round(duration_ms * float(sampling_rate) / 1000.0)
    super().__init__(sampling_rate, fade_ms=fade_ms)
validate_params()
Source code in wandas/processing/effects.py
256
257
258
def validate_params(self) -> None:
    """Reject a negative fade duration.

    Raises
    ------
    ValueError
        If ``fade_ms`` is below zero.
    """
    negative_duration = self.fade_ms < 0
    if negative_duration:
        raise ValueError("fade_ms must be non-negative")
calculate_tukey_alpha(fade_len, n_samples) staticmethod

Calculate Tukey window alpha parameter from fade length.

The alpha parameter determines what fraction of the window is tapered. For symmetric fade-in/fade-out, alpha = 2 * fade_len / n_samples ensures that each side's taper has exactly fade_len samples.

Parameters

fade_len : int Desired fade length in samples for each end (in and out). n_samples : int Total number of samples in the signal.

Returns

float Alpha parameter for scipy.signal.windows.tukey, clamped to [0, 1].

Examples

`Fade.calculate_tukey_alpha(fade_len=20, n_samples=200)` returns `0.2`; `Fade.calculate_tukey_alpha(fade_len=100, n_samples=100)` returns `1.0`.

Source code in wandas/processing/effects.py
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
@staticmethod
def calculate_tukey_alpha(fade_len: int, n_samples: int) -> float:
    """Calculate Tukey window alpha parameter from fade length.

    The alpha parameter determines what fraction of the window is tapered.
    For symmetric fade-in/fade-out, ``alpha = 2 * fade_len / n_samples`` so
    that the two tapers together cover ``2 * fade_len`` samples.

    Parameters
    ----------
    fade_len : int
        Desired fade length in samples for each end (in and out).
    n_samples : int
        Total number of samples in the signal.

    Returns
    -------
    float
        Alpha parameter for scipy.signal.windows.tukey, clamped to [0, 1].

    Examples
    --------
    >>> Fade.calculate_tukey_alpha(fade_len=20, n_samples=200)
    0.2
    >>> Fade.calculate_tukey_alpha(fade_len=100, n_samples=100)
    1.0
    >>> Fade.calculate_tukey_alpha(fade_len=-5, n_samples=100)
    0.0
    """
    alpha = float(2 * fade_len) / float(n_samples)
    # Clamp on both sides so the documented [0, 1] range actually holds
    # (a negative fade length previously produced a negative alpha).
    return max(0.0, min(1.0, alpha))

Functions

Modules

Filters / フィルター

Provides various audio filter processing. 様々なオーディオフィルター処理を提供します。

wandas.processing.filters

Attributes

logger = logging.getLogger(__name__) module-attribute

Classes

HighPassFilter

Bases: _ButterworthFilter

High-pass filter operation

Source code in wandas/processing/filters.py
74
75
76
77
78
79
class HighPassFilter(_ButterworthFilter):
    """High-pass variant of the Butterworth filter operation.

    This subclass defines no methods of its own; it only supplies the
    class-level identifiers below.
    """

    # Short display label
    _display = "hpf"
    # Band type identifier (presumably consumed by the base class when
    # designing the filter; confirm against _ButterworthFilter)
    _btype = "high"
    # Operation name
    name = "highpass_filter"
Attributes
name = 'highpass_filter' class-attribute instance-attribute

LowPassFilter

Bases: _ButterworthFilter

Low-pass filter operation

Source code in wandas/processing/filters.py
82
83
84
85
86
87
class LowPassFilter(_ButterworthFilter):
    """Low-pass variant of the Butterworth filter operation.

    This subclass defines no methods of its own; it only supplies the
    class-level identifiers below.
    """

    # Short display label
    _display = "lpf"
    # Band type identifier (presumably consumed by the base class when
    # designing the filter; confirm against _ButterworthFilter)
    _btype = "low"
    # Operation name
    name = "lowpass_filter"
Attributes
name = 'lowpass_filter' class-attribute instance-attribute

BandPassFilter

Bases: _ButterworthFilter

Band-pass filter operation

Source code in wandas/processing/filters.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
class BandPassFilter(_ButterworthFilter):
    """Band-pass filter operation defined by a lower and an upper cutoff."""

    name = "bandpass_filter"
    _btype = "band"
    _display = "bpf"

    def __init__(self, sampling_rate: float, low_cutoff: float, high_cutoff: float, order: int = 4):
        """
        Set up the band-pass filter.

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        low_cutoff : float
            Lower cutoff frequency (Hz). Must be between 0 and Nyquist frequency.
        high_cutoff : float
            Higher cutoff frequency (Hz). Must be between 0 and Nyquist frequency
            and greater than low_cutoff.
        order : int, optional
            Filter order, default is 4

        Raises
        ------
        ValueError
            If either cutoff frequency is not within valid range (0 < cutoff < Nyquist),
            or if low_cutoff >= high_cutoff
        """
        self.order = order
        self.high_cutoff = high_cutoff
        self.low_cutoff = low_cutoff
        # The direct parent's initializer is for a single cutoff, so the
        # root initializer is invoked explicitly instead.
        AudioOperation.__init__(self, sampling_rate, low_cutoff=low_cutoff, high_cutoff=high_cutoff, order=order)

    def validate_params(self) -> None:
        """Check both cutoffs individually, then check their ordering.

        Raises
        ------
        ValueError
            If the lower cutoff is not strictly below the higher cutoff
            (and presumably from ``_validate_cutoff`` for an out-of-range
            cutoff; that helper is defined elsewhere).
        """
        fs = self.sampling_rate
        _validate_cutoff(self.low_cutoff, fs, "Lower cutoff")
        _validate_cutoff(self.high_cutoff, fs, "Higher cutoff")
        if self.low_cutoff >= self.high_cutoff:
            message = (
                f"Invalid bandpass filter cutoff frequencies\n"
                f"  Lower cutoff: {self.low_cutoff} Hz\n"
                f"  Higher cutoff: {self.high_cutoff} Hz\n"
                f"  Problem: Lower cutoff must be less than higher cutoff\n"
                f"A bandpass filter passes frequencies between low and high\n"
                f"  cutoffs.\n"
                f"Ensure low_cutoff < high_cutoff\n"
                f"  (e.g., low_cutoff=100, high_cutoff=1000)"
            )
            raise ValueError(message)

    def _setup_processor(self) -> None:
        """Compute and store the band-pass filter coefficients."""
        nyquist = 0.5 * self.sampling_rate
        # Cutoffs expressed as fractions of the Nyquist frequency
        band_edges = [self.low_cutoff / nyquist, self.high_cutoff / nyquist]

        # Coefficients are computed once here and kept on the instance
        self.b, self.a = signal.butter(self.order, band_edges, btype="band")
        logger.debug(f"Bandpass filter coefficients calculated: b={self.b}, a={self.a}")
Attributes
name = 'bandpass_filter' class-attribute instance-attribute
low_cutoff = low_cutoff instance-attribute
high_cutoff = high_cutoff instance-attribute
order = order instance-attribute
Functions
__init__(sampling_rate, low_cutoff, high_cutoff, order=4)

Initialize band-pass filter

Parameters

sampling_rate : float Sampling rate (Hz) low_cutoff : float Lower cutoff frequency (Hz). Must be between 0 and Nyquist frequency. high_cutoff : float Higher cutoff frequency (Hz). Must be between 0 and Nyquist frequency and greater than low_cutoff. order : int, optional Filter order, default is 4

Raises

ValueError If either cutoff frequency is not within valid range (0 < cutoff < Nyquist), or if low_cutoff >= high_cutoff

Source code in wandas/processing/filters.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
def __init__(self, sampling_rate: float, low_cutoff: float, high_cutoff: float, order: int = 4):
    """
    Set up the band-pass filter.

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    low_cutoff : float
        Lower cutoff frequency (Hz). Must be between 0 and Nyquist frequency.
    high_cutoff : float
        Higher cutoff frequency (Hz). Must be between 0 and Nyquist frequency
        and greater than low_cutoff.
    order : int, optional
        Filter order, default is 4

    Raises
    ------
    ValueError
        If either cutoff frequency is not within valid range (0 < cutoff < Nyquist),
        or if low_cutoff >= high_cutoff
    """
    self.order = order
    self.high_cutoff = high_cutoff
    self.low_cutoff = low_cutoff
    # The direct parent's initializer is for a single cutoff, so the
    # root initializer is invoked explicitly instead.
    AudioOperation.__init__(self, sampling_rate, low_cutoff=low_cutoff, high_cutoff=high_cutoff, order=order)
validate_params()

Validate parameters

Source code in wandas/processing/filters.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
def validate_params(self) -> None:
    """Check both cutoffs individually, then check their ordering.

    Raises
    ------
    ValueError
        If the lower cutoff is not strictly below the higher cutoff
        (and presumably from ``_validate_cutoff`` for an out-of-range
        cutoff; that helper is defined elsewhere).
    """
    fs = self.sampling_rate
    _validate_cutoff(self.low_cutoff, fs, "Lower cutoff")
    _validate_cutoff(self.high_cutoff, fs, "Higher cutoff")
    if self.low_cutoff >= self.high_cutoff:
        message = (
            f"Invalid bandpass filter cutoff frequencies\n"
            f"  Lower cutoff: {self.low_cutoff} Hz\n"
            f"  Higher cutoff: {self.high_cutoff} Hz\n"
            f"  Problem: Lower cutoff must be less than higher cutoff\n"
            f"A bandpass filter passes frequencies between low and high\n"
            f"  cutoffs.\n"
            f"Ensure low_cutoff < high_cutoff\n"
            f"  (e.g., low_cutoff=100, high_cutoff=1000)"
        )
        raise ValueError(message)

AWeighting

Bases: AudioOperation[NDArrayReal, NDArrayReal]

A-weighting filter operation

Source code in wandas/processing/filters.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
class AWeighting(AudioOperation[NDArrayReal, NDArrayReal]):
    """Operation that applies an A-weighting filter to a signal."""

    name = "a_weighting"
    _display = "Aw"

    def __init__(self, sampling_rate: float):
        """
        Set up the A-weighting operation.

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        """
        super().__init__(sampling_rate)

    def _process_array(self, x: NDArrayReal) -> NDArrayReal:
        """Run ``A_weight`` on the input and return the result as an array."""
        logger.debug(f"Applying A-weighting to array with shape: {x.shape}")
        raw = A_weight(x, self.sampling_rate)

        # A_weight may hand back a tuple; only its first element is kept
        result = raw[0] if isinstance(raw, tuple) else raw

        logger.debug(f"A-weighting applied, returning result with shape: {result.shape}")
        return np.array(result)
Attributes
name = 'a_weighting' class-attribute instance-attribute
Functions
__init__(sampling_rate)

Initialize A-weighting filter

Parameters

sampling_rate : float Sampling rate (Hz)

Source code in wandas/processing/filters.py
164
165
166
167
168
169
170
171
172
173
def __init__(self, sampling_rate: float):
    """
    Set up the A-weighting operation.

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    """
    # No extra parameters: everything is delegated to the base initializer.
    super().__init__(sampling_rate)

Functions

Spectral Processing / スペクトル処理

Provides spectral analysis and processing capabilities. スペクトル解析と処理機能を提供します。

wandas.processing.spectral

Attributes

logger = logging.getLogger(__name__) module-attribute

Classes

FFT

Bases: AudioOperation[NDArrayReal, NDArrayComplex]

FFT (Fast Fourier Transform) operation

Source code in wandas/processing/spectral.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
class FFT(AudioOperation[NDArrayReal, NDArrayComplex]):
    """Windowed real FFT (Fast Fourier Transform) operation."""

    name = "fft"
    _display = "FFT"
    n_fft: int | None
    window: str

    def __init__(self, sampling_rate: float, n_fft: int | None = None, window: str = "hann"):
        """
        Set up the FFT operation.

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        n_fft : int, optional
            FFT size, default is None (determined by input size)
        window : str, optional
            Window function type, default is 'hann'

        Raises
        ------
        ValueError
            If n_fft is not a positive integer
        """
        # Reject non-positive sizes up front; None means "use the input length"
        size_given = n_fft is not None
        if size_given and n_fft <= 0:
            message = (
                f"Invalid FFT size\n"
                f"  Got: {n_fft}\n"
                f"  Expected: Positive integer > 0\n"
                f"FFT size must be a positive integer.\n"
                f"Common values: 512, 1024, 2048, 4096,\n"
                f"8192 (powers of 2 are most efficient)"
            )
            raise ValueError(message)

        self.n_fft = n_fft
        self.window = window
        super().__init__(sampling_rate, n_fft=n_fft, window=window)

    def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
        """
        Derive the output shape from the input shape.

        Parameters
        ----------
        input_shape : tuple
            Input data shape (channels, samples).

        Returns
        -------
        tuple
            Output data shape (channels, freqs).
        """
        # The transform length is n_fft when set, otherwise the sample count
        length = self.n_fft if self.n_fft else input_shape[-1]
        return (*input_shape[:-1], length // 2 + 1)

    def _process_array(self, x: NDArrayReal) -> NDArrayComplex:
        """Window the input, take the real FFT and rescale the spectrum."""
        from scipy.signal import get_window

        size = self.n_fft
        if size is not None and x.shape[-1] > size:
            # Input longer than the requested FFT size: keep the first n_fft samples
            x = x[..., :size]

        win = get_window(self.window, x.shape[-1])
        windowed = x * win
        result: NDArrayComplex = np.fft.rfft(windowed, n=size, axis=-1)
        # Double every bin except the first and the last.
        # NOTE(review): the last bin is the Nyquist bin only for even transform
        # lengths; confirm that odd lengths are intended to skip it as well.
        result[..., 1:-1] *= 2.0
        # Window function scaling correction
        window_sum = np.sum(win)
        result = result / window_sum
        return result
Attributes
name = 'fft' class-attribute instance-attribute
n_fft = n_fft instance-attribute
window = window instance-attribute
Functions
__init__(sampling_rate, n_fft=None, window='hann')

Initialize FFT operation

Parameters

sampling_rate : float Sampling rate (Hz) n_fft : int, optional FFT size, default is None (determined by input size) window : str, optional Window function type, default is 'hann'

Raises

ValueError If n_fft is not a positive integer

Source code in wandas/processing/spectral.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
def __init__(self, sampling_rate: float, n_fft: int | None = None, window: str = "hann"):
    """
    Set up the FFT operation.

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    n_fft : int, optional
        FFT size, default is None (determined by input size)
    window : str, optional
        Window function type, default is 'hann'

    Raises
    ------
    ValueError
        If n_fft is not a positive integer
    """
    # Reject non-positive sizes up front; None means "use the input length"
    size_given = n_fft is not None
    if size_given and n_fft <= 0:
        message = (
            f"Invalid FFT size\n"
            f"  Got: {n_fft}\n"
            f"  Expected: Positive integer > 0\n"
            f"FFT size must be a positive integer.\n"
            f"Common values: 512, 1024, 2048, 4096,\n"
            f"8192 (powers of 2 are most efficient)"
        )
        raise ValueError(message)

    self.n_fft = n_fft
    self.window = window
    super().__init__(sampling_rate, n_fft=n_fft, window=window)
calculate_output_shape(input_shape)

Calculate output data shape after the operation.

Parameters

input_shape : tuple Input data shape (channels, samples).

Returns

tuple Output data shape (channels, freqs).

Source code in wandas/processing/spectral.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
    """
    Derive the output shape from the input shape.

    Parameters
    ----------
    input_shape : tuple
        Input data shape (channels, samples).

    Returns
    -------
    tuple
        Output data shape (channels, freqs).
    """
    # The transform length is n_fft when set, otherwise the sample count
    length = self.n_fft if self.n_fft else input_shape[-1]
    return (*input_shape[:-1], length // 2 + 1)

IFFT

Bases: AudioOperation[NDArrayComplex, NDArrayReal]

IFFT (Inverse Fast Fourier Transform) operation

Source code in wandas/processing/spectral.py
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
class IFFT(AudioOperation[NDArrayComplex, NDArrayReal]):
    """IFFT (Inverse Fast Fourier Transform) operation"""

    name = "ifft"
    _display = "iFFT"
    n_fft: int | None
    window: str

    def __init__(self, sampling_rate: float, n_fft: int | None = None, window: str = "hann"):
        """
        Initialize IFFT operation

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        n_fft : Optional[int], optional
            IFFT size, default is None (determined based on input size)
        window : str, optional
            Window function type, default is 'hann'

        Raises
        ------
        ValueError
            If n_fft is given and is not positive
        """
        # Fail fast, as FFT does: a non-positive size would otherwise only be
        # rejected later, inside np.fft.irfft, when data is processed.
        if n_fft is not None and n_fft <= 0:
            raise ValueError(
                f"Invalid IFFT size\n"
                f"  Got: {n_fft}\n"
                f"  Expected: Positive integer > 0\n"
                f"IFFT size must be a positive integer, or None to derive\n"
                f"it from the input size."
            )

        self.n_fft = n_fft
        self.window = window
        super().__init__(sampling_rate, n_fft=n_fft, window=window)

    def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
        """
        Calculate output data shape after operation

        Parameters
        ----------
        input_shape : tuple
            Input data shape (channels, freqs)

        Returns
        -------
        tuple
            Output data shape (channels, samples)
        """
        # Without an explicit size, an even-length signal is assumed
        n_samples = 2 * (input_shape[-1] - 1) if self.n_fft is None else self.n_fft
        return (*input_shape[:-1], n_samples)

    def _process_array(self, x: NDArrayComplex) -> NDArrayReal:
        """Apply the inverse real FFT to the input spectrum and rescale it."""
        logger.debug(f"Applying IFFT to array with shape: {x.shape}")

        # Restore frequency component scaling (remove the 2.0 multiplier applied in FFT).
        # Work on a copy so the caller's array is not modified.
        _x = x.copy()
        _x[..., 1:-1] /= 2.0

        # Execute IFFT
        result: NDArrayReal = np.fft.irfft(_x, n=self.n_fft, axis=-1)

        # Window function correction (inverse of FFT operation)
        from scipy.signal import get_window

        win = get_window(self.window, result.shape[-1])

        # Correct the FFT window function scaling
        scaling_factor = np.sum(win) / result.shape[-1]
        result = result / scaling_factor

        logger.debug(f"IFFT applied, returning result with shape: {result.shape}")
        return result
Attributes
name = 'ifft' class-attribute instance-attribute
n_fft = n_fft instance-attribute
window = window instance-attribute
Functions
__init__(sampling_rate, n_fft=None, window='hann')

Initialize IFFT operation

Parameters

sampling_rate : float Sampling rate (Hz) n_fft : Optional[int], optional IFFT size, default is None (determined based on input size) window : str, optional Window function type, default is 'hann'

Source code in wandas/processing/spectral.py
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
def __init__(self, sampling_rate: float, n_fft: int | None = None, window: str = "hann"):
    """
    Initialize IFFT operation

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    n_fft : Optional[int], optional
        IFFT size, default is None (determined based on input size)
    window : str, optional
        Window function type, default is 'hann'

    Raises
    ------
    ValueError
        If n_fft is given and is not positive
    """
    # Fail fast, as FFT does: a non-positive size would otherwise only be
    # rejected later, inside np.fft.irfft, when data is processed.
    if n_fft is not None and n_fft <= 0:
        raise ValueError(
            f"Invalid IFFT size\n"
            f"  Got: {n_fft}\n"
            f"  Expected: Positive integer > 0\n"
            f"IFFT size must be a positive integer, or None to derive\n"
            f"it from the input size."
        )

    self.n_fft = n_fft
    self.window = window
    super().__init__(sampling_rate, n_fft=n_fft, window=window)
calculate_output_shape(input_shape)

Calculate output data shape after operation

Parameters

input_shape : tuple Input data shape (channels, freqs)

Returns

tuple Output data shape (channels, samples)

Source code in wandas/processing/spectral.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
    """
    Derive the time-domain output shape from the spectrum shape.

    Parameters
    ----------
    input_shape : tuple
        Input data shape (channels, freqs)

    Returns
    -------
    tuple
        Output data shape (channels, samples)
    """
    if self.n_fft is None:
        # No explicit size: infer the sample count from the number of bins
        n_samples = 2 * (input_shape[-1] - 1)
    else:
        n_samples = self.n_fft
    leading_axes = input_shape[:-1]
    return (*leading_axes, n_samples)

STFT

Bases: AudioOperation[NDArrayReal, NDArrayComplex]

Short-Time Fourier Transform operation

Source code in wandas/processing/spectral.py
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
class STFT(AudioOperation[NDArrayReal, NDArrayComplex]):
    """Short-Time Fourier Transform operation backed by SciPy's ShortTimeFFT."""

    name = "stft"
    _display = "STFT"

    def __init__(
        self,
        sampling_rate: float,
        n_fft: int = 2048,
        hop_length: int | None = None,
        win_length: int | None = None,
        window: str = "hann",
    ):
        """
        Set up the STFT operation.

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        n_fft : int
            FFT size, default is 2048
        hop_length : int, optional
            Number of samples between frames. Default is win_length // 4
        win_length : int, optional
            Window length. Default is n_fft
        window : str
            Window type, default is 'hann'

        Raises
        ------
        ValueError
            If n_fft is not positive, win_length > n_fft, or hop_length is invalid
        """
        # The shared validator returns the effective window and hop lengths
        resolved = _validate_spectral_params(n_fft, win_length, hop_length, "STFT")
        actual_win_length, actual_hop_length = resolved

        self.n_fft = n_fft
        self.win_length = actual_win_length
        self.hop_length = actual_hop_length
        self.window = window

        # Transform object reused for every call
        analysis_window = get_window(window, self.win_length)
        self.SFT = ShortTimeFFT(
            win=analysis_window,
            hop=self.hop_length,
            fs=sampling_rate,
            mfft=self.n_fft,
            scale_to="magnitude",
        )
        super().__init__(
            sampling_rate,
            n_fft=n_fft,
            win_length=self.win_length,
            hop_length=self.hop_length,
            window=window,
        )

    def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
        """
        Derive the output shape from the input shape.

        Parameters
        ----------
        input_shape : tuple
            Input data shape

        Returns
        -------
        tuple
            Output data shape: first input axis, number of frequency bins,
            number of time frames.
        """
        sft = self.SFT
        n_f = len(sft.f)
        # Frame count depends on the length of the last input axis
        n_t = len(sft.t(input_shape[-1]))
        return (input_shape[0], n_f, n_t)

    def _process_array(self, x: NDArrayReal) -> NDArrayComplex:
        """Run the SciPy STFT over all channels in one call."""
        logger.debug(f"Applying SciPy STFT to array with shape: {x.shape}")

        # A 1D signal is treated as a single row
        if x.ndim == 1:
            x = x.reshape(1, -1)

        result: NDArrayComplex = self.SFT.stft(x)
        # Double all frequency bins except the first and the last
        result[..., 1:-1, :] *= 2.0
        logger.debug(f"SciPy STFT applied, returning result with shape: {result.shape}")
        return result
Attributes
name = 'stft' class-attribute instance-attribute
n_fft = n_fft instance-attribute
win_length = actual_win_length instance-attribute
hop_length = actual_hop_length instance-attribute
window = window instance-attribute
SFT = ShortTimeFFT(win=(get_window(window, self.win_length)), hop=(self.hop_length), fs=sampling_rate, mfft=(self.n_fft), scale_to='magnitude') instance-attribute
Functions
__init__(sampling_rate, n_fft=2048, hop_length=None, win_length=None, window='hann')

Initialize STFT operation

Parameters

sampling_rate : float — Sampling rate (Hz). n_fft : int — FFT size, default is 2048. hop_length : int, optional — Number of samples between frames; default is win_length // 4. win_length : int, optional — Window length; default is n_fft. window : str — Window type, default is 'hann'.

Raises

ValueError If n_fft is not positive, win_length > n_fft, or hop_length is invalid

Source code in wandas/processing/spectral.py
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
def __init__(
    self,
    sampling_rate: float,
    n_fft: int = 2048,
    hop_length: int | None = None,
    win_length: int | None = None,
    window: str = "hann",
):
    """
    Set up the STFT operation.

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    n_fft : int
        FFT size, default is 2048
    hop_length : int, optional
        Number of samples between frames. Default is win_length // 4
    win_length : int, optional
        Window length. Default is n_fft
    window : str
        Window type, default is 'hann'

    Raises
    ------
    ValueError
        If n_fft is not positive, win_length > n_fft, or hop_length is invalid
    """
    # The shared validator returns the effective window and hop lengths
    resolved = _validate_spectral_params(n_fft, win_length, hop_length, "STFT")
    actual_win_length, actual_hop_length = resolved

    self.n_fft = n_fft
    self.win_length = actual_win_length
    self.hop_length = actual_hop_length
    self.window = window

    # Transform object reused for every call
    analysis_window = get_window(window, self.win_length)
    self.SFT = ShortTimeFFT(
        win=analysis_window,
        hop=self.hop_length,
        fs=sampling_rate,
        mfft=self.n_fft,
        scale_to="magnitude",
    )
    super().__init__(
        sampling_rate,
        n_fft=n_fft,
        win_length=self.win_length,
        hop_length=self.hop_length,
        window=window,
    )
calculate_output_shape(input_shape)

Calculate output data shape after operation

Parameters

input_shape : tuple Input data shape

Returns

tuple Output data shape

Source code in wandas/processing/spectral.py
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
    """
    Calculate output data shape after operation

    Parameters
    ----------
    input_shape : tuple
        Input data shape

    Returns
    -------
    tuple
        Output data shape
    """
    n_samples = input_shape[-1]
    n_f = len(self.SFT.f)
    n_t = len(self.SFT.t(n_samples))
    return (input_shape[0], n_f, n_t)

ISTFT

Bases: AudioOperation[NDArrayComplex, NDArrayReal]

Inverse Short-Time Fourier Transform operation

Source code in wandas/processing/spectral.py
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
class ISTFT(AudioOperation[NDArrayComplex, NDArrayReal]):
    """Inverse Short-Time Fourier Transform operation.

    Rebuilds a real time-domain signal from a complex spectrogram using a
    SciPy ``ShortTimeFFT`` instance configured with ``scale_to="magnitude"``.
    """

    name = "istft"
    _display = "iSTFT"

    def __init__(
        self,
        sampling_rate: float,
        n_fft: int = 2048,
        hop_length: int | None = None,
        win_length: int | None = None,
        window: str = "hann",
        length: int | None = None,
    ):
        """
        Set up the inverse transform.

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        n_fft : int
            FFT size (default 2048)
        hop_length : int, optional
            Samples between frames. When None the value is resolved by
            ``_validate_spectral_params`` (documented default: win_length // 4).
        win_length : int, optional
            Window length. When None the value is resolved by
            ``_validate_spectral_params`` (documented default: n_fft).
        window : str
            Window name handed to ``get_window`` (default 'hann')
        length : int, optional
            Upper bound on the number of output samples. None keeps everything
            the inverse transform returns.

        Raises
        ------
        ValueError
            Documented as raised for a non-positive n_fft, win_length > n_fft
            or an invalid hop_length (the check lives in
            ``_validate_spectral_params``).
        """
        resolved_win, resolved_hop = _validate_spectral_params(n_fft, win_length, hop_length, "ISTFT")

        self.n_fft, self.win_length, self.hop_length = n_fft, resolved_win, resolved_hop
        self.window, self.length = window, length

        # Same ShortTimeFFT configuration as the forward STFT so the scaling
        # ("magnitude") matches on the way back.
        taper = get_window(window, resolved_win)
        self.SFT = ShortTimeFFT(
            win=taper,
            hop=resolved_hop,
            fs=sampling_rate,
            mfft=n_fft,
            scale_to="magnitude",
        )

        super().__init__(
            sampling_rate,
            n_fft=n_fft,
            win_length=resolved_win,
            hop_length=resolved_hop,
            window=window,
            length=length,
        )

    def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
        """
        Predict the shape of the reconstructed signal.

        Parameters
        ----------
        input_shape : tuple
            Spectrogram shape; the first entry is taken as the channel count
            and the last entry as the number of time frames.

        Returns
        -------
        tuple
            ``(channels, output_samples)``.

        Notes
        -----
        With ``n_frames = input_shape[-1]`` the full reconstruction length is

            q_max = n_frames + SFT.p_min
            k_max = (q_max - 1) * SFT.hop + SFT.m_num - SFT.m_num_mid

        and the output starts at sample 0, so ``output_samples = k_max``.
        If ``self.length`` is set, the result is ``min(self.length, k_max)``.

        References
        ----------
        - SciPy ShortTimeFFT.istft:
          https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.ShortTimeFFT.istft.html
        - SciPy Source: https://github.com/scipy/scipy/blob/main/scipy/signal/_short_time_fft.py
        """
        channel_count = input_shape[0]
        frame_count = input_shape[-1]

        transform = self.SFT
        last_slice = frame_count + transform.p_min
        full_length = (last_slice - 1) * transform.hop + transform.m_num - transform.m_num_mid

        # The output range starts at 0, so its length equals its upper bound.
        if self.length is None:
            upper = full_length
        else:
            upper = min(self.length, full_length)

        return (channel_count, upper - 0)

    def _process_array(self, x: NDArrayComplex) -> NDArrayReal:
        """Run the inverse STFT on all channels of ``x`` at once.

        A 2-D input is treated as a single channel. The interior frequency
        bins are halved on a copy before inversion, and the result is cut to
        ``self.length`` samples when that is set.
        """
        logger.debug(f"Applying SciPy ISTFT (ShortTimeFFT) to array with shape: {x.shape}")

        # Promote (freqs, frames) to (1, freqs, frames).
        spectrogram = x[np.newaxis, ...] if x.ndim == 2 else x

        # Work on a copy so the caller's array is not modified in place.
        rescaled = np.copy(spectrogram)
        # Undo a factor of 2 on every bin except the first and last one.
        # NOTE(review): presumably mirrors a doubling done by the forward STFT;
        # for odd n_fft the last bin is not Nyquist — confirm against STFT.
        rescaled[..., 1:-1, :] /= 2.0

        result: NDArrayReal = self.SFT.istft(rescaled)

        if self.length is not None:
            result = result[..., : self.length]

        logger.debug(f"ShortTimeFFT applied, returning result with shape: {result.shape}")
        return result
Attributes
name = 'istft' class-attribute instance-attribute
n_fft = n_fft instance-attribute
win_length = actual_win_length instance-attribute
hop_length = actual_hop_length instance-attribute
window = window instance-attribute
length = length instance-attribute
SFT = ShortTimeFFT(win=(get_window(window, self.win_length)), hop=(self.hop_length), fs=sampling_rate, mfft=(self.n_fft), scale_to='magnitude') instance-attribute
Functions
__init__(sampling_rate, n_fft=2048, hop_length=None, win_length=None, window='hann', length=None)

Initialize ISTFT operation

Parameters

- sampling_rate : float — Sampling rate (Hz)
- n_fft : int — FFT size, default is 2048
- hop_length : int, optional — Number of samples between frames. Default is win_length // 4
- win_length : int, optional — Window length. Default is n_fft
- window : str — Window type, default is 'hann'
- length : int, optional — Length of output signal. Default is None (determined from input)

Raises

ValueError If n_fft is not positive, win_length > n_fft, or hop_length is invalid

Source code in wandas/processing/spectral.py
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
def __init__(
    self,
    sampling_rate: float,
    n_fft: int = 2048,
    hop_length: int | None = None,
    win_length: int | None = None,
    window: str = "hann",
    length: int | None = None,
):
    """
    Set up the inverse STFT operation.

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    n_fft : int
        FFT size (default 2048)
    hop_length : int, optional
        Samples between frames. When None the value is resolved by
        ``_validate_spectral_params`` (documented default: win_length // 4).
    win_length : int, optional
        Window length. When None the value is resolved by
        ``_validate_spectral_params`` (documented default: n_fft).
    window : str
        Window name handed to ``get_window`` (default 'hann')
    length : int, optional
        Upper bound on the number of output samples; None means no bound.

    Raises
    ------
    ValueError
        Documented as raised for a non-positive n_fft, win_length > n_fft or
        an invalid hop_length (the check lives in
        ``_validate_spectral_params``).
    """
    resolved_win, resolved_hop = _validate_spectral_params(n_fft, win_length, hop_length, "ISTFT")

    self.n_fft, self.win_length, self.hop_length = n_fft, resolved_win, resolved_hop
    self.window, self.length = window, length

    # Same ShortTimeFFT configuration as the forward STFT so the scaling
    # ("magnitude") matches on the way back.
    taper = get_window(window, resolved_win)
    self.SFT = ShortTimeFFT(
        win=taper,
        hop=resolved_hop,
        fs=sampling_rate,
        mfft=n_fft,
        scale_to="magnitude",
    )

    super().__init__(
        sampling_rate,
        n_fft=n_fft,
        win_length=resolved_win,
        hop_length=resolved_hop,
        window=window,
        length=length,
    )
calculate_output_shape(input_shape)

Calculate output data shape after ISTFT operation.

Uses the SciPy ShortTimeFFT calculation formula to compute the expected output length based on the input spectrogram dimensions and output range parameters (k0, k1).

Parameters

input_shape : tuple Input spectrogram shape (channels, n_freqs, n_frames) where n_freqs = n_fft // 2 + 1 and n_frames is the number of time frames.

Returns

tuple Output shape (channels, output_samples) where output_samples is the reconstructed signal length determined by the output range [k0, k1).

Notes

The calculation follows SciPy's ShortTimeFFT.istft() implementation. When k1 is None (default), the maximum reconstructible signal length is computed as:

$$q_{\max} = n_{\text{frames}} + p_{\min}$$

$$k_{\max} = (q_{\max} - 1) \cdot \text{hop} + m_{\text{num}} - m_{\text{num\_mid}}$$

The output length is then:

$$\text{output\_samples} = k_1 - k_0$$

where k0 defaults to 0 and k1 defaults to k_max.

Parameters that affect the calculation:

- n_frames: number of time frames in the STFT
- p_min: minimum frame index (ShortTimeFFT property)
- hop: hop length (samples between frames)
- m_num: window length
- m_num_mid: window midpoint position
- self.length: optional length override (if set, limits output)

References
  • SciPy ShortTimeFFT.istft: https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.ShortTimeFFT.istft.html
  • SciPy Source: https://github.com/scipy/scipy/blob/main/scipy/signal/_short_time_fft.py
Source code in wandas/processing/spectral.py
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
    """
    Calculate output data shape after ISTFT operation.

    Uses the SciPy ShortTimeFFT calculation formula to compute the expected
    output length based on the input spectrogram dimensions and output range
    parameters (k0, k1).

    Parameters
    ----------
    input_shape : tuple
        Input spectrogram shape (channels, n_freqs, n_frames)
        where n_freqs = n_fft // 2 + 1 and n_frames is the number of time frames.

    Returns
    -------
    tuple
        Output shape (channels, output_samples) where output_samples is the
        reconstructed signal length determined by the output range [k0, k1).

    Notes
    -----
    The calculation follows SciPy's ShortTimeFFT.istft() implementation.
    When k1 is None (default), the maximum reconstructible signal length is
    computed as:

    .. math::

        q_{max} = n_{frames} + p_{min}

        k_{max} = (q_{max} - 1) \\cdot hop + m_{num} - m_{num\\_mid}

    The output length is then:

    .. math::

        output\\_samples = k_1 - k_0

    where k0 defaults to 0 and k1 defaults to k_max.

    Parameters that affect the calculation:
    - n_frames: number of time frames in the STFT
    - p_min: minimum frame index (ShortTimeFFT property)
    - hop: hop length (samples between frames)
    - m_num: window length
    - m_num_mid: window midpoint position
    - self.length: optional length override (if set, limits output)

    References
    ----------
    - SciPy ShortTimeFFT.istft:
      https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.ShortTimeFFT.istft.html
    - SciPy Source: https://github.com/scipy/scipy/blob/main/scipy/signal/_short_time_fft.py
    """
    n_channels = input_shape[0]
    n_frames = input_shape[-1]  # time_frames

    # Follow SciPy ShortTimeFFT formula
    # See: https://github.com/scipy/scipy/blob/main/scipy/signal/_short_time_fft.py
    q_max = n_frames + self.SFT.p_min
    k_max = (q_max - 1) * self.SFT.hop + self.SFT.m_num - self.SFT.m_num_mid

    # Default parameters: k0=0, k1=None (which becomes k_max)
    # The output length is k1 - k0 = k_max - 0 = k_max
    k0 = 0
    k1 = k_max

    # If self.length is specified, it acts as an override to limit the output
    if self.length is not None:
        k1 = min(self.length, k1)

    output_samples = k1 - k0

    return (n_channels, output_samples)

Welch

Bases: AudioOperation[NDArrayReal, NDArrayReal]

Welch method for power spectral density estimation.

Computes the one-sided amplitude spectrum using Welch's method for consistency with FFT and STFT methods. For a sine wave with amplitude A, the peak value at its frequency will be approximately A.

Notes

Internally uses scipy.signal.welch with scaling='spectrum' and converts the power spectrum to an amplitude spectrum:

- DC component (f=0): A = sqrt(P)
- AC components (f>0): A = sqrt(2*P)

Source code in wandas/processing/spectral.py
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
class Welch(AudioOperation[NDArrayReal, NDArrayReal]):
    """Welch-averaged one-sided amplitude spectrum.

    Computes the one-sided amplitude spectrum using Welch's method for
    consistency with FFT and STFT methods. For a sine wave with amplitude A,
    the peak value at its frequency will be approximately A.

    Notes
    -----
    Internally uses scipy.signal.welch with scaling='spectrum' and converts
    the power spectrum to amplitude spectrum:
    - DC component (f=0): A = sqrt(P)
    - Nyquist component (only present for even n_fft): A = sqrt(P)
    - all other components: A = sqrt(2*P)
    """

    name = "welch"
    _display = "Welch"
    n_fft: int
    window: str
    hop_length: int | None
    win_length: int | None
    average: str
    detrend: str

    def __init__(
        self,
        sampling_rate: float,
        n_fft: int = 2048,
        hop_length: int | None = None,
        win_length: int | None = None,
        window: str = "hann",
        average: str = "mean",
        detrend: str = "constant",
    ):
        """
        Initialize Welch operation

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        n_fft : int, optional
            FFT size, default is 2048
        hop_length : int, optional
            Number of samples between frames. When None, ``noverlap`` is left
            as None so scipy.signal.welch applies its own default overlap.
        win_length : int, optional
            Window length. Default is n_fft
        window : str, optional
            Window function type, default is 'hann'
        average : str, optional
            Averaging method, default is 'mean'
        detrend : str, optional
            Detrend method, default is 'constant'

        Raises
        ------
        ValueError
            If n_fft, win_length, or hop_length are invalid
        """
        # Validate and compute parameters
        actual_win_length, actual_hop_length = _validate_spectral_params(n_fft, win_length, hop_length, "Welch method")

        self.n_fft = n_fft
        self.win_length = actual_win_length
        self.hop_length = actual_hop_length
        # Only derive an explicit overlap when the caller supplied a hop.
        # NOTE(review): with hop_length=None, self.hop_length holds the value
        # resolved by _validate_spectral_params while SciPy picks the overlap
        # itself; the two may not agree — confirm this is intended.
        self.noverlap = self.win_length - self.hop_length if hop_length is not None else None
        self.window = window
        self.average = average
        self.detrend = detrend
        super().__init__(
            sampling_rate,
            n_fft=n_fft,
            win_length=self.win_length,
            hop_length=self.hop_length,
            window=window,
            average=average,
            detrend=detrend,
        )

    def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
        """
        Calculate output data shape after operation

        Parameters
        ----------
        input_shape : tuple
            Input data shape (channels, samples)

        Returns
        -------
        tuple
            Output data shape (channels, freqs), with freqs = n_fft // 2 + 1
        """
        n_freqs = self.n_fft // 2 + 1
        return (*input_shape[:-1], n_freqs)

    def _process_array(self, x: NDArrayReal) -> NDArrayReal:
        """Compute the one-sided amplitude spectrum of ``x``.

        Converts the power spectrum from scipy.signal.welch to a one-sided
        amplitude spectrum for consistency with FFT/STFT.

        Raises
        ------
        ValueError
            If ``x`` is not a numpy ndarray.
        """
        from scipy import signal as ss

        if not isinstance(x, np.ndarray):
            raise ValueError("Welch operation requires a numpy ndarray, but received a non-ndarray.")

        _, power = ss.welch(
            x,
            nperseg=self.win_length,
            noverlap=self.noverlap,
            nfft=self.n_fft,
            window=self.window,
            average=self.average,
            detrend=self.detrend,
            scaling="spectrum",
        )

        # scipy.signal.welch with scaling='spectrum' returns a one-sided power
        # spectrum. For a sine wave with amplitude A:
        #   - unpaired bins (DC, and Nyquist when n_fft is even): P = A^2
        #   - every other bin: P = A^2 / 2
        # so the amplitude is sqrt(P) for unpaired bins and sqrt(2*P) elsewhere.
        amplitude = np.sqrt(power)

        # An odd n_fft has no Nyquist bin: the last bin is an ordinary paired
        # bin and needs the sqrt(2) factor too. Only an even n_fft ends on the
        # unpaired Nyquist bin, which must be left alone.
        paired_bins = slice(1, None) if self.n_fft % 2 else slice(1, -1)
        amplitude[..., paired_bins] *= np.sqrt(2)

        return np.array(amplitude)
Attributes
name = 'welch' class-attribute instance-attribute
n_fft = n_fft instance-attribute
win_length = actual_win_length instance-attribute
hop_length = actual_hop_length instance-attribute
noverlap = self.win_length - self.hop_length if hop_length is not None else None instance-attribute
window = window instance-attribute
average = average instance-attribute
detrend = detrend instance-attribute
Functions
__init__(sampling_rate, n_fft=2048, hop_length=None, win_length=None, window='hann', average='mean', detrend='constant')

Initialize Welch operation

Parameters

- sampling_rate : float — Sampling rate (Hz)
- n_fft : int, optional — FFT size, default is 2048
- hop_length : int, optional — Number of samples between frames. Default is win_length // 4
- win_length : int, optional — Window length. Default is n_fft
- window : str, optional — Window function type, default is 'hann'
- average : str, optional — Averaging method, default is 'mean'
- detrend : str, optional — Detrend method, default is 'constant'

Raises

ValueError If n_fft, win_length, or hop_length are invalid

Source code in wandas/processing/spectral.py
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
def __init__(
    self,
    sampling_rate: float,
    n_fft: int = 2048,
    hop_length: int | None = None,
    win_length: int | None = None,
    window: str = "hann",
    average: str = "mean",
    detrend: str = "constant",
):
    """
    Configure the Welch operation.

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    n_fft : int, optional
        FFT size (default 2048)
    hop_length : int, optional
        Samples between frames. When None, ``noverlap`` is stored as None.
    win_length : int, optional
        Window length; resolved by ``_validate_spectral_params`` when None
        (documented default: n_fft).
    window : str, optional
        Window function type (default 'hann')
    average : str, optional
        Averaging method (default 'mean')
    detrend : str, optional
        Detrend method (default 'constant')

    Raises
    ------
    ValueError
        Documented as raised when n_fft, win_length or hop_length are invalid
        (the check lives in ``_validate_spectral_params``).
    """
    resolved_win, resolved_hop = _validate_spectral_params(n_fft, win_length, hop_length, "Welch method")

    self.n_fft = n_fft
    self.win_length = resolved_win
    self.hop_length = resolved_hop

    # An explicit overlap is derived only when the caller supplied a hop.
    if hop_length is None:
        self.noverlap = None
    else:
        self.noverlap = self.win_length - self.hop_length

    self.window = window
    self.average = average
    self.detrend = detrend

    super().__init__(
        sampling_rate,
        n_fft=n_fft,
        win_length=self.win_length,
        hop_length=self.hop_length,
        window=window,
        average=average,
        detrend=detrend,
    )
calculate_output_shape(input_shape)

Calculate output data shape after operation

Parameters

input_shape : tuple Input data shape (channels, samples)

Returns

tuple Output data shape (channels, freqs)

Source code in wandas/processing/spectral.py
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
    """
    Predict the output shape of the Welch operation.

    Parameters
    ----------
    input_shape : tuple
        Input data shape (channels, samples)

    Returns
    -------
    tuple
        The input shape with its last axis replaced by ``n_fft // 2 + 1``
        frequency bins.
    """
    leading_axes = tuple(input_shape[:-1])
    one_sided_bins = self.n_fft // 2 + 1
    return leading_axes + (one_sided_bins,)

NOctSpectrum

Bases: _NOctBase

N-octave spectrum operation

Source code in wandas/processing/spectral.py
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
class NOctSpectrum(_NOctBase):
    """N-octave spectrum operation"""

    name = "noct_spectrum"
    _display = "Oct"

    def _process_array(self, x: NDArrayReal) -> NDArrayReal:
        """Compute the fractional-octave spectrum of ``x``.

        The array is transposed before being handed to ``noct_spectrum`` and
        the band levels are returned with the band axis last.
        """
        logger.debug(f"Applying NoctSpectrum to array with shape: {x.shape}")
        band_levels, _ = noct_spectrum(
            sig=x.T,
            fs=self.sampling_rate,
            fmin=self.fmin,
            fmax=self.fmax,
            n=self.n,
            G=self.G,
            fr=self.fr,
        )
        # A 1-D result gets a leading axis; anything else is transposed back.
        if band_levels.ndim == 1:
            spec = np.expand_dims(band_levels, axis=0)
        else:
            spec = band_levels.T
        logger.debug(f"NoctSpectrum applied, returning result with shape: {spec.shape}")
        return np.array(spec)
Attributes
name = 'noct_spectrum' class-attribute instance-attribute

NOctSynthesis

Bases: _NOctBase

Octave synthesis operation

Source code in wandas/processing/spectral.py
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
class NOctSynthesis(_NOctBase):
    """Octave synthesis operation"""

    name = "noct_synthesis"
    _display = "Octs"

    def _process_array(self, x: NDArrayReal) -> NDArrayReal:
        """Combine a narrow-band spectrum into fractional-octave bands.

        The magnitude of ``x`` is passed to ``noct_synthesis`` together with a
        frequency axis rebuilt from the number of bins on the last axis.
        """
        logger.debug(f"Applying NoctSynthesis to array with shape: {x.shape}")

        # Recover an FFT length from the bin count on the last axis.
        # NOTE(review): the length is chosen from the parity of the bin count;
        # presumably a heuristic, since a one-sided bin count alone does not
        # determine the FFT length — confirm.
        bin_count = x.shape[-1]
        if bin_count % 2 == 0:
            fft_length = bin_count * 2 - 1
        else:
            fft_length = (bin_count - 1) * 2

        freqs = np.fft.rfftfreq(fft_length, d=1 / self.sampling_rate)
        magnitude = np.abs(x).T
        synthesized, _ = noct_synthesis(
            spectrum=magnitude,
            freqs=freqs,
            fmin=self.fmin,
            fmax=self.fmax,
            n=self.n,
            G=self.G,
            fr=self.fr,
        )
        result = synthesized.T
        logger.debug(f"NoctSynthesis applied, returning result with shape: {result.shape}")
        return np.array(result)
Attributes
name = 'noct_synthesis' class-attribute instance-attribute

Coherence

Bases: _CrossSpectralBase

Coherence estimation operation

Source code in wandas/processing/spectral.py
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
class Coherence(_CrossSpectralBase):
    """Coherence estimation operation"""

    name = "coherence"
    _method_label = "Coherence"
    _display = "Coh"

    def _process_array(self, x: NDArrayReal) -> NDArrayReal:
        """Estimate the coherence between every pair of rows of ``x``.

        Returns a 2-D array in which the first two axes of the pairwise result
        are swapped and then flattened; the last axis is frequency.
        """
        logger.debug(f"Applying coherence estimation to array with shape: {x.shape}")
        from scipy import signal as ss

        # Broadcasting the two views against each other yields all pairs.
        first = x[:, np.newaxis]
        second = x[np.newaxis, :]
        _, pairwise = ss.coherence(
            x=first,
            y=second,
            fs=self.sampling_rate,
            nperseg=self.win_length,
            noverlap=self.noverlap,
            nfft=self.n_fft,
            window=self.window,
            detrend=self.detrend,
        )

        # Collapse the two pair axes into one: (pairs, n_freqs).
        n_freqs = pairwise.shape[-1]
        result: NDArrayReal = pairwise.transpose(1, 0, 2).reshape(-1, n_freqs)

        logger.debug(f"Coherence estimation applied, result shape: {result.shape}")
        return result
Attributes
name = 'coherence' class-attribute instance-attribute

CSD

Bases: _ScaledCrossSpectralBase

Cross-spectral density estimation operation

Source code in wandas/processing/spectral.py
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
class CSD(_ScaledCrossSpectralBase):
    """Cross-spectral density estimation operation"""

    name = "csd"
    _method_label = "CSD"
    _display = "CSD"

    def _process_array(self, x: NDArrayReal) -> NDArrayComplex:
        """Estimate the cross-spectral density between every pair of rows of ``x``.

        Returns a 2-D complex array in which the first two axes of the pairwise
        result are swapped and then flattened; the last axis is frequency.
        """
        logger.debug(f"Applying CSD estimation to array with shape: {x.shape}")
        from scipy import signal as ss

        # Broadcasting the two views against each other yields all pairs.
        first = x[:, np.newaxis]
        second = x[np.newaxis, :]
        _, pairwise = ss.csd(
            x=first,
            y=second,
            fs=self.sampling_rate,
            nperseg=self.win_length,
            noverlap=self.noverlap,
            nfft=self.n_fft,
            window=self.window,
            detrend=self.detrend,
            scaling=self.scaling,
            average=self.average,
        )

        # Collapse the two pair axes into one: (pairs, n_freqs).
        n_freqs = pairwise.shape[-1]
        result: NDArrayComplex = pairwise.transpose(1, 0, 2).reshape(-1, n_freqs)

        logger.debug(f"CSD estimation applied, result shape: {result.shape}")
        return result
Attributes
name = 'csd' class-attribute instance-attribute

TransferFunction

Bases: _ScaledCrossSpectralBase

Transfer function estimation operation

Source code in wandas/processing/spectral.py
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
class TransferFunction(_ScaledCrossSpectralBase):
    """Transfer function estimation operation"""

    name = "transfer_function"
    _method_label = "Transfer function"
    _display = "H"

    def _process_array(self, x: NDArrayReal) -> NDArrayComplex:
        """Estimate transfer functions between every pair of rows of ``x``.

        The pairwise cross-spectrum from ``scipy.signal.csd`` is divided by the
        per-row auto-spectrum from ``scipy.signal.welch``; the two pair axes
        are then swapped and flattened, leaving frequency as the last axis.
        """
        logger.debug(f"Applying transfer function estimation to array with shape: {x.shape}")
        from scipy import signal as ss

        # Both estimators must share identical settings for the ratio to be
        # meaningful, so the keyword set is built once.
        estimator_settings = {
            "fs": self.sampling_rate,
            "nperseg": self.win_length,
            "noverlap": self.noverlap,
            "nfft": self.n_fft,
            "window": self.window,
            "detrend": self.detrend,
            "scaling": self.scaling,
            "average": self.average,
            "axis": -1,
        }

        # Cross-spectrum for all pairs: (num_channels, num_channels, num_frequencies)
        _, cross_spectrum = ss.csd(
            x=x[:, np.newaxis, :],
            y=x[np.newaxis, :, :],
            **estimator_settings,
        )

        # Auto-spectrum per channel: (num_channels, num_frequencies)
        _, auto_spectrum = ss.welch(x=x, **estimator_settings)

        # H(f) = P_yx / P_xx, with the auto-spectrum broadcast along axis 0.
        # NOTE(review): scipy.signal.csd(x, y) is documented as conj(X) * Y and
        # the denominator here is indexed by the channel on axis 1 — confirm
        # this matches the intended input/output orientation and phase sign.
        ratio = cross_spectrum / auto_spectrum[np.newaxis, :, :]
        n_freqs = ratio.shape[-1]
        result: NDArrayComplex = ratio.transpose(1, 0, 2).reshape(-1, n_freqs)

        logger.debug(f"Transfer function estimation applied, result shape: {result.shape}")
        return result
Attributes
name = 'transfer_function' class-attribute instance-attribute

Functions

Statistical Processing / 統計処理

Provides statistical analysis functions for audio data. オーディオデータの統計分析機能を提供します。

wandas.processing.stats

Attributes

logger = logging.getLogger(__name__) module-attribute

Classes

ABS

Bases: AudioOperation[NDArrayReal, NDArrayReal]

Absolute value operation

Source code in wandas/processing/stats.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class ABS(AudioOperation[NDArrayReal, NDArrayReal]):
    """Element-wise absolute value of the data."""

    name = "abs"
    _display = "abs"

    def __init__(self, sampling_rate: float):
        """
        Create the absolute value operation.

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz), forwarded unchanged to the base class.
        """
        super().__init__(sampling_rate)

    def process(self, data: DaArray) -> DaArray:
        """Return ``da.abs(data)``; the call is made directly on the array
        rather than through a map_blocks wrapper."""
        magnitude = da.abs(data)
        return magnitude
Attributes
name = 'abs' class-attribute instance-attribute
Functions
__init__(sampling_rate)

Initialize absolute value operation

Parameters

sampling_rate : float Sampling rate (Hz)

Source code in wandas/processing/stats.py
18
19
20
21
22
23
24
25
26
27
def __init__(self, sampling_rate: float):
    """
    Create the absolute value operation.

    The operation has no parameters of its own; the constructor only
    forwards the sampling rate to the base class.

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    """
    super().__init__(sampling_rate)
process(data)
Source code in wandas/processing/stats.py
29
30
31
def process(self, data: DaArray) -> DaArray:
    """Return ``da.abs(data)``; the call is made directly on the array
    rather than through a map_blocks wrapper."""
    magnitude = da.abs(data)
    return magnitude

Power

Bases: AudioOperation[NDArrayReal, NDArrayReal]

Power operation

Source code in wandas/processing/stats.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
class Power(AudioOperation[NDArrayReal, NDArrayReal]):
    """Raise the data element-wise to a fixed exponent."""

    name = "power"
    _display = "pow"

    def __init__(self, sampling_rate: float, exponent: float):
        """
        Create the power operation.

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        exponent : float
            Exponent applied to the data; stored as ``self.exp``.
        """
        # NOTE(review): only sampling_rate reaches the base class; the exponent
        # is not forwarded as a parameter — confirm that is intended.
        super().__init__(sampling_rate)
        self.exp = exponent

    def process(self, data: DaArray) -> DaArray:
        """Return ``da.power(data, self.exp)``; the call is made directly on
        the array rather than through a map_blocks wrapper."""
        raised = da.power(data, self.exp)
        return raised
Attributes
name = 'power' class-attribute instance-attribute
exp = exponent instance-attribute
Functions
__init__(sampling_rate, exponent)

Initialize power operation

Parameters

sampling_rate : float Sampling rate (Hz) exponent : float Power exponent

Source code in wandas/processing/stats.py
40
41
42
43
44
45
46
47
48
49
50
51
52
def __init__(self, sampling_rate: float, exponent: float):
    """
    Create the power operation.

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    exponent : float
        Exponent applied to the data; stored as ``self.exp``.
    """
    # Base-class setup runs first and receives only the sampling rate.
    # NOTE(review): the exponent is not forwarded as a parameter — confirm
    # that is intended.
    super().__init__(sampling_rate)

    self.exp = exponent
process(data)
Source code in wandas/processing/stats.py
54
55
56
def process(self, data: DaArray) -> DaArray:
    """Return ``data`` raised to the stored exponent ``self.exp``."""
    # da.power is called on the Dask array itself, without map_blocks
    powered = da.power(data, self.exp)
    return powered

Sum

Bases: AudioOperation[NDArrayReal, NDArrayReal]

Sum calculation

Source code in wandas/processing/stats.py
59
60
61
62
63
64
65
66
67
class Sum(AudioOperation[NDArrayReal, NDArrayReal]):
    """Operation that sums the data along axis 0."""

    name = "sum"
    _display = "sum"

    def process(self, data: DaArray) -> DaArray:
        """Return the sum over axis 0, keeping that axis with length 1."""
        # The array's own reduction is used directly, without map_blocks
        total = data.sum(axis=0, keepdims=True)
        return total
Attributes
name = 'sum' class-attribute instance-attribute
Functions
process(data)
Source code in wandas/processing/stats.py
65
66
67
def process(self, data: DaArray) -> DaArray:
    """Return the sum over axis 0, keeping that axis with length 1."""
    # The array's own reduction is used directly, without map_blocks
    total = data.sum(axis=0, keepdims=True)
    return total

Mean

Bases: AudioOperation[NDArrayReal, NDArrayReal]

Mean calculation

Source code in wandas/processing/stats.py
70
71
72
73
74
75
76
77
78
class Mean(AudioOperation[NDArrayReal, NDArrayReal]):
    """Operation that averages the data along axis 0."""

    name = "mean"
    _display = "mean"

    def process(self, data: DaArray) -> DaArray:
        """Return the mean over axis 0, keeping that axis with length 1."""
        # The array's own reduction is used directly, without map_blocks
        average = data.mean(axis=0, keepdims=True)
        return average
Attributes
name = 'mean' class-attribute instance-attribute
Functions
process(data)
Source code in wandas/processing/stats.py
76
77
78
def process(self, data: DaArray) -> DaArray:
    """Return the mean over axis 0, keeping that axis with length 1."""
    # The array's own reduction is used directly, without map_blocks
    average = data.mean(axis=0, keepdims=True)
    return average

ChannelDifference

Bases: AudioOperation[NDArrayReal, NDArrayReal]

Channel difference calculation operation

Source code in wandas/processing/stats.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
class ChannelDifference(AudioOperation[NDArrayReal, NDArrayReal]):
    """Operation that subtracts one selected channel from the data."""

    name = "channel_difference"
    _display = "diff"
    other_channel: int

    def __init__(self, sampling_rate: float, other_channel: int = 0):
        """
        Set up the channel difference operation.

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        other_channel : int
            Index of the channel that is subtracted, default is 0
        """
        self.other_channel = other_channel
        super().__init__(sampling_rate, other_channel=self.other_channel)

    def process(self, data: DaArray) -> DaArray:
        """Return ``data`` minus ``data[other_channel]``."""
        # Plain array arithmetic on the Dask array, without map_blocks
        reference = data[self.other_channel]
        return data - reference
Attributes
name = 'channel_difference' class-attribute instance-attribute
other_channel = other_channel instance-attribute
Functions
__init__(sampling_rate, other_channel=0)

Initialize channel difference calculation

Parameters

sampling_rate : float Sampling rate (Hz) other_channel : int Channel to calculate difference with, default is 0

Source code in wandas/processing/stats.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def __init__(self, sampling_rate: float, other_channel: int = 0):
    """
    Set up the channel difference operation.

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    other_channel : int
        Index of the channel that is subtracted, default is 0
    """
    self.other_channel = other_channel
    super().__init__(sampling_rate, other_channel=self.other_channel)
process(data)
Source code in wandas/processing/stats.py
102
103
104
105
def process(self, data: DaArray) -> DaArray:
    """Return ``data`` minus ``data[other_channel]``."""
    # Plain array arithmetic on the Dask array, without map_blocks
    reference = data[self.other_channel]
    return data - reference

Functions

Temporal Processing / 時間領域処理

Provides time-domain processing capabilities. 時間領域の処理機能を提供します。

wandas.processing.temporal

Attributes

logger = logging.getLogger(__name__) module-attribute

MIN_SOUND_LEVEL_POWER_RATIO = 1e-20 module-attribute

Classes

ReSampling

Bases: AudioOperation[NDArrayReal, NDArrayReal]

Resampling operation

Source code in wandas/processing/temporal.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
class ReSampling(AudioOperation[NDArrayReal, NDArrayReal]):
    """Operation that resamples the signal to a new sampling rate."""

    name = "resampling"
    _display = "rs"

    def __init__(self, sampling_rate: float, target_sr: float):
        """
        Set up the resampling operation.

        Parameters
        ----------
        sampling_rate : float
            Sampling rate of the input (Hz)
        target_sr : float
            Sampling rate of the output (Hz)

        Raises
        ------
        ValueError
            If sampling_rate or target_sr is not positive
        """
        # Both rates go through the same validator, source first
        for rate, label in (
            (sampling_rate, "source sampling rate"),
            (target_sr, "target sampling rate"),
        ):
            validate_sampling_rate(rate, label)
        super().__init__(sampling_rate, target_sr=target_sr)
        self.target_sr = target_sr

    def get_metadata_updates(self) -> dict[str, Any]:
        """
        Report the sampling rate of the resampled output.

        Returns
        -------
        dict
            Metadata updates whose "sampling_rate" entry is target_sr

        Notes
        -----
        The value depends only on target_sr, which is fixed at
        initialization.
        """
        updates: dict[str, Any] = {"sampling_rate": self.target_sr}
        return updates

    def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
        """
        Compute the data shape produced by resampling.

        Parameters
        ----------
        input_shape : tuple
            Input data shape

        Returns
        -------
        tuple
            Input shape with the last axis scaled by
            target_sr / sampling_rate and rounded up
        """
        scale = float(self.target_sr) / float(self.sampling_rate)
        resampled_length = int(np.ceil(input_shape[-1] * scale))
        return (*input_shape[:-1], resampled_length)

    def _process_array(self, x: NDArrayReal) -> NDArrayReal:
        """Resample ``x`` from sampling_rate to target_sr with librosa."""
        logger.debug(f"Applying resampling to array with shape: {x.shape}")
        rates = {"orig_sr": self.sampling_rate, "target_sr": self.target_sr}
        result: NDArrayReal = librosa.resample(x, **rates)
        logger.debug(f"Resampling applied, returning result with shape: {result.shape}")
        return result
Attributes
name = 'resampling' class-attribute instance-attribute
target_sr = target_sr instance-attribute
Functions
__init__(sampling_rate, target_sr)

Initialize resampling operation

Parameters

sampling_rate : float Sampling rate (Hz) target_sr : float Target sampling rate (Hz)

Raises

ValueError If sampling_rate or target_sr is not positive

Source code in wandas/processing/temporal.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
def __init__(self, sampling_rate: float, target_sr: float):
    """
    Set up the resampling operation.

    Parameters
    ----------
    sampling_rate : float
        Sampling rate of the input (Hz)
    target_sr : float
        Sampling rate of the output (Hz)

    Raises
    ------
    ValueError
        If sampling_rate or target_sr is not positive
    """
    # Both rates go through the same validator, source first
    for rate, label in (
        (sampling_rate, "source sampling rate"),
        (target_sr, "target sampling rate"),
    ):
        validate_sampling_rate(rate, label)
    super().__init__(sampling_rate, target_sr=target_sr)
    self.target_sr = target_sr
get_metadata_updates()

Update sampling rate to target sampling rate.

Returns

dict Metadata updates with new sampling rate

Notes

Resampling always produces output at target_sr, regardless of input sampling rate. All necessary parameters are provided at initialization.

Source code in wandas/processing/temporal.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def get_metadata_updates(self) -> dict[str, Any]:
    """
    Report the sampling rate of the resampled output.

    Returns
    -------
    dict
        Metadata updates whose "sampling_rate" entry is target_sr

    Notes
    -----
    The value depends only on target_sr, which is fixed at initialization.
    """
    updates: dict[str, Any] = {"sampling_rate": self.target_sr}
    return updates
calculate_output_shape(input_shape)

Calculate output data shape after operation

Parameters

input_shape : tuple Input data shape

Returns

tuple Output data shape

Source code in wandas/processing/temporal.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
    """
    Compute the data shape produced by resampling.

    Parameters
    ----------
    input_shape : tuple
        Input data shape

    Returns
    -------
    tuple
        Input shape with the last axis scaled by
        target_sr / sampling_rate and rounded up
    """
    scale = float(self.target_sr) / float(self.sampling_rate)
    resampled_length = int(np.ceil(input_shape[-1] * scale))
    return (*input_shape[:-1], resampled_length)

Trim

Bases: AudioOperation[NDArrayReal, NDArrayReal]

Trimming operation

Source code in wandas/processing/temporal.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
class Trim(AudioOperation[NDArrayReal, NDArrayReal]):
    """Operation that keeps only the samples between two time points."""

    name = "trim"
    _display = "trim"

    def __init__(
        self,
        sampling_rate: float,
        start: float,
        end: float,
    ):
        """
        Initialize trimming operation

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        start : float
            Start time for trimming (seconds)
        end : float
            End time for trimming (seconds)
        """
        super().__init__(sampling_rate, start=start, end=end)
        self.start = start
        self.end = end
        # Times are converted to sample indices; int() truncates toward zero
        self.start_sample = int(start * sampling_rate)
        self.end_sample = int(end * sampling_rate)
        logger.debug(f"Initialized Trim operation with start: {self.start}, end: {self.end}")

    def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
        """
        Calculate output data shape after operation

        Parameters
        ----------
        input_shape : tuple
            Input data shape

        Returns
        -------
        tuple
            Input shape with the last axis replaced by the number of samples
            that the slice ``start_sample:end_sample`` selects. The length is
            never negative, even when the window lies past the end of the
            input.
        """
        # Resolve the bounds exactly as the slice in _process_array does, so
        # the declared shape always agrees with the data actually produced
        # (e.g. a start beyond the signal yields 0 rather than a negative).
        bounds = slice(self.start_sample, self.end_sample).indices(input_shape[-1])
        n_samples = len(range(*bounds))
        return (*input_shape[:-1], n_samples)

    def _process_array(self, x: NDArrayReal) -> NDArrayReal:
        """Slice ``x`` along its last axis from start_sample to end_sample."""
        logger.debug(f"Applying trim to array with shape: {x.shape}")
        # Apply trimming
        result = x[..., self.start_sample : self.end_sample]
        logger.debug(f"Trim applied, returning result with shape: {result.shape}")
        return result
Attributes
name = 'trim' class-attribute instance-attribute
start = start instance-attribute
end = end instance-attribute
start_sample = int(start * sampling_rate) instance-attribute
end_sample = int(end * sampling_rate) instance-attribute
Functions
__init__(sampling_rate, start, end)

Initialize trimming operation

Parameters

sampling_rate : float Sampling rate (Hz) start : float Start time for trimming (seconds) end : float End time for trimming (seconds)

Source code in wandas/processing/temporal.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def __init__(
    self,
    sampling_rate: float,
    start: float,
    end: float,
):
    """
    Set up the trimming operation.

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    start : float
        Start time for trimming (seconds)
    end : float
        End time for trimming (seconds)
    """
    super().__init__(sampling_rate, start=start, end=end)
    self.start, self.end = start, end
    # Times are converted to sample indices; int() truncates toward zero
    self.start_sample = int(start * sampling_rate)
    self.end_sample = int(end * sampling_rate)
    logger.debug(f"Initialized Trim operation with start: {self.start}, end: {self.end}")
calculate_output_shape(input_shape)

Calculate output data shape after operation

Parameters

input_shape : tuple Input data shape

Returns

tuple Output data shape

Source code in wandas/processing/temporal.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
    """
    Calculate output data shape after operation

    Parameters
    ----------
    input_shape : tuple
        Input data shape

    Returns
    -------
    tuple
        Input shape with the last axis replaced by the number of samples
        that the slice ``start_sample:end_sample`` selects. The length is
        never negative, even when the window lies past the end of the input.
    """
    # Resolve the bounds the same way Python slicing does, so the declared
    # shape agrees with ``x[..., start_sample:end_sample]`` (e.g. a start
    # beyond the signal yields 0 rather than a negative length).
    bounds = slice(self.start_sample, self.end_sample).indices(input_shape[-1])
    n_samples = len(range(*bounds))
    return (*input_shape[:-1], n_samples)

FixLength

Bases: AudioOperation[NDArrayReal, NDArrayReal]

Operation to adjust signal length to a specified length.

Source code in wandas/processing/temporal.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
class FixLength(AudioOperation[NDArrayReal, NDArrayReal]):
    """Operation to adjust signal length to a specified length."""

    name = "fix_length"
    _display = "fix"

    def __init__(
        self,
        sampling_rate: float,
        length: int | None = None,
        duration: float | None = None,
    ):
        """
        Initialize fix length operation

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        length : Optional[int]
            Target length in samples. Takes precedence over duration when
            both are given.
        duration : Optional[float]
            Target duration in seconds. Only used when length is None, in
            which case it is converted with int(duration * sampling_rate).

        Raises
        ------
        ValueError
            If neither length nor duration is provided
        """
        if length is None:
            if duration is None:
                raise ValueError("Either length or duration must be provided.")
            length = int(duration * sampling_rate)
        self.target_length = length

        super().__init__(sampling_rate, target_length=self.target_length)

    def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
        """
        Calculate output data shape after operation

        Parameters
        ----------
        input_shape : tuple
            Input data shape

        Returns
        -------
        tuple
            Input shape with the last axis replaced by target_length
        """
        return (*input_shape[:-1], self.target_length)

    def _process_array(self, x: NDArrayReal) -> NDArrayReal:
        """Zero-pad or truncate ``x`` along its last axis to target_length."""
        logger.debug(f"Applying padding to array with shape: {x.shape}")
        pad_width = self.target_length - x.shape[-1]
        if pad_width > 0:
            # Pad only the end of the last axis. The spec is built from the
            # array rank so 1-D and 3-D+ inputs work as well as 2-D ones.
            pad_spec = [(0, 0)] * (x.ndim - 1) + [(0, pad_width)]
            result = np.pad(x, pad_spec, mode="constant")
        else:
            # Already long enough: keep the first target_length samples
            result = x[..., : self.target_length]
        logger.debug(f"Padding applied, returning result with shape: {result.shape}")
        return result
Attributes
name = 'fix_length' class-attribute instance-attribute
target_length = length instance-attribute
Functions
__init__(sampling_rate, length=None, duration=None)

Initialize fix length operation

Parameters

sampling_rate : float Sampling rate (Hz) length : Optional[int] Target length in samples duration : Optional[float] Target duration in seconds, used only when length is not given

Source code in wandas/processing/temporal.py
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
def __init__(
    self,
    sampling_rate: float,
    length: int | None = None,
    duration: float | None = None,
):
    """
    Set up the fix length operation.

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    length : Optional[int]
        Target length in samples. Takes precedence over duration when
        both are given.
    duration : Optional[float]
        Target duration in seconds. Only used when length is None, in
        which case it is converted with int(duration * sampling_rate).

    Raises
    ------
    ValueError
        If neither length nor duration is provided
    """
    if length is None and duration is None:
        raise ValueError("Either length or duration must be provided.")
    if length is None:
        # Fall back to the duration, truncated to whole samples
        length = int(duration * sampling_rate)
    self.target_length = length

    super().__init__(sampling_rate, target_length=self.target_length)
calculate_output_shape(input_shape)

Calculate output data shape after operation

Parameters

input_shape : tuple Input data shape

Returns

tuple Output data shape

Source code in wandas/processing/temporal.py
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
    """
    Compute the data shape produced by the length adjustment.

    Parameters
    ----------
    input_shape : tuple
        Input data shape

    Returns
    -------
    tuple
        Input shape with the last axis replaced by target_length
    """
    leading_axes = input_shape[:-1]
    return (*leading_axes, self.target_length)

RmsTrend

Bases: AudioOperation[NDArrayReal, NDArrayReal]

RMS calculation

Source code in wandas/processing/temporal.py
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
class RmsTrend(AudioOperation[NDArrayReal, NDArrayReal]):
    """Frame-wise RMS calculation, optionally A-weighted and in decibels."""

    name = "rms_trend"
    _display = "RMS"
    frame_length: int
    hop_length: int
    Aw: bool

    def __init__(
        self,
        sampling_rate: float,
        frame_length: int = 2048,
        hop_length: int = 512,
        ref: list[float] | float = 1.0,
        dB: bool = False,
        Aw: bool = False,
    ) -> None:
        """
        Initialize RMS calculation

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz)
        frame_length : int
            Frame length, default is 2048
        hop_length : int
            Hop length, default is 512
        ref : Union[list[float], float]
            Reference value(s) for dB calculation. A scalar or any sequence
            is stored as an array with at least one dimension.
        dB : bool
            Whether to convert to decibels
        Aw : bool
            Whether to apply A-weighting before RMS calculation
        """
        self.frame_length = frame_length
        self.hop_length = hop_length
        self.dB = dB
        self.Aw = Aw
        # Scalars become a length-1 array; lists, tuples and arrays keep their
        # own shape. Wrapping only non-lists in [ref] would give a tuple or
        # ndarray an extra leading axis and break the per-channel broadcast.
        self.ref = np.atleast_1d(np.asarray(ref))
        super().__init__(
            sampling_rate,
            frame_length=frame_length,
            hop_length=hop_length,
            dB=dB,
            Aw=Aw,
            ref=self.ref,
        )

    def get_metadata_updates(self) -> dict[str, Any]:
        """
        Update sampling rate based on hop length.

        Returns
        -------
        dict
            Metadata updates with new sampling rate based on hop length

        Notes
        -----
        The new sampling rate is sampling_rate / hop_length. All necessary
        parameters are provided at initialization.
        """
        new_sr = self.sampling_rate / self.hop_length
        return {"sampling_rate": new_sr}

    def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
        """
        Calculate output data shape after operation

        Parameters
        ----------
        input_shape : tuple
            Input data shape (channels, samples)

        Returns
        -------
        tuple
            Output data shape (channels, frames)
        """
        # Run librosa on a dummy signal of the same length and read the frame
        # count off the result instead of re-deriving its framing rules here.
        n_frames = librosa.feature.rms(
            y=np.ones((1, input_shape[-1])),
            frame_length=self.frame_length,
            hop_length=self.hop_length,
        ).shape[-1]
        return (*input_shape[:-1], n_frames)

    def _process_array(self, x: NDArrayReal) -> NDArrayReal:
        """Compute the frame-wise RMS of ``x``, optionally A-weighted / in dB."""
        logger.debug(f"Applying RMS to array with shape: {x.shape}")

        if self.Aw:
            # Apply A-weighting
            _x = A_weight(x, self.sampling_rate)
            if isinstance(_x, np.ndarray):
                # A_weight returned the weighted signal directly
                x = _x
            elif isinstance(_x, tuple):
                # Use the first element if A_weight returns a tuple
                x = _x[0]
            else:
                raise ValueError("A_weighting returned an unexpected type.")

        # Calculate RMS; the indexing drops the length-1 axis librosa puts
        # before the frame axis
        result: NDArrayReal = librosa.feature.rms(y=x, frame_length=self.frame_length, hop_length=self.hop_length)[
            ..., 0, :
        ]

        if self.dB:
            # Convert to dB relative to ref; the ratio is floored at DB_FLOOR
            # before the logarithm is taken
            result = 20 * np.log10(np.maximum(result / self.ref[..., np.newaxis], DB_FLOOR))
        logger.debug(f"RMS applied, returning result with shape: {result.shape}")
        return result
Attributes
name = 'rms_trend' class-attribute instance-attribute
frame_length = frame_length instance-attribute
hop_length = hop_length instance-attribute
dB = dB instance-attribute
Aw = Aw instance-attribute
ref = np.array(ref if isinstance(ref, list) else [ref]) instance-attribute
Functions
__init__(sampling_rate, frame_length=2048, hop_length=512, ref=1.0, dB=False, Aw=False)

Initialize RMS calculation

Parameters

sampling_rate : float Sampling rate (Hz) frame_length : int Frame length, default is 2048 hop_length : int Hop length, default is 512 ref : Union[list[float], float] Reference value(s) for dB calculation dB : bool Whether to convert to decibels Aw : bool Whether to apply A-weighting before RMS calculation

Source code in wandas/processing/temporal.py
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
def __init__(
    self,
    sampling_rate: float,
    frame_length: int = 2048,
    hop_length: int = 512,
    ref: list[float] | float = 1.0,
    dB: bool = False,
    Aw: bool = False,
) -> None:
    """
    Initialize RMS calculation

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz)
    frame_length : int
        Frame length, default is 2048
    hop_length : int
        Hop length, default is 512
    ref : Union[list[float], float]
        Reference value(s) for dB calculation. A scalar or any sequence is
        stored as an array with at least one dimension.
    dB : bool
        Whether to convert to decibels
    Aw : bool
        Whether to apply A-weighting before RMS calculation
    """
    self.frame_length = frame_length
    self.hop_length = hop_length
    self.dB = dB
    self.Aw = Aw
    # Scalars become a length-1 array; lists, tuples and arrays keep their
    # own shape. Wrapping only non-lists in [ref] would give a tuple or
    # ndarray an extra leading axis.
    self.ref = np.atleast_1d(np.asarray(ref))
    super().__init__(
        sampling_rate,
        frame_length=frame_length,
        hop_length=hop_length,
        dB=dB,
        Aw=Aw,
        ref=self.ref,
    )
get_metadata_updates()

Update sampling rate based on hop length.

Returns

dict Metadata updates with new sampling rate based on hop length

Notes

The output sampling rate is determined by downsampling the input by hop_length. All necessary parameters are provided at initialization.

Source code in wandas/processing/temporal.py
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
def get_metadata_updates(self) -> dict[str, Any]:
    """
    Report the sampling rate of the RMS output.

    Returns
    -------
    dict
        Metadata updates whose "sampling_rate" entry is
        sampling_rate / hop_length

    Notes
    -----
    Both values used here are fixed at initialization.
    """
    updates: dict[str, Any] = {"sampling_rate": self.sampling_rate / self.hop_length}
    return updates
calculate_output_shape(input_shape)

Calculate output data shape after operation

Parameters

input_shape : tuple Input data shape (channels, samples)

Returns

tuple Output data shape (channels, frames)

Source code in wandas/processing/temporal.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
def calculate_output_shape(self, input_shape: tuple[int, ...]) -> tuple[int, ...]:
    """
    Compute the data shape produced by the RMS calculation.

    Parameters
    ----------
    input_shape : tuple
        Input data shape (channels, samples)

    Returns
    -------
    tuple
        Output data shape (channels, frames)
    """
    # Feed librosa a dummy single-row signal of the same length and read the
    # frame count from the result rather than re-deriving its framing rules.
    probe = np.ones((1, input_shape[-1]))
    framed = librosa.feature.rms(
        y=probe,
        frame_length=self.frame_length,
        hop_length=self.hop_length,
    )
    n_frames = framed.shape[-1]
    return (*input_shape[:-1], n_frames)

SoundLevel

Bases: AudioOperation[NDArrayReal, NDArrayReal]

Time-weighted RMS or sound level with frequency and time weighting.

Source code in wandas/processing/temporal.py
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
class SoundLevel(AudioOperation[NDArrayReal, NDArrayReal]):
    """Time-weighted RMS or sound level with frequency and time weighting."""

    name = "sound_level"

    def __init__(
        self,
        sampling_rate: float,
        ref: list[float] | float = 1.0,
        freq_weighting: str | None = "Z",
        time_weighting: str = "Fast",
        dB: bool = False,
    ) -> None:
        """
        Initialize sound level calculation

        Parameters
        ----------
        sampling_rate : float
            Sampling rate (Hz), checked with validate_sampling_rate
        ref : Union[list[float], float]
            Reference value(s) used for the dB conversion; every value must
            be positive. Either one shared value or one value per channel.
        freq_weighting : Optional[str]
            Frequency weighting: 'A', 'C' or 'Z' (case-insensitive).
            None is treated as 'Z'.
        time_weighting : str
            Time weighting: 'Fast' / 'F' or 'Slow' / 'S' (case-insensitive)
        dB : bool
            Whether to return levels in decibels instead of RMS values

        Raises
        ------
        ValueError
            If any reference value is not positive, or if the frequency or
            time weighting is not one of the supported names
        """
        validate_sampling_rate(sampling_rate)
        self.ref = np.atleast_1d(np.asarray(ref, dtype=float))
        if np.any(self.ref <= 0):
            raise ValueError(
                "Invalid sound level reference\n"
                f"  Got: {self.ref.tolist()}\n"
                "  Expected: Positive reference values\n"
                "Sound pressure level requires a positive reference pressure."
            )
        self.freq_weighting = self._normalize_freq_weighting(freq_weighting)
        self.time_weighting = self._normalize_time_weighting(time_weighting)
        self.dB = dB
        super().__init__(
            sampling_rate,
            ref=self.ref,
            freq_weighting=self.freq_weighting,
            time_weighting=self.time_weighting,
            dB=dB,
        )

    @staticmethod
    def _normalize_freq_weighting(freq_weighting: str | None) -> str:
        """Return the weighting as 'A', 'C' or 'Z'; None maps to 'Z'."""
        # Surrounding whitespace is ignored, matching _normalize_time_weighting
        normalized = "Z" if freq_weighting is None else str(freq_weighting).strip().upper()
        if normalized not in {"A", "C", "Z"}:
            raise ValueError(
                "Invalid frequency weighting\n"
                f"  Got: {freq_weighting!r}\n"
                "  Expected: 'A', 'C', or 'Z'\n"
                "Choose a supported IEC-style weighting curve before calculating sound level."
            )
        return normalized

    @staticmethod
    def _normalize_time_weighting(time_weighting: str) -> str:
        """Return the weighting as 'Fast' or 'Slow', accepting 'F' / 'S' too."""
        normalized = str(time_weighting).strip().upper()
        if normalized in {"F", "FAST"}:
            return "Fast"
        if normalized in {"S", "SLOW"}:
            return "Slow"
        raise ValueError(
            "Invalid time weighting\n"
            f"  Got: {time_weighting!r}\n"
            "  Expected: 'Fast' or 'Slow'\n"
            "Choose a supported sound level meter time constant before calculating sound level."
        )

    @property
    def time_constant(self) -> float:
        """Return the RC time constant in seconds."""
        # 0.125 s for "Fast", 1.0 s otherwise (i.e. "Slow")
        return 0.125 if self.time_weighting == "Fast" else 1.0

    @staticmethod
    def _output_dtype(
        input_dtype: np.dtype[Any],
    ) -> np.dtype[np.float32] | np.dtype[np.float64]:
        """Return the floating output dtype for the given input dtype."""
        # float32 input stays float32; every other dtype maps to float64
        if np.dtype(input_dtype) == np.dtype(np.float32):
            return np.dtype(np.float32)
        return np.dtype(np.float64)

    def get_display_name(self) -> str:
        """Get display name for the operation for use in channel labels."""
        # e.g. "LAF" in dB mode, "AFRMS" otherwise
        if self.dB:
            return f"L{self.freq_weighting}{self.time_weighting[0]}"
        return f"{self.freq_weighting}{self.time_weighting[0]}RMS"

    def _reference_squared(self, n_channels: int) -> NDArrayReal:
        """
        Return squared reference pressure for each channel.

        A single reference is repeated for all channels; otherwise the number
        of references must equal n_channels or ValueError is raised.
        """
        if self.ref.size == 1:
            ref = np.repeat(self.ref, n_channels)
        elif self.ref.size == n_channels:
            ref = self.ref
        else:
            raise ValueError(
                "Reference count mismatch\n"
                f"  Got: {self.ref.size} reference values for {n_channels} channels\n"
                "  Expected: One shared reference or one reference per channel\n"
                "Provide ref as a scalar or a list matching the number of channels."
            )
        return np.asarray(np.square(ref), dtype=np.float64)

    def _process_array(self, x: NDArrayReal) -> NDArrayReal:
        """Compute the time-weighted RMS (or level in dB) of ``x``."""
        logger.debug(
            "Applying sound level to array with shape %s using %s/%s weighting",
            x.shape,
            self.freq_weighting,
            self.time_weighting,
        )
        output_dtype = self._output_dtype(x.dtype)
        # All intermediate math runs in float64, whatever the input dtype
        weighted_input = x if x.dtype == np.float64 else np.asarray(x, dtype=np.float64)
        if self.freq_weighting == "Z":
            # "Z" leaves the signal unfiltered
            weighted = weighted_input
        else:
            weighted = frequency_weight(weighted_input, self.sampling_rate, curve=self.freq_weighting)
        squared = np.square(weighted)
        # First-order recursive smoothing of the squared signal along the
        # last axis, with coefficient exp(-1 / (sampling_rate * time_constant))
        alpha = np.asarray(np.exp(-1.0 / (self.sampling_rate * self.time_constant)), dtype=np.float64).item()
        smoothed = lfilter([1.0 - alpha], [1.0, -alpha], squared, axis=-1)
        if self.dB:
            # Level relative to the squared reference; the ratio is floored at
            # MIN_SOUND_LEVEL_POWER_RATIO before the logarithm is taken
            ref_squared_broadcast = self._reference_squared(smoothed.shape[0])[:, np.newaxis]
            result = 10.0 * np.log10(np.maximum(smoothed / ref_squared_broadcast, MIN_SOUND_LEVEL_POWER_RATIO))
        else:
            result = np.sqrt(smoothed)
        logger.debug(f"Sound level applied, returning result with shape: {result.shape}")
        return np.asarray(result, dtype=output_dtype)

    def process(self, data: DaArray) -> DaArray:
        """Execute sound level with floating output dtype metadata."""
        logger.debug("Adding delayed sound level operation to computation graph")
        wrapper = self._create_named_wrapper()
        delayed_result = delayed(wrapper, pure=self.pure)(data)
        output_shape = self.calculate_output_shape(data.shape)
        # The dtype is declared via _output_dtype rather than copied from the input
        return da.from_delayed(delayed_result, shape=output_shape, dtype=self._output_dtype(data.dtype))
Attributes
name = 'sound_level' class-attribute instance-attribute
ref = np.atleast_1d(np.asarray(ref, dtype=float)) instance-attribute
freq_weighting = self._normalize_freq_weighting(freq_weighting) instance-attribute
time_weighting = self._normalize_time_weighting(time_weighting) instance-attribute
dB = dB instance-attribute
time_constant property

Return the RC time constant in seconds.

Functions
__init__(sampling_rate, ref=1.0, freq_weighting='Z', time_weighting='Fast', dB=False)
Source code in wandas/processing/temporal.py
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
def __init__(
    self,
    sampling_rate: float,
    ref: list[float] | float = 1.0,
    freq_weighting: str | None = "Z",
    time_weighting: str = "Fast",
    dB: bool = False,
) -> None:
    """
    Set up the sound level calculation.

    Parameters
    ----------
    sampling_rate : float
        Sampling rate (Hz), checked with validate_sampling_rate
    ref : Union[list[float], float]
        Reference value(s); every value must be positive
    freq_weighting : Optional[str]
        Frequency weighting, normalized by _normalize_freq_weighting
    time_weighting : str
        Time weighting, normalized by _normalize_time_weighting
    dB : bool
        Stored as the dB flag of the operation

    Raises
    ------
    ValueError
        If any reference value is zero or negative
    """
    validate_sampling_rate(sampling_rate)
    ref_values = np.atleast_1d(np.asarray(ref, dtype=float))
    self.ref = ref_values
    if (ref_values <= 0).any():
        raise ValueError(
            "Invalid sound level reference\n"
            f"  Got: {self.ref.tolist()}\n"
            "  Expected: Positive reference values\n"
            "Sound pressure level requires a positive reference pressure."
        )
    self.freq_weighting = self._normalize_freq_weighting(freq_weighting)
    self.time_weighting = self._normalize_time_weighting(time_weighting)
    self.dB = dB
    # The normalized values, not the raw arguments, are passed to the parent
    super().__init__(
        sampling_rate,
        ref=self.ref,
        freq_weighting=self.freq_weighting,
        time_weighting=self.time_weighting,
        dB=dB,
    )
get_display_name()

Get display name for the operation for use in channel labels.

Source code in wandas/processing/temporal.py
401
402
403
404
405
def get_display_name(self) -> str:
    """Build the label used for this operation in channel labels."""
    # "L" + weightings in dB mode, weightings + "RMS" otherwise; only the
    # first letter of the time weighting is used
    return (
        f"L{self.freq_weighting}{self.time_weighting[0]}"
        if self.dB
        else f"{self.freq_weighting}{self.time_weighting[0]}RMS"
    )
process(data)

Execute sound level with floating output dtype metadata.

Source code in wandas/processing/temporal.py
447
448
449
450
451
452
453
def process(self, data: DaArray) -> DaArray:
    """Wrap the sound level computation as a delayed Dask array with a floating dtype."""
    logger.debug("Adding delayed sound level operation to computation graph")
    named_wrapper = self._create_named_wrapper()
    pending = delayed(named_wrapper, pure=self.pure)(data)
    shape = self.calculate_output_shape(data.shape)
    # The dtype is declared via _output_dtype rather than copied from the input
    dtype = self._output_dtype(data.dtype)
    return da.from_delayed(pending, shape=shape, dtype=dtype)

Functions