Source code for sc3nb.sc_objects.buffer

"""Module for using SuperCollider Buffers in Python"""

import os
import warnings
from enum import Enum, unique
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import TYPE_CHECKING, Any, List, NamedTuple, Optional, Sequence, Union

import numpy as np
import scipy.io.wavfile as wavfile

import sc3nb
from sc3nb.sc_objects.node import Synth
from sc3nb.sc_objects.synthdef import SynthDef

if TYPE_CHECKING:
    import pya

    from sc3nb.sc_objects.server import SCServer


@unique
[docs]class BufferReply(str, Enum): """Buffer Command Replies"""
[docs] INFO = "/b_info"
@unique
[docs]class BufferCommand(str, Enum): """Buffer OSC Commands for Buffers"""
[docs] ALLOC = "/b_alloc"
[docs] ALLOC_READ = "/b_allocRead"
[docs] ALLOC_READ_CHANNEL = "/b_allocReadChannel"
[docs] READ = "/b_read"
[docs] READ_CHANNEL = "/b_readChannel"
[docs] WRITE = "/b_write"
[docs] FREE = "/b_free"
[docs] ZERO = "/b_zero"
[docs] SET = "/b_set"
[docs] SETN = "/b_setn"
[docs] FILL = "/b_fill"
[docs] GEN = "/b_gen"
[docs] CLOSE = "/b_close"
[docs] QUERY = "/b_query"
[docs] GET = "/b_get"
[docs] GETN = "/b_getn"
@unique
[docs]class BufferAllocationMode(str, Enum): """Buffer Allocation Modes"""
[docs] FILE = "file"
[docs] ALLOC = "alloc"
[docs] DATA = "data"
[docs] EXISTING = "existing"
[docs] COPY = "copy"
[docs] NONE = "none"
[docs]class BufferInfo(NamedTuple): """Information about the Buffer"""
[docs] bufnum: int
[docs] num_frames: int
[docs] num_channels: int
[docs] sample_rate: float
[docs]class Buffer: """A Buffer object represents a SuperCollider3 Buffer on scsynth and provides access to low-level buffer commands of scsynth via methods of the Buffer objects. The constructor merely initializes a buffer: * it selects a buffer number using the server's buffer allocator * it initializes attribute variables Parameters ---------- bufnum : int, optional buffer number to be used on scsynth. Defaults to None, can be set to enforce a given bufnum server : SCServer, optional The server instance to establish the Buffer, by default use the SC default server Attributes ---------- server : the SCServer object to communicate with scsynth _bufnum : int buffer number = bufnum id on scsynth _sr : int the sampling rate of the buffer _channels : int number of channels of the buffer _samples : int buffer length = number of sample frames _alloc_mode : str ['file', 'alloc', 'data', 'existing', 'copy'] according to previously used generator, defaults to None _allocated : boolean True if Buffer has been allocated by any of the initialization methods _path : str path to the audio file used in load_file() Notes ----- For more information on Buffer commands, refer to the Server Command Reference in SC3. https://doc.sccode.org/Reference/Server-Command-Reference.html#Buffer%20Commands Examples -------- (see examples/buffer-examples.ipynb) >>> b = Buffer().read(...) >>> b = Buffer().load_data(...) >>> b = Buffer().alloc(...) >>> b = Buffer().load_asig(...) >>> b = Buffer().use_existing(...) >>> b = Buffer().copy(Buffer) """ def __init__( self, bufnum: Optional[int] = None, server: Optional["SCServer"] = None ) -> None: self._server = server or sc3nb.SC.get_default().server self._bufnum_set_manually = bufnum is not None self._bufnum = bufnum self._sr = None self._channels = None self._samples = None self._alloc_mode = BufferAllocationMode.NONE self._allocated = False self._path = None self._synth_def = None self._synth = None # Section: Buffer initialization methods
[docs] def read( self, path: str, starting_frame: int = 0, num_frames: int = -1, channels: Optional[Union[int, Sequence[int]]] = None, ) -> "Buffer": """Allocate buffer memory and read a sound file. If the number of frames argument num_frames is negative or zero, the entire file is read. Parameters ---------- path : string path name of a sound file. starting_frame : int starting frame in file num_frames : int number of frames to read channels : list | int channels and order of channels to be read from file. if only a int is provided it is loaded as only channel Returns ------- self : Buffer the created Buffer object Raises ------ RuntimeError If the Buffer is already allocated. """ if self._allocated: raise RuntimeError("Buffer object is already initialized!") if self._bufnum is None: self._bufnum = self._server.buffer_ids.allocate(num=1)[0] self._alloc_mode = BufferAllocationMode.FILE self._path = Path(path).resolve(strict=True) self._sr, data = wavfile.read( self._path ) # TODO: we only need the metadata here server_sr = self._server.nominal_sr if self._sr != server_sr: warnings.warn( f"Sample rate of file ({self._sr}) does not " f"match the SC Server sample rate ({server_sr})" ) self._samples = data.shape[0] if num_frames <= 0 else num_frames if channels is None: channels = [0] if len(data.shape) == 1 else range(data.shape[1]) elif isinstance(channels, int): channels = [channels] self._channels = len(channels) self._server.msg( BufferCommand.ALLOC_READ_CHANNEL, [self._bufnum, str(self._path), starting_frame, num_frames, *channels], bundle=True, ) self._allocated = True return self
[docs] def alloc(self, size: int, sr: int = 44100, channels: int = 1) -> "Buffer": """Allocate buffer memory. Parameters ---------- size : int number of frames sr : int sampling rate in Hz (optional. default = 44100) channels : int number of channels (optional. default = 1 channel) Returns ------- self : Buffer the created Buffer object Raises ------ RuntimeError If the Buffer is already allocated. """ if self._allocated: raise RuntimeError("Buffer object is already initialized!") if self._bufnum is None: self._bufnum = self._server.buffer_ids.allocate(num=1)[0] self._sr = sr self._alloc_mode = BufferAllocationMode.ALLOC self._channels = channels self._samples = int(size) self._server.msg( BufferCommand.ALLOC, [self._bufnum, size, channels], bundle=True ) self._allocated = True return self
[docs] def load_data( self, data: np.ndarray, sr: int = 44100, mode: str = "file", sync: bool = True, ) -> "Buffer": """Allocate buffer memory and read input data. Parameters ---------- data : numpy array Data which should inserted sr : int, default: 44100 sample rate mode : 'file' or 'osc' Insert data via filemode ('file') or n_set OSC commands ('osc') Bundling is only supported for 'osc' mode and if sync is False. sync: bool, default: True Use SCServer.sync after sending messages when mode = 'osc' Returns ------- self : Buffer the created Buffer object Raises ------ RuntimeError If the Buffer is already allocated. """ if self._allocated: raise RuntimeError("Buffer object is already initialized!") if self._bufnum is None: self._bufnum = self._server.buffer_ids.allocate(num=1)[0] self._alloc_mode = BufferAllocationMode.DATA self._sr = sr self._samples = data.shape[0] self._channels = 1 if len(data.shape) == 1 else data.shape[1] if mode == "file": tempfile = NamedTemporaryFile(delete=False) try: wavfile.write(tempfile, self._sr, data) finally: tempfile.close() self._server.msg( BufferCommand.ALLOC_READ, [self._bufnum, tempfile.name], await_reply=True, ) if os.path.exists(tempfile.name): os.remove(tempfile.name) elif mode == "osc": self._server.msg( BufferCommand.ALLOC, [self._bufnum, data.shape[0]], bundle=True ) blocksize = 1000 # array size compatible with OSC packet size # TODO: check how this depends on datagram size # TODO: put into Buffer header as const if needed elsewhere... if self._channels > 1: data = data.reshape(-1, 1) if data.shape[0] < blocksize: self._server.msg( BufferCommand.SETN, [self._bufnum, [0, data.shape[0], data.tolist()]], bundle=True, ) else: # For datasets larger than {blocksize} entries, # split data to avoid network problems splitdata = np.array_split(data, data.shape[0] / blocksize) for i, chunk in enumerate(splitdata): self._server.msg( BufferCommand.SETN, [self._bufnum, i * blocksize, chunk.shape[0], chunk.tolist()], await_reply=False, bundle=True, ) if sync: self._server.sync() else: raise ValueError(f"Unsupported mode '{mode}'.") self._allocated = True return self
[docs] def load_collection( self, data: np.ndarray, mode: str = "file", sr: int = 44100 ) -> "Buffer": """Wrapper method of :func:`Buffer.load_data`""" return self.load_data(data, sr=sr, mode=mode)
[docs] def load_asig(self, asig: "pya.Asig", mode: str = "file") -> "Buffer": """Create buffer from asig Parameters ---------- asig : pya.Asig asig to be loaded in buffer mode : str, optional Insert data via filemode ('file') or n_set OSC commands ('osc'), by default 'file' Returns ------- self : Buffer the created Buffer object Raises ------ RuntimeError If the Buffer is already allocated. """ if self._allocated: raise RuntimeError("Buffer object is already initialized!") return self.load_data(asig.sig, sr=asig.sr, mode=mode)
[docs] def use_existing(self, bufnum: int, sr: int = 44100) -> "Buffer": """Creates a buffer object from already existing Buffer bufnum. Parameters ---------- bufnum : int buffer node id sr : int Sample rate Returns ------- self : Buffer the created Buffer object Raises ------ RuntimeError If the Buffer is already allocated. """ if self._allocated: raise RuntimeError("Buffer object is already initialized!") self._alloc_mode = BufferAllocationMode.EXISTING self._sr = sr self._bufnum = bufnum self._allocated = True info = self.query() self._samples = info.num_frames self._channels = info.num_channels return self
[docs] def copy_existing(self, buffer: "Buffer") -> "Buffer": """Duplicate an existing buffer Parameters ---------- buffer : Buffer object Buffer which should be duplicated Returns ------- self : Buffer the newly created Buffer object Raises ------ RuntimeError If the Buffer is already allocated. """ if self._allocated: raise RuntimeError("Buffer object is already initialized!") if not buffer.allocated: raise RuntimeError("Other Buffer object is not initialized!") # If both buffers use the same server -> copy buffer directly in the server if self._server is buffer._server: self.alloc(buffer.samples, buffer.sr, buffer.channels) self.gen_copy(buffer, 0, 0, -1) else: # both sc instances must have the same file server self._sr = buffer.sr tempfile = NamedTemporaryFile(delete=False) tempfile.close() try: buffer.write(tempfile.name) self.read(tempfile.name) finally: if os.path.exists(tempfile.name): os.remove(tempfile.name) self._alloc_mode = BufferAllocationMode.COPY return self
# Section: Buffer modification methods
[docs] def fill(self, start: int = 0, count: int = 0, value: float = 0) -> "Buffer": """Fill range of samples with value(s). Parameters ---------- start : int or list int : sample starting index list : n*[start, count, value] list count : int number of samples to fill value : float value Returns ------- self : Buffer the created Buffer object Raises ------ RuntimeError If the Buffer is not allocated yet. """ # TODO implement this correctly if not self._allocated: raise RuntimeError("Buffer object is not initialized!") values = [start, count, value] if not isinstance(start, list) else start self._server.msg(BufferCommand.FILL, [self._bufnum] + values, bundle=True) return self
[docs] def gen(self, command: str, args: List[Any]) -> "Buffer": """Call a command to fill a buffer. If you know, what you do -> you can use this method. See Also -------- gen_sine1, gen_sine2, gen_cheby, gen_cheby, gen_copy Parameters ---------- command : str What fill command to use. args : List[Any] Arguments for command Returns ------- self : Buffer the created Buffer object Raises ------ RuntimeError If the Buffer is not allocated yet. """ if not self._allocated: raise RuntimeError("Buffer object is not initialized!") self._server.msg(BufferCommand.GEN, [self._bufnum, command] + args, bundle=True) return self
[docs] def zero(self) -> "Buffer": """Set buffer data to zero. Returns ------- self : Buffer the created Buffer object Raises ------ RuntimeError If the Buffer is not allocated yet. """ if not self._allocated: raise RuntimeError("Buffer object is not initialized!") self._server.msg(BufferCommand.ZERO, [self._bufnum], bundle=True) return self
[docs] def gen_sine1( self, amplitudes: List[float], normalize: bool = False, wavetable: bool = False, clear: bool = False, ) -> "Buffer": """Fill the buffer with sine waves & given amplitude Parameters ---------- amplitudes : list The first float value specifies the amplitude of the first partial, the second float value specifies the amplitude of the second partial, and so on. normalize : bool Normalize peak amplitude of wave to 1.0. wavetable : bool If set, then the buffer is written in wavetable format so that it can be read by interpolating oscillators. clear : bool If set then the buffer is cleared before new partials are written into it. Otherwise the new partials are summed with the existing contents of the buffer Returns ------- self : Buffer the created Buffer object Raises ------ RuntimeError If the Buffer is not allocated yet. """ return self.gen( "sine1", [self._gen_flags(normalize, wavetable, clear), amplitudes]
)
[docs] def gen_sine2( self, freq_amps: List[float], normalize: bool = False, wavetable: bool = False, clear: bool = False, ) -> "Buffer": """Fill the buffer with sine waves given list of [frequency, amplitude] lists Parameters ---------- freq_amps : list Similar to sine1 except that each partial frequency is specified explicitly instead of being an integer multiple of the fundamental. Non-integer partial frequencies are possible. normalize : bool If set, normalize peak amplitude of wave to 1.0. wavetable : bool If set, the buffer is written in wavetable format so that it can be read by interpolating oscillators. clear : bool If set, the buffer is cleared before new partials are written into it. Otherwise the new partials are summed with the existing contents of the buffer. Returns ------- self : Buffer the created Buffer object Raises ------ RuntimeError If the Buffer is not allocated yet. """ return self.gen( "sine2", [self._gen_flags(normalize, wavetable, clear), freq_amps]
)
[docs] def gen_sine3( self, freqs_amps_phases: List[float], normalize: bool = False, wavetable: bool = False, clear: bool = False, ) -> "Buffer": """Fill the buffer with sine waves & given a list of [frequency, amplitude, phase] entries. Parameters ---------- freqs_amps_phases : list Similar to sine2 except that each partial may have a nonzero starting phase. normalize : bool if set, normalize peak amplitude of wave to 1.0. wavetable : bool If set, the buffer is written in wavetable format so that it can be read by interpolating oscillators. clear : bool If set, the buffer is cleared before new partials are written into it. Otherwise the new partials are summed with the existing contents of the buffer. Returns ------- self : Buffer the created Buffer object Raises ------ RuntimeError If the Buffer is not allocated yet. """ return self.gen( "sine3", [self._gen_flags(normalize, wavetable, clear), freqs_amps_phases]
)
[docs] def gen_cheby( self, amplitudes: List[float], normalize: bool = False, wavetable: bool = False, clear: bool = False, ) -> "Buffer": """Fills a buffer with a series of chebyshev polynomials, which can be defined as cheby(n) = amplitude * cos(n * acos(x)) Parameters ---------- amplitudes : list The first float value specifies the amplitude for n = 1, the second float value specifies the amplitude for n = 2, and so on normalize : bool If set, normalize the peak amplitude of the Buffer to 1.0. wavetable : bool If set, the buffer is written in wavetable format so that it can be read by interpolating oscillators. clear : bool If set the buffer is cleared before new partials are written into it. Otherwise the new partials are summed with the existing contents of the buffer. Returns ------- self : Buffer the created Buffer object Raises ------ RuntimeError If the Buffer is not allocated yet. """ return self.gen( "cheby", [self._gen_flags(normalize, wavetable, clear), amplitudes]
)
[docs] def gen_copy( self, source: "Buffer", source_pos: int, dest_pos: int, copy_amount: int ) -> "Buffer": """Copy samples from the source buffer to the destination buffer specified in the b_gen command. Parameters ---------- source : Buffer Source buffer object source_pos : int sample position in source dest_pos : int sample position in destination copy_amount : int number of samples to copy. If the number of samples to copy is negative, the maximum number of samples possible is copied. Returns ------- self : Buffer the created Buffer object Raises ------ RuntimeError If the Buffer is not allocated yet. """ return self.gen("copy", [dest_pos, source.bufnum, source_pos, copy_amount])
# Section: Buffer output methods
[docs] def play( self, rate: float = 1, loop: bool = False, pan: float = 0, amp: float = 0.3 ) -> Synth: """Play the Buffer using a Synth Parameters ---------- rate : float, optional plackback rate, by default 1 loop : bool, optional if True loop the playback, by default False pan : int, optional pan position, -1 is left, +1 is right, by default 0 amp : float, optional amplitude, by default 0.3 Returns ------- Synth Synth to control playback. Raises ------ RuntimeError If the Buffer is not allocated yet. """ if not self._allocated: raise RuntimeError("Buffer object is not initialized!") if self._synth_def is None: playbuf_def = """ { |out=0, bufnum=^bufnum, rate=^rate, loop=^loop, pan=^pan, amp=^amp | var sig = PlayBuf.ar(^num_channels, bufnum, rate*BufRateScale.kr(bufnum), loop: loop, doneAction: Done.freeSelf); Out.ar(out, Pan2.ar(sig, pan, amp)) }""" self._synth_def = SynthDef( name=f"sc3nb_playbuf_{self.bufnum}", definition=playbuf_def ) synth_name = self._synth_def.add( pyvars={ "num_channels": self.channels, "bufnum": self.bufnum, "rate": rate, "loop": 1 if loop else 0, "pan": pan, "amp": amp, } ) self._synth = Synth(name=synth_name, server=self._server) else: self._synth.new( {"rate": rate, "loop": 1 if loop else 0, "pan": pan, "amp": amp} ) return self._synth
[docs] def write( self, path: str, header: str = "wav", sample: str = "float", num_frames: int = -1, starting_frame: int = 0, leave_open: bool = False, ) -> "Buffer": """Write buffer data to a sound file Parameters ---------- path : string path name of a sound file. header : string header format. Header format is one of: "aiff", "next", "wav", "ircam"", "raw" sample : string sample format. Sample format is one of: "int8", "int16", "int24", "int32", "float", "double", "mulaw", "alaw" num_frames : int number of frames to write. -1 means all frames. starting_frame : int starting frame in buffer leave_open : boolean Whether you want the buffer file left open. For use with DiskOut you will want this to be true. The file is created, but no frames are written until the DiskOut UGen does so. The default is false which is the correct value for all other cases. Returns ------- self : Buffer the Buffer object Raises ------ RuntimeError If the Buffer is not allocated yet. """ if not self._allocated: raise RuntimeError("Buffer object is not initialized!") leave_open_val = 1 if leave_open else 0 path = str(Path(path).resolve()) self._server.msg( BufferCommand.WRITE, [ self._bufnum, path, header, sample, num_frames, starting_frame, leave_open_val, ], bundle=True, ) return self
[docs] def close(self) -> "Buffer": """Close soundfile after using a Buffer with DiskOut Returns ------- self : Buffer the Buffer object Raises ------ RuntimeError If the Buffer is not allocated yet. """ if not self._allocated: raise RuntimeError("Buffer object is not initialized!") self._server.msg(BufferCommand.CLOSE, [self._bufnum], bundle=True) return self
[docs] def to_array(self) -> np.ndarray: """Return the buffer data as an array representation. Returns ------- np.ndarray: Values of the buffer Raises ------ RuntimeError If the Buffer is not allocated yet. """ if not self._allocated: raise RuntimeError("Buffer object is not initialized!") data = [] blocksize = 1000 # array size compatible with OSC packet size i = 0 num_samples = self._samples * self._channels while i < num_samples: bs = blocksize if i + blocksize < num_samples else num_samples - i tmp = self._server.msg( BufferCommand.GETN, [self._bufnum, i, bs], bundle=False ) data += list(tmp)[3:] # skip first 3 els [bufnum, startidx, size] i += bs data = np.array(data).reshape((-1, self._channels)) return data
# Section: Buffer information methods
[docs] def query(self) -> BufferInfo: """Get buffer info. Returns ------- Tuple: (buffer number, number of frames, number of channels, sampling rate) Raises ------ RuntimeError If the Buffer is not allocated yet. """ if not self._allocated: raise RuntimeError("Buffer object is not initialized!") return BufferInfo._make( self._server.msg(BufferCommand.QUERY, [self._bufnum], bundle=False)
)
[docs] def __repr__(self) -> str: if self.samples is None or self.sr is None: duration = 0 else: duration = self.samples / self.sr return ( f"<Buffer({self.bufnum}) on {self._server.addr}:" + f" {self.channels} x {self.samples} @ {self.sr} Hz = {duration:.3f}s" + f""" {["not loaded", "allocated"][self.allocated]}""" + f" using mode '{self._alloc_mode}'>"
) # Section: Methods to delete / free Buffers
[docs] def free(self) -> None: """Free buffer data. Raises ------ RuntimeError If the Buffer is not allocated yet. """ if not self._allocated: raise RuntimeError("Buffer object is not initialized!") if ( self._alloc_mode != BufferAllocationMode.EXISTING and not self._bufnum_set_manually ): self._server.buffer_ids.free([self._bufnum]) self._server.msg(BufferCommand.FREE, [self._bufnum], bundle=True) self._allocated = False self._alloc_mode = BufferAllocationMode.NONE
# Section: Properties @property def bufnum(self) -> Optional[int]: """Buffer number which serves as ID in SuperCollider Returns ------- int bufnum """ return self._bufnum @property def allocated(self) -> bool: """Whether this Buffer is allocated by any of the initialization methods. Returns ------- bool True if allocated """ return self._allocated @property def alloc_mode(self) -> BufferAllocationMode: """Mode of Buffer allocation. One of ['file', 'alloc', 'data', 'existing', 'copy'] according to previously used generator. Defaults to None if not allocated. Returns ------- str allocation mode """ return self._alloc_mode @property def path(self) -> Optional[Path]: """File path that was provided to read. Returns ------- pathlib.Path buffer file path """ return self._path @property def channels(self) -> Optional[int]: """Number of channels in the Buffer. Returns ------- int channel number """ return self._channels @property def samples(self) -> Optional[int]: """Number of samples in the buffer. Returns ------- int sample number """ return self._samples @property def sr(self) -> Optional[int]: """Sampling rate of the Buffer. Returns ------- int sampling rate """ return self._sr @property def duration(self) -> Optional[float]: """Duration of the Buffer in seconds. Returns ------- float duration in seconds """ if self._samples is not None and self._sr is not None: return self._samples / self._sr else: return None @property def server(self) -> "SCServer": """The server where this Buffer is placed. Returns ------- SCServer The server where this Buffer is placed. """ return self._server # Section: Private utils
[docs] def _gen_flags(self, a_normalize=False, a_wavetable=False, a_clear=False) -> int: """Generate Wave Fill Commands flags from booleans according to the SuperCollider Server Command Reference. Parameters ---------- a_normalize : bool, optional Normalize peak amplitude of wave to 1.0, by default False a_wavetable : bool, optional If set, then the buffer is written in wavetable format so that it can be read by interpolating oscillators, by default False a_clear : bool, optional If set then the buffer is cleared before new partials are written into it. Otherwise the new partials are summed with the existing contents of the buffer, by default False Returns ------- int Wave Fill Commands flags """ normalize = 1 if a_normalize is True else 0 wavetable = 2 if a_wavetable is True else 0 clear = 4 if a_clear is True else 0 return normalize + wavetable + clear