Skip to content

IO Module / 入出力モジュール

The wandas.io module provides reading and writing capabilities for various file formats. wandas.io モジュールは、様々なファイル形式の読み書き機能を提供します。

File Readers / ファイルリーダー

Provides functionality to read data from various file formats. 様々なファイル形式からデータを読み込む機能を提供します。

wandas.io.readers

Attributes

logger = logging.getLogger(__name__) module-attribute

Classes

CSVFileInfoParams

Bases: TypedDict

Type definition for CSV file reader parameters in get_file_info.

Parameters

delimiter : str Delimiter character. Default is ",". header : Optional[int] Row number to use as header. Default is 0 (first row). Set to None if no header. time_column : Union[int, str] Index or name of the time column. Default is 0.

Source code in wandas/io/readers.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class CSVFileInfoParams(TypedDict, total=False):
    """Type definition for CSV file reader parameters in get_file_info.

    Parameters
    ----------
    delimiter : str
        Delimiter character. Default is ",".
    header : Optional[int]
        Row number to use as header. Default is 0 (first row).
        Set to None if no header.
    time_column : Union[int, str]
        Index or name of the time column. Default is 0.
    """

    delimiter: str
    header: int | None
    time_column: int | str
Attributes
delimiter instance-attribute
header instance-attribute
time_column instance-attribute

CSVGetDataParams

Bases: TypedDict

Type definition for CSV file reader parameters in get_data.

Parameters

delimiter : str Delimiter character. Default is ",". header : Optional[int] Row number to use as header. Default is 0. time_column : Union[int, str] Index or name of the time column. Default is 0.

Source code in wandas/io/readers.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class CSVGetDataParams(TypedDict, total=False):
    """Type definition for CSV file reader parameters in get_data.

    Parameters
    ----------
    delimiter : str
        Delimiter character. Default is ",".
    header : Optional[int]
        Row number to use as header. Default is 0.
    time_column : Union[int, str]
        Index or name of the time column. Default is 0.
    """

    delimiter: str
    header: int | None
    time_column: int | str
Attributes
delimiter instance-attribute
header instance-attribute
time_column instance-attribute

FileReader

Bases: ABC

Base class for audio file readers.

Source code in wandas/io/readers.py
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
class FileReader(ABC):
    """Base class for audio file readers."""

    # Class attribute for supported file extensions
    supported_extensions: list[str] = []

    @classmethod
    @abstractmethod
    def get_file_info(
        cls,
        path: str | Path | bytes | bytearray | memoryview | BinaryIO,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Get basic information about the audio file.

        Args:
            path: Path to the file.
            **kwargs: Additional parameters specific to the file reader.

        Returns:
            Dictionary containing file information including:
            - samplerate: Sampling rate in Hz
            - channels: Number of channels
            - frames: Total number of frames
            - format: File format
            - duration: Duration in seconds
        """
        pass  # pragma: no cover

    @classmethod
    @abstractmethod
    def get_data(
        cls,
        path: str | Path | bytes | bytearray | memoryview | BinaryIO,
        channels: list[int],
        start_idx: int,
        frames: int,
        **kwargs: Any,
    ) -> ArrayLike:
        """Read audio data from the file.

        Args:
            path: Path to the file.
            channels: List of channel indices to read.
            start_idx: Starting frame index.
            frames: Number of frames to read.
            **kwargs: Additional parameters specific to the file reader.

        Returns:
            Array of shape (channels, frames) containing the audio data.
        """
        pass  # pragma: no cover

    @classmethod
    def can_read(cls, path: str | Path) -> bool:
        """Check if this reader can handle the file based on extension."""
        ext = Path(path).suffix.lower()
        return ext in cls.supported_extensions
Attributes
supported_extensions = [] class-attribute instance-attribute
Functions
get_file_info(path, **kwargs) abstractmethod classmethod

Get basic information about the audio file.

Parameters:

Name Type Description Default
path str | Path | bytes | bytearray | memoryview | BinaryIO

Path to the file.

required
**kwargs Any

Additional parameters specific to the file reader.

{}

Returns:

Type Description
dict[str, Any]

Dictionary containing file information including:

dict[str, Any]
  • samplerate: Sampling rate in Hz
dict[str, Any]
  • channels: Number of channels
dict[str, Any]
  • frames: Total number of frames
dict[str, Any]
  • format: File format
dict[str, Any]
  • duration: Duration in seconds
Source code in wandas/io/readers.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# Abstract declaration: concrete readers (e.g. SoundFileReader, CSVFileReader)
# provide the real implementation; the body is intentionally empty.
@classmethod
@abstractmethod
def get_file_info(
    cls,
    path: str | Path | bytes | bytearray | memoryview | BinaryIO,
    **kwargs: Any,
) -> dict[str, Any]:
    """Get basic information about the audio file.

    Args:
        path: Path to the file.
        **kwargs: Additional parameters specific to the file reader.

    Returns:
        Dictionary containing file information including:
        - samplerate: Sampling rate in Hz
        - channels: Number of channels
        - frames: Total number of frames
        - format: File format
        - duration: Duration in seconds
    """
    pass  # pragma: no cover
get_data(path, channels, start_idx, frames, **kwargs) abstractmethod classmethod

Read audio data from the file.

Parameters:

Name Type Description Default
path str | Path | bytes | bytearray | memoryview | BinaryIO

Path to the file.

required
channels list[int]

List of channel indices to read.

required
start_idx int

Starting frame index.

required
frames int

Number of frames to read.

required
**kwargs Any

Additional parameters specific to the file reader.

{}

Returns:

Type Description
ArrayLike

Array of shape (channels, frames) containing the audio data.

Source code in wandas/io/readers.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
# Abstract declaration: subclasses must return an array shaped
# (channels, frames); the base class supplies no default behavior.
@classmethod
@abstractmethod
def get_data(
    cls,
    path: str | Path | bytes | bytearray | memoryview | BinaryIO,
    channels: list[int],
    start_idx: int,
    frames: int,
    **kwargs: Any,
) -> ArrayLike:
    """Read audio data from the file.

    Args:
        path: Path to the file.
        channels: List of channel indices to read.
        start_idx: Starting frame index.
        frames: Number of frames to read.
        **kwargs: Additional parameters specific to the file reader.

    Returns:
        Array of shape (channels, frames) containing the audio data.
    """
    pass  # pragma: no cover
can_read(path) classmethod

Check if this reader can handle the file based on extension.

Source code in wandas/io/readers.py
106
107
108
109
110
@classmethod
def can_read(cls, path: str | Path) -> bool:
    """Check if this reader can handle the file based on extension."""
    # Compare the lowercased suffix (with the leading dot, e.g. ".wav")
    # against the extensions the reader class declares.
    ext = Path(path).suffix.lower()
    return ext in cls.supported_extensions

SoundFileReader

Bases: FileReader

Audio file reader using SoundFile library.

Source code in wandas/io/readers.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
class SoundFileReader(FileReader):
    """Audio file reader using SoundFile library."""

    # SoundFile supported formats
    supported_extensions = [".wav", ".flac", ".ogg", ".aiff", ".aif", ".snd"]

    @classmethod
    def get_file_info(
        cls,
        path: str | Path | bytes | bytearray | memoryview | BinaryIO,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Get basic information about the audio file."""
        # _prepare_file_source converts path-like / in-memory sources into
        # something soundfile can open (defined elsewhere in this module).
        info = sf.info(_prepare_file_source(path))
        return {
            "samplerate": info.samplerate,
            "channels": info.channels,
            "frames": info.frames,
            "format": info.format,
            "subtype": info.subtype,
            # Duration derived from frame count; assumes samplerate > 0.
            "duration": info.frames / info.samplerate,
        }

    @classmethod
    def get_data(
        cls,
        path: str | Path | bytes | bytearray | memoryview | BinaryIO,
        channels: list[int],
        start_idx: int,
        frames: int,
        normalize: bool = False,
        **kwargs: Any,
    ) -> ArrayLike:
        """Read audio data from the file.

        Args:
            normalize: When False (default) and the source is a WAV file path,
                return raw integer PCM samples cast to float32 via
                scipy.io.wavfile.read. For non-WAV formats or in-memory sources,
                always uses soundfile (returning float32 normalized to [-1.0, 1.0]).
                When True, return float32 data normalized to [-1.0, 1.0] via soundfile.
        """
        logger.debug(f"Reading {frames} frames from {path!r} starting at {start_idx}")

        # The raw (scipy) path applies only to on-disk WAV sources; in-memory
        # WAV data still goes through soundfile below.
        is_wav = isinstance(path, (str, Path)) and Path(path).suffix.lower() == ".wav"
        if not normalize and is_wav:
            # Use scipy to return raw integer samples (no normalization), cast to float32.
            # NOTE(review): wavfile.read loads the whole file into memory;
            # start_idx/frames slicing happens afterwards — confirm this is
            # acceptable for very large WAV files.
            source = _prepare_file_source(path)
            _sr, raw = wavfile.read(source)
            if raw.ndim == 1:
                raw = np.expand_dims(raw, axis=0)  # mono: (1, samples)
            else:
                raw = raw.T  # stereo: (channels, samples)

            # Only reindex channels when the requested selection is not the identity.
            if channels != list(range(raw.shape[0])):
                raw = raw[channels]

            # copy=False avoids a second allocation when the dtype is
            # already float32.
            result: ArrayLike = raw[:, start_idx : start_idx + frames].astype(
                np.float32,
                copy=False,
            )
            if not isinstance(result, np.ndarray):
                raise ValueError("Unexpected data type after reading file")
            logger.debug(f"File read complete (raw), returning data with shape {result.shape}")
            return result

        # soundfile path: seek then read exactly `frames` normalized float32
        # samples; always_2d guarantees a (frames, channels) array even for mono.
        with sf.SoundFile(_prepare_file_source(path)) as f:
            if start_idx > 0:
                f.seek(start_idx)
            data = f.read(frames=frames, dtype="float32", always_2d=True)

            # Select requested channels
            data = data[:, channels]

            # Transpose to get (channels, samples) format
            result = data.T
            if not isinstance(result, np.ndarray):
                raise ValueError("Unexpected data type after reading file")

        _shape = result.shape
        logger.debug(f"File read complete, returning data with shape {_shape}")
        return result
Attributes
supported_extensions = ['.wav', '.flac', '.ogg', '.aiff', '.aif', '.snd'] class-attribute instance-attribute
Functions
get_file_info(path, **kwargs) classmethod

Get basic information about the audio file.

Source code in wandas/io/readers.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
@classmethod
def get_file_info(
    cls,
    path: str | Path | bytes | bytearray | memoryview | BinaryIO,
    **kwargs: Any,
) -> dict[str, Any]:
    """Get basic information about the audio file."""
    # Delegate metadata extraction to soundfile; _prepare_file_source
    # (module helper) normalizes path-like and in-memory inputs.
    info = sf.info(_prepare_file_source(path))
    return {
        "samplerate": info.samplerate,
        "channels": info.channels,
        "frames": info.frames,
        "format": info.format,
        "subtype": info.subtype,
        # Duration in seconds; assumes samplerate > 0.
        "duration": info.frames / info.samplerate,
    }
get_data(path, channels, start_idx, frames, normalize=False, **kwargs) classmethod

Read audio data from the file.

Parameters:

Name Type Description Default
normalize bool

When False (default) and the source is a WAV file path, return raw integer PCM samples cast to float32 via scipy.io.wavfile.read. For non-WAV formats or in-memory sources, always uses soundfile (returning float32 normalized to [-1.0, 1.0]). When True, return float32 data normalized to [-1.0, 1.0] via soundfile.

False
Source code in wandas/io/readers.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
@classmethod
def get_data(
    cls,
    path: str | Path | bytes | bytearray | memoryview | BinaryIO,
    channels: list[int],
    start_idx: int,
    frames: int,
    normalize: bool = False,
    **kwargs: Any,
) -> ArrayLike:
    """Read audio data from the file.

    Args:
        normalize: When False (default) and the source is a WAV file path,
            return raw integer PCM samples cast to float32 via
            scipy.io.wavfile.read. For non-WAV formats or in-memory sources,
            always uses soundfile (returning float32 normalized to [-1.0, 1.0]).
            When True, return float32 data normalized to [-1.0, 1.0] via soundfile.
    """
    logger.debug(f"Reading {frames} frames from {path!r} starting at {start_idx}")

    # The scipy fast path is only taken for on-disk WAV sources.
    is_wav = isinstance(path, (str, Path)) and Path(path).suffix.lower() == ".wav"
    if not normalize and is_wav:
        # Use scipy to return raw integer samples (no normalization), cast to float32.
        # NOTE(review): wavfile.read loads the entire file before slicing —
        # confirm acceptable for very large WAVs.
        source = _prepare_file_source(path)
        _sr, raw = wavfile.read(source)
        if raw.ndim == 1:
            raw = np.expand_dims(raw, axis=0)  # mono: (1, samples)
        else:
            raw = raw.T  # stereo: (channels, samples)

        # Only reindex channels when the requested selection is not the identity.
        if channels != list(range(raw.shape[0])):
            raw = raw[channels]

        # copy=False skips the allocation when dtype is already float32.
        result: ArrayLike = raw[:, start_idx : start_idx + frames].astype(
            np.float32,
            copy=False,
        )
        if not isinstance(result, np.ndarray):
            raise ValueError("Unexpected data type after reading file")
        logger.debug(f"File read complete (raw), returning data with shape {result.shape}")
        return result

    # soundfile path: always_2d guarantees (frames, channels) even for mono.
    with sf.SoundFile(_prepare_file_source(path)) as f:
        if start_idx > 0:
            f.seek(start_idx)
        data = f.read(frames=frames, dtype="float32", always_2d=True)

        # Select requested channels
        data = data[:, channels]

        # Transpose to get (channels, samples) format
        result = data.T
        if not isinstance(result, np.ndarray):
            raise ValueError("Unexpected data type after reading file")

    _shape = result.shape
    logger.debug(f"File read complete, returning data with shape {_shape}")
    return result

CSVFileReader

Bases: FileReader

CSV file reader for time series data.

Source code in wandas/io/readers.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
class CSVFileReader(FileReader):
    """CSV file reader for time series data."""

    # CSV supported formats
    supported_extensions = [".csv"]

    @classmethod
    def get_file_info(
        cls,
        path: str | Path | bytes | bytearray | memoryview | BinaryIO,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Get basic information about the CSV file.

        Parameters
        ----------
        path : Union[str, Path]
            Path to the CSV file.
        **kwargs : Any
            Additional parameters for CSV reading. Supported parameters:

            - delimiter : str, default=","
                Delimiter character.
            - header : Optional[int], default=0
                Row number to use as header. Set to None if no header.
            - time_column : Union[int, str], default=0
                Index or name of the time column.

        Returns
        -------
        dict[str, Any]
            Dictionary containing file information including:
            - samplerate: Estimated sampling rate in Hz
            - channels: Number of data channels (excluding time column)
            - frames: Total number of frames
            - format: "CSV"
            - duration: Duration in seconds (or None if cannot be calculated)
            - ch_labels: List of channel labels

        Notes
        -----
        This method accepts CSV-specific parameters through kwargs.
        See CSVFileInfoParams for supported parameter types.
        """
        # Extract parameters with defaults
        delimiter: str = kwargs.get("delimiter", ",")
        header: int | None = kwargs.get("header", 0)
        time_column: int | str = kwargs.get("time_column", 0)

        # Read the file to determine structure
        df = pd.read_csv(_prepare_file_source(path), delimiter=delimiter, header=header)

        # Estimate sampling rate from the configured time column
        try:
            # Get time column as Series
            if isinstance(time_column, str):
                time_series = df[time_column]
            else:
                time_series = df.iloc[:, time_column]
            time_values = np.array(time_series.values)
            if len(time_values) > 1:
                # Use round() instead of int() to handle floating-point precision issues
                estimated_sr = round(1 / np.mean(np.diff(time_values)))
            else:
                estimated_sr = 0  # Cannot determine from single row
        except Exception:
            estimated_sr = 0  # Default if can't calculate

        frames = df.shape[0]
        duration = frames / estimated_sr if estimated_sr > 0 else None

        # Bug fix: channels/ch_labels previously always excluded the FIRST
        # column, which was wrong whenever time_column != 0. Exclude the
        # actual time column instead, matching the documented contract.
        if isinstance(time_column, str):
            ch_labels = [c for c in df.columns if c != time_column]
        else:
            ch_labels = [c for i, c in enumerate(df.columns) if i != time_column]

        # Return file info
        return {
            "samplerate": estimated_sr,
            "channels": len(ch_labels),
            "frames": frames,
            "format": "CSV",
            "duration": duration,
            "ch_labels": ch_labels,
        }

    @classmethod
    def get_data(
        cls,
        path: str | Path | bytes | bytearray | memoryview | BinaryIO,
        channels: list[int],
        start_idx: int,
        frames: int,
        **kwargs: Any,
    ) -> ArrayLike:
        """Read data from the CSV file.

        Parameters
        ----------
        path : Union[str, Path]
            Path to the CSV file.
        channels : list[int]
            List of channel indices to read.
        start_idx : int
            Starting frame index.
        frames : int
            Number of frames to read.
        **kwargs : Any
            Additional parameters for CSV reading. Supported parameters:

            - delimiter : str, default=","
                Delimiter character.
            - header : Optional[int], default=0
                Row number to use as header.
            - time_column : Union[int, str], default=0
                Index or name of the time column.

        Returns
        -------
        ArrayLike
            Array of shape (channels, frames) containing the data.

        Notes
        -----
        This method accepts CSV-specific parameters through kwargs.
        See CSVGetDataParams for supported parameter types.
        """
        # Extract parameters with defaults
        time_column: int | str = kwargs.get("time_column", 0)
        delimiter: str = kwargs.get("delimiter", ",")
        header: int | None = kwargs.get("header", 0)

        logger.debug(f"Reading CSV data from {path!r} starting at {start_idx}")

        # Read the CSV file
        df = pd.read_csv(_prepare_file_source(path), delimiter=delimiter, header=header)

        # Remove time column (by label when given as str, by position otherwise)
        df = df.drop(columns=[time_column] if isinstance(time_column, str) else df.columns[time_column])

        # Select requested channels; indices are relative to the frame with
        # the time column already removed.
        if channels:
            try:
                data_df = df.iloc[:, channels]
            except IndexError as err:
                # Chain the original exception so the traceback is preserved.
                raise ValueError(f"Requested channels {channels} out of range") from err
        else:
            data_df = df

        # Handle start_idx and frames for partial reading
        end_idx = start_idx + frames if frames > 0 else None
        data_df = data_df.iloc[start_idx:end_idx]

        # Convert to numpy array and transpose to (channels, samples) format
        result = data_df.values.T

        if not isinstance(result, np.ndarray):
            raise ValueError("Unexpected data type after reading file")

        _shape = result.shape
        logger.debug(f"CSV read complete, returning data with shape {_shape}")
        return result
Attributes
supported_extensions = ['.csv'] class-attribute instance-attribute
Functions
get_file_info(path, **kwargs) classmethod

Get basic information about the CSV file.

Parameters

path : Union[str, Path] Path to the CSV file. **kwargs : Any Additional parameters for CSV reading. Supported parameters:

- delimiter : str, default=","
    Delimiter character.
- header : Optional[int], default=0
    Row number to use as header. Set to None if no header.
- time_column : Union[int, str], default=0
    Index or name of the time column.
Returns

dict[str, Any] Dictionary containing file information including: - samplerate: Estimated sampling rate in Hz - channels: Number of data channels (excluding time column) - frames: Total number of frames - format: "CSV" - duration: Duration in seconds (or None if cannot be calculated) - ch_labels: List of channel labels

Notes

This method accepts CSV-specific parameters through kwargs. See CSVFileInfoParams for supported parameter types.

Source code in wandas/io/readers.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
@classmethod
def get_file_info(
    cls,
    path: str | Path | bytes | bytearray | memoryview | BinaryIO,
    **kwargs: Any,
) -> dict[str, Any]:
    """Get basic information about the CSV file.

    Parameters
    ----------
    path : Union[str, Path]
        Path to the CSV file.
    **kwargs : Any
        Additional parameters for CSV reading. Supported parameters:

        - delimiter : str, default=","
            Delimiter character.
        - header : Optional[int], default=0
            Row number to use as header. Set to None if no header.
        - time_column : Union[int, str], default=0
            Index or name of the time column.

    Returns
    -------
    dict[str, Any]
        Dictionary containing file information including:
        - samplerate: Estimated sampling rate in Hz
        - channels: Number of data channels (excluding time column)
        - frames: Total number of frames
        - format: "CSV"
        - duration: Duration in seconds (or None if cannot be calculated)
        - ch_labels: List of channel labels

    Notes
    -----
    This method accepts CSV-specific parameters through kwargs.
    See CSVFileInfoParams for supported parameter types.
    """
    # Extract parameters with defaults
    delimiter: str = kwargs.get("delimiter", ",")
    header: int | None = kwargs.get("header", 0)
    time_column: int | str = kwargs.get("time_column", 0)

    # Read the file to determine structure
    df = pd.read_csv(_prepare_file_source(path), delimiter=delimiter, header=header)

    # Estimate sampling rate from the configured time column
    try:
        # Get time column as Series
        if isinstance(time_column, str):
            time_series = df[time_column]
        else:
            time_series = df.iloc[:, time_column]
        time_values = np.array(time_series.values)
        if len(time_values) > 1:
            # Use round() instead of int() to handle floating-point precision issues
            estimated_sr = round(1 / np.mean(np.diff(time_values)))
        else:
            estimated_sr = 0  # Cannot determine from single row
    except Exception:
        estimated_sr = 0  # Default if can't calculate

    frames = df.shape[0]
    duration = frames / estimated_sr if estimated_sr > 0 else None

    # Bug fix: channels/ch_labels previously always excluded the FIRST
    # column, which was wrong whenever time_column != 0. Exclude the
    # actual time column instead, matching the documented contract.
    if isinstance(time_column, str):
        ch_labels = [c for c in df.columns if c != time_column]
    else:
        ch_labels = [c for i, c in enumerate(df.columns) if i != time_column]

    # Return file info
    return {
        "samplerate": estimated_sr,
        "channels": len(ch_labels),
        "frames": frames,
        "format": "CSV",
        "duration": duration,
        "ch_labels": ch_labels,
    }
get_data(path, channels, start_idx, frames, **kwargs) classmethod

Read data from the CSV file.

Parameters

path : Union[str, Path] Path to the CSV file. channels : list[int] List of channel indices to read. start_idx : int Starting frame index. frames : int Number of frames to read. **kwargs : Any Additional parameters for CSV reading. Supported parameters:

- delimiter : str, default=","
    Delimiter character.
- header : Optional[int], default=0
    Row number to use as header.
- time_column : Union[int, str], default=0
    Index or name of the time column.
Returns

ArrayLike Array of shape (channels, frames) containing the data.

Notes

This method accepts CSV-specific parameters through kwargs. See CSVGetDataParams for supported parameter types.

Source code in wandas/io/readers.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
@classmethod
def get_data(
    cls,
    path: str | Path | bytes | bytearray | memoryview | BinaryIO,
    channels: list[int],
    start_idx: int,
    frames: int,
    **kwargs: Any,
) -> ArrayLike:
    """Read data from the CSV file.

    Parameters
    ----------
    path : Union[str, Path]
        Path to the CSV file.
    channels : list[int]
        List of channel indices to read.
    start_idx : int
        Starting frame index.
    frames : int
        Number of frames to read.
    **kwargs : Any
        Additional parameters for CSV reading. Supported parameters:

        - delimiter : str, default=","
            Delimiter character.
        - header : Optional[int], default=0
            Row number to use as header.
        - time_column : Union[int, str], default=0
            Index or name of the time column.

    Returns
    -------
    ArrayLike
        Array of shape (channels, frames) containing the data.

    Notes
    -----
    This method accepts CSV-specific parameters through kwargs.
    See CSVGetDataParams for supported parameter types.
    """
    # Extract parameters with defaults
    time_column: int | str = kwargs.get("time_column", 0)
    delimiter: str = kwargs.get("delimiter", ",")
    header: int | None = kwargs.get("header", 0)

    logger.debug(f"Reading CSV data from {path!r} starting at {start_idx}")

    # Read the CSV file
    df = pd.read_csv(_prepare_file_source(path), delimiter=delimiter, header=header)

    # Remove time column (by label when given as str, by position otherwise)
    df = df.drop(columns=[time_column] if isinstance(time_column, str) else df.columns[time_column])

    # Select requested channels; indices are relative to the frame with
    # the time column already removed.
    if channels:
        try:
            data_df = df.iloc[:, channels]
        except IndexError as err:
            # Fix: chain the original exception (raise ... from err) so the
            # underlying IndexError traceback is preserved for debugging.
            raise ValueError(f"Requested channels {channels} out of range") from err
    else:
        data_df = df

    # Handle start_idx and frames for partial reading
    end_idx = start_idx + frames if frames > 0 else None
    data_df = data_df.iloc[start_idx:end_idx]

    # Convert to numpy array and transpose to (channels, samples) format
    result = data_df.values.T

    if not isinstance(result, np.ndarray):
        raise ValueError("Unexpected data type after reading file")

    _shape = result.shape
    logger.debug(f"CSV read complete, returning data with shape {_shape}")
    return result

Functions

get_file_reader(path, *, file_type=None)

Get an appropriate file reader for the given path or file type.

Source code in wandas/io/readers.py
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
def get_file_reader(
    path: str | Path | bytes | bytearray | memoryview | BinaryIO,
    *,
    file_type: str | None = None,
) -> FileReader:
    """Get an appropriate file reader for the given path or file type."""
    path_str = str(path)

    # Prefer an explicit file_type; otherwise derive the extension from
    # the path when one is available.
    ext = _normalize_extension(file_type)
    if ext is None and isinstance(path, (str, Path)):
        ext = Path(path).suffix.lower()

    # Without an extension there is no way to choose a reader.
    if not ext:
        raise ValueError(
            "File type is required when the extension is missing\n"
            "  Cannot determine format without an extension\n"
            "  Provide file_type like '.wav' or '.csv'"
        )

    # Return the first registered reader that claims this extension.
    for candidate in _file_readers:
        if ext in type(candidate).supported_extensions:
            logger.debug(f"Using {type(candidate).__name__} for {path_str}")
            return candidate

    raise ValueError(f"No suitable file reader found for {path_str}")

register_file_reader(reader_class)

Register a new file reader.

Source code in wandas/io/readers.py
417
418
419
420
421
def register_file_reader(reader_class: type) -> None:
    """Register a new file reader."""
    # Instantiate once and keep the instance in the module-level registry
    # that get_file_reader() consults.
    instance = reader_class()
    _file_readers.append(instance)
    logger.debug(f"Registered new file reader: {reader_class.__name__}")

WAV File IO / WAVファイル入出力

Provides functions for reading and writing WAV files. WAVファイルの読み書き機能を提供します。

wandas.io.wav_io

Attributes

logger = logging.getLogger(__name__) module-attribute

Classes

Functions

write_wav(filename, target, format=None)

Write a ChannelFrame object to a WAV file.

Parameters

filename : str Path to the WAV file. target : ChannelFrame ChannelFrame object containing the data to write. format : str, optional File format. If None, determined from file extension.

Raises

ValueError If target is not a ChannelFrame object.

Source code in wandas/io/wav_io.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def write_wav(filename: str, target: "ChannelFrame", format: str | None = None) -> None:
    """
    Write a ChannelFrame object to a WAV file.

    Parameters
    ----------
    filename : str
        Path to the WAV file.
    target : ChannelFrame
        ChannelFrame object containing the data to write.
    format : str, optional
        File format. If None, determined from file extension.

    Raises
    ------
    ValueError
        If target is not a ChannelFrame object.
    """
    from wandas.frames.channel import ChannelFrame

    if not isinstance(target, ChannelFrame):
        raise ValueError("target must be a ChannelFrame object.")

    # Fix: the previous f-strings contained no placeholders, so the log
    # messages never mentioned the destination file.
    logger.debug(f"Saving audio data to file: {filename} (will compute now)")
    # Triggers actual (lazy) computation of the frame data.
    data = target.compute()
    # soundfile expects (frames, channels); the frame stores (channels, frames).
    data = data.T
    if data.shape[1] == 1:
        # Mono signal: write a 1-D array rather than an (n, 1) column vector.
        data = data.squeeze(axis=1)
    if np.issubdtype(data.dtype, np.floating) and np.max(np.abs(data)) <= 1:
        # Normalized float data: keep full precision with the FLOAT subtype.
        sf.write(
            str(filename),
            data,
            int(target.sampling_rate),
            subtype="FLOAT",
            format=format,
        )
    else:
        # Integer or out-of-range data: let soundfile pick the default subtype.
        sf.write(str(filename), data, int(target.sampling_rate), format=format)
    logger.debug(f"Save complete: {filename}")

WDF File IO / WDFファイル入出力

Provides functions for reading and writing WDF (Wandas Data File) format, which enables complete preservation including metadata. WDF(Wandas Data File)形式の読み書き機能を提供します。このフォーマットはメタデータを含む完全な保存が可能です。

wandas.io.wdf_io

WDF (Wandas Data File) I/O module for saving and loading ChannelFrame objects.

This module provides functionality to save and load ChannelFrame objects in the WDF (Wandas Data File) format, which is based on HDF5. The format preserves all metadata including sampling rate, channel labels, units, and frame metadata.

Attributes

logger = logging.getLogger(__name__) module-attribute

WDF_FORMAT_VERSION = '0.1' module-attribute

Classes

Functions

save(frame, path, *, format='hdf5', compress='gzip', overwrite=False, dtype=None)

Save a frame to a file.

Parameters:

Name Type Description Default
frame BaseFrame[Any]

The frame to save.

required
path str | Path

Path to save the file. '.wdf' extension will be added if not present.

required
format str

Format to use (currently only 'hdf5' is supported)

'hdf5'
compress str | None

Compression method ('gzip' by default, None for no compression)

'gzip'
overwrite bool

Whether to overwrite existing file

False
dtype str | dtype[Any] | None

Optional data type conversion before saving (e.g. 'float32')

None

Raises:

Type Description
FileExistsError

If the file exists and overwrite=False.

NotImplementedError

For unsupported formats.

Source code in wandas/io/wdf_io.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def save(
    frame: BaseFrame[Any],
    path: str | Path,
    *,
    format: str = "hdf5",
    compress: str | None = "gzip",
    overwrite: bool = False,
    dtype: str | np.dtype[Any] | None = None,
) -> None:
    """Save a frame to a file.

    Args:
        frame: The frame to save.
        path: Path to save the file. '.wdf' extension will be added if not present.
        format: Format to use (currently only 'hdf5' is supported)
        compress: Compression method ('gzip' by default, None for no compression)
        overwrite: Whether to overwrite existing file
        dtype: Optional data type conversion before saving (e.g. 'float32')

    Raises:
        FileExistsError: If the file exists and overwrite=False.
        NotImplementedError: For unsupported formats.
    """
    # Normalize the path: force the '.wdf' extension so loaders can
    # identify the format from the suffix alone.
    path = Path(path)
    if path.suffix != ".wdf":
        path = path.with_suffix(".wdf")

    # Refuse to clobber an existing file unless explicitly allowed.
    if path.exists() and not overwrite:
        raise FileExistsError(f"File {path} already exists. Set overwrite=True to overwrite.")

    # Currently only HDF5 is supported as the on-disk container.
    if format.lower() != "hdf5":
        raise NotImplementedError(f"Format {format} not supported. Only 'hdf5' is currently implemented.")

    # Compute data arrays (this triggers actual computation of any lazy
    # dask-backed data before anything is written to disk).
    logger.info("Computing data arrays for saving...")
    computed_data = frame.compute()
    if dtype is not None:
        # Optional down/up-cast (e.g. 'float32') to trade precision for size.
        computed_data = computed_data.astype(dtype)

    # Create the file; "w" truncates any existing file (overwrite was
    # already checked above).
    logger.info(f"Creating HDF5 file at {path}...")
    with h5py.File(path, "w") as f:
        # Set file version so load() can warn on incompatible formats.
        f.attrs["version"] = WDF_FORMAT_VERSION

        # Store frame-level metadata as root attributes.
        f.attrs["sampling_rate"] = frame.sampling_rate
        f.attrs["label"] = frame.label or ""
        f.attrs["frame_type"] = type(frame).__name__

        # Create channels group; each channel becomes a numbered subgroup.
        channels_grp = f.create_group("channels")

        # Store each channel: data row i pairs with channel metadata i.
        for i, (channel_data, ch_meta) in enumerate(zip(computed_data, frame._channel_metadata)):
            ch_grp = channels_grp.create_group(f"{i}")

            # Store channel data (optionally compressed; 'gzip' by default).
            if compress:
                ch_grp.create_dataset("data", data=channel_data, compression=compress)
            else:
                ch_grp.create_dataset("data", data=channel_data)

            # Store per-channel metadata as attributes.
            ch_grp.attrs["label"] = ch_meta.label
            ch_grp.attrs["unit"] = ch_meta.unit

            # Store extra metadata as JSON (arbitrary nested structures).
            if ch_meta.extra:
                ch_grp.attrs["metadata_json"] = json.dumps(ch_meta.extra)

        # Store operation history, one numbered subgroup per operation so
        # load() can reconstruct the ordering.
        if frame.operation_history:
            op_grp = f.create_group("operation_history")
            for i, op in enumerate(frame.operation_history):
                op_sub_grp = op_grp.create_group(f"operation_{i}")
                for k, v in op.items():
                    # Store simple scalar attributes directly.
                    if isinstance(v, (str, int, float, bool, np.number)):
                        op_sub_grp.attrs[k] = v
                    else:
                        # For complex types, serialize to JSON; fall back to
                        # str() if the value is not JSON-serializable.
                        try:
                            op_sub_grp.attrs[k] = json.dumps(v)
                        except (TypeError, OverflowError) as e:
                            logger.warning(f"Could not serialize operation key '{k}': {e}")
                            op_sub_grp.attrs[k] = str(v)

        # Store frame metadata. NOTE(review): frame.metadata is used both as a
        # mapping (dict(...), .items()) and as a FrameMetadata with a
        # source_file attribute — presumably FrameMetadata is dict-like;
        # confirm against its definition.
        dict_is_nonempty = bool(frame.metadata)
        has_source_file = isinstance(frame.metadata, FrameMetadata) and frame.metadata.source_file is not None
        if dict_is_nonempty or has_source_file:
            meta_grp = f.create_group("meta")
            # Store metadata dict content as JSON
            meta_grp.attrs["json"] = json.dumps(dict(frame.metadata))

            # Store source_file separately if present
            if has_source_file:
                meta_grp.attrs["source_file"] = str(frame.metadata.source_file)

            # Also store individual metadata items as attributes for compatibility
            for k, v in frame.metadata.items():
                if isinstance(v, (str, int, float, bool, np.number)):
                    meta_grp.attrs[k] = v

    logger.info(f"Frame saved to {path}")

load(path, *, format='hdf5', timeout=10.0)

Load a ChannelFrame object from a WDF (Wandas Data File) file or URL.

Parameters:

Name Type Description Default
path str | Path

Path to the WDF file to load, or an HTTP/HTTPS URL pointing to a remote WDF file. When a URL is given the file is downloaded in full before opening.

required
format str

Format of the file. Currently only "hdf5" is supported.

'hdf5'
timeout float

Timeout in seconds for HTTP/HTTPS URL downloads. Default is 10.0 seconds. Has no effect for local file paths.

10.0

Returns:

Type Description
ChannelFrame

A new ChannelFrame object with data and metadata loaded from the file.

Raises:

Type Description
FileNotFoundError

If the file doesn't exist.

NotImplementedError

If format is not "hdf5".

ValueError

If the file format is invalid or incompatible.

Example

cf = ChannelFrame.load("audio_data.wdf")
cf = ChannelFrame.load("https://example.com/audio_data.wdf")

Source code in wandas/io/wdf_io.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
def load(path: str | Path, *, format: str = "hdf5", timeout: float = 10.0) -> "ChannelFrame":
    """Load a ChannelFrame object from a WDF (Wandas Data File) file or URL.

    Args:
        path: Path to the WDF file to load, or an HTTP/HTTPS URL pointing to
            a remote WDF file. When a URL is given the file is downloaded in
            full before opening.
        format: Format of the file. Currently only "hdf5" is supported.
        timeout: Timeout in seconds for HTTP/HTTPS URL downloads. Default is
            10.0 seconds. Has no effect for local file paths.

    Returns:
        A new ChannelFrame object with data and metadata loaded from the file.

    Raises:
        FileNotFoundError: If the file doesn't exist.
        NotImplementedError: If format is not "hdf5".
        ValueError: If the file format is invalid or incompatible.

    Example:
        >>> cf = ChannelFrame.load("audio_data.wdf")
        >>> cf = ChannelFrame.load("https://example.com/audio_data.wdf")
    """
    # Ensure ChannelFrame is imported here to avoid circular imports
    from ..core.metadata import ChannelMetadata
    from ..frames.channel import ChannelFrame

    if format.lower() != "hdf5":
        raise NotImplementedError(f"Format '{format}' is not supported")

    # Detect and handle URL paths — download to memory before HDF5 open.
    h5_source: str | Path | io.BytesIO
    h5_kwargs: dict[str, object] = {}
    if isinstance(path, str) and (path.startswith("http://") or path.startswith("https://")):
        import urllib.error
        import urllib.request

        logger.debug(f"Downloading WDF from URL: {path}")
        try:
            # Full download into memory; HDF5 then reads from the BytesIO.
            with urllib.request.urlopen(path, timeout=timeout) as _resp:
                h5_source = io.BytesIO(_resp.read())
        except urllib.error.URLError as exc:
            # Wrap the network error with actionable context, keeping the
            # original exception chained as the cause.
            raise OSError(
                f"Failed to download WDF file from URL\n"
                f"  URL: {path}\n"
                f"  Error: {exc}\n"
                f"Verify the URL is accessible and try again."
            ) from exc
        # Tell h5py the source is a Python file-like object.
        h5_kwargs = {"driver": "fileobj"}
    else:
        # Local path: verify existence before handing it to h5py.
        path = Path(path)
        if not path.exists():
            raise FileNotFoundError(f"File not found: {path}")
        h5_source = path

    logger.debug(f"Loading ChannelFrame from {h5_source!r}")

    with h5py.File(h5_source, "r", **h5_kwargs) as f:
        # Check format version for compatibility; mismatches only warn so
        # that older/newer files can still be attempted.
        version = f.attrs.get("version", "unknown")
        if version != WDF_FORMAT_VERSION:
            logger.warning(
                f"File format version mismatch: file={version}, current={WDF_FORMAT_VERSION}"  # noqa: E501
            )

        # Get global attributes stored at the file root by save().
        sampling_rate = float(f.attrs["sampling_rate"])
        frame_label = f.attrs.get("label", "")

        # Get frame metadata from the optional "meta" group.
        frame_metadata = FrameMetadata()
        if "meta" in f:
            meta_json = f["meta"].attrs.get("json", "{}")
            # HDF5 may return attrs as bytes; decode to str before json.loads.
            if isinstance(meta_json, (bytes, np.bytes_)):
                try:
                    meta_json = meta_json.decode("utf-8")
                except (UnicodeDecodeError, AttributeError):
                    meta_json = str(meta_json)
            frame_metadata.update(json.loads(meta_json))
            source_file = f["meta"].attrs.get("source_file", None)
            if source_file is not None:
                # Same bytes-vs-str normalization for the source_file attr.
                if isinstance(source_file, (bytes, np.bytes_)):
                    try:
                        source_file = source_file.decode("utf-8")
                    except (UnicodeDecodeError, AttributeError):
                        source_file = str(source_file)
                frame_metadata.source_file = str(source_file)

        # Load operation history (groups named "operation_<i>").
        operation_history = []
        if "operation_history" in f:
            op_grp = f["operation_history"]
            # Sort operation indices numerically (HDF5 key order is not
            # guaranteed to match insertion order).
            op_indices = sorted([int(key.split("_")[1]) for key in op_grp.keys()])

            for idx in op_indices:
                op_sub_grp = op_grp[f"operation_{idx}"]
                op_dict = {}
                for attr_name in op_sub_grp.attrs:
                    attr_value = op_sub_grp.attrs[attr_name]
                    # Try to deserialize JSON (complex values were dumped as
                    # JSON by save()), fallback to the raw attribute value.
                    try:
                        op_dict[attr_name] = json.loads(attr_value)
                    except (json.JSONDecodeError, TypeError):
                        op_dict[attr_name] = attr_value
                operation_history.append(op_dict)

        # Load channel data and metadata from the "channels" group.
        all_channel_data = []
        channel_metadata_list = []

        if "channels" in f:
            channels_group = f["channels"]
            # Sort channel indices numerically to restore channel order.
            channel_indices = sorted([int(key) for key in channels_group.keys()])

            for idx in channel_indices:
                ch_group = channels_group[f"{idx}"]

                # Load channel data ([()] reads the full dataset into memory).
                channel_data = ch_group["data"][()]

                # Append to combined array
                all_channel_data.append(channel_data)

                # Load channel metadata, with defaults for missing attrs.
                label = ch_group.attrs.get("label", f"Ch{idx}")
                unit = ch_group.attrs.get("unit", "")

                # Load additional metadata if present
                ch_extra = {}
                if "metadata_json" in ch_group.attrs:
                    ch_extra = json.loads(ch_group.attrs["metadata_json"])

                # Create ChannelMetadata object
                channel_metadata = ChannelMetadata(label=label, unit=unit, extra=ch_extra)
                channel_metadata_list.append(channel_metadata)

        # Stack channel data into a single (channels, samples) array.
        if all_channel_data:
            combined_data = np.stack(all_channel_data, axis=0)
        else:
            raise ValueError("No channel data found in the file")

        # Create a new ChannelFrame
        # Use channel-wise chunking: 1 for channel axis and -1 for samples
        dask_data = _da_from_array(combined_data, chunks=(1, -1))

        cf = ChannelFrame(
            data=dask_data,
            sampling_rate=sampling_rate,
            label=frame_label if frame_label else None,
            metadata=frame_metadata,
            operation_history=operation_history,
            channel_metadata=channel_metadata_list,
        )

        logger.debug(
            f"ChannelFrame loaded from {path}: {len(cf)} channels, {cf.n_samples} samples"  # noqa: E501
        )
        return cf