:mod:`sc3nb.osc.osc_communication` ================================== .. py:module:: sc3nb.osc.osc_communication .. autoapi-nested-parse:: OSC communication Classes and functions to communicate with SuperCollider using the Open Sound Control (OSC) protocol over UDP .. !! processed by numpydoc !! Module Contents --------------- Function List ~~~~~~~~~~~~~ .. autoapisummary:: :nosignatures: sc3nb.osc.osc_communication.get_max_udp_packet_size sc3nb.osc.osc_communication.split_into_max_size sc3nb.osc.osc_communication.convert_to_sc3nb_osc Class List ~~~~~~~~~~ .. autoapisummary:: :nosignatures: sc3nb.osc.osc_communication.OSCMessage sc3nb.osc.osc_communication.Bundler sc3nb.osc.osc_communication.MessageHandler sc3nb.osc.osc_communication.MessageQueue sc3nb.osc.osc_communication.MessageQueueCollection sc3nb.osc.osc_communication.OSCCommunication Content ~~~~~~~ .. py:data:: _LOGGER .. py:function:: get_max_udp_packet_size() Get the max UDP packet size by trial and error .. !! processed by numpydoc !! .. py:function:: split_into_max_size(bundler: Bundler, max_dgram_size) -> List[pythonosc.osc_bundle.OscBundle] .. class:: OSCMessage(msg_address: str, msg_parameters: Optional[Union[Sequence, Any]] = None) Class for creating messages to send over OSC :Parameters: **msg_address** : str OSC message address **msg_parameters** : Optional[Union[Sequence]], optional OSC message parameters, by default None .. !! processed by numpydoc !! .. py:attribute:: _content :type: pythonosc.osc_message.OscMessage **Overview:** .. autoapisummary:: :nosignatures: sc3nb.osc.osc_communication.OSCMessage.to_pythonosc sc3nb.osc.osc_communication.OSCMessage._build_message sc3nb.osc.osc_communication.OSCMessage.__repr__ .. py:method:: to_pythonosc() -> pythonosc.osc_message.OscMessage Return python-osc OscMessage .. !! processed by numpydoc !! .. 
py:method:: _build_message(msg_address: str, msg_parameters: Optional[Union[Sequence, Any]] = None) -> pythonosc.osc_message.OscMessage :staticmethod: Builds a python-osc OSC message. :Parameters: **msg_address** : str SuperCollider address. **msg_parameters** : list, optional List of parameters to add to message. :Returns: OscMessage Message ready to be sent. .. !! processed by numpydoc !! .. py:method:: __repr__() -> str .. class:: Bundler(timetag: float = 0, msg: Optional[Union[OSCMessage, str]] = None, msg_params: Optional[Sequence[Any]] = None, *, server: Optional[OSCCommunication] = None, receiver: Optional[Union[str, Tuple[str, int]]] = None, send_on_exit: bool = True) Class for creating OSCBundles and bundling of messages Create a Bundler :Parameters: **timetag** : float, optional Starting time at which bundle content should be executed. If timetag > 1e6 it is interpreted as POSIX time. If timetag <= 1e6 it is assumed to be a relative value in seconds and is added to time.time(), by default 0, i.e. 'now'. **msg** : OSCMessage or str, optional OSCMessage or message address, by default None **msg_params** : sequence of any type, optional Parameters for the message, by default None **server** : OSCCommunication, optional OSC server, by default None **receiver** : Union[str, Tuple[str, int]], optional Where to send the bundle, by default send to default receiver of server **send_on_exit** : bool, optional Whether the bundle is sent when used as a context manager, by default True .. !! processed by numpydoc !! .. py:attribute:: timetag .. py:attribute:: default_receiver .. py:attribute:: contents :type: List[Union[Bundler, OSCMessage]] :value: [] .. py:attribute:: passed_time :value: 0.0 .. py:attribute:: send_on_exit **Overview:** .. 
autoapisummary:: :nosignatures: sc3nb.osc.osc_communication.Bundler.wait sc3nb.osc.osc_communication.Bundler.add sc3nb.osc.osc_communication.Bundler.messages sc3nb.osc.osc_communication.Bundler.send sc3nb.osc.osc_communication.Bundler.to_raw_osc sc3nb.osc.osc_communication.Bundler.to_pythonosc sc3nb.osc.osc_communication.Bundler._calc_timetag sc3nb.osc.osc_communication.Bundler.__deepcopy__ sc3nb.osc.osc_communication.Bundler.__enter__ sc3nb.osc.osc_communication.Bundler.__exit__ sc3nb.osc.osc_communication.Bundler.__repr__ .. py:method:: wait(time_passed: float) -> None Add time to internal time :Parameters: **time_passed** : float How many seconds should pass. .. !! processed by numpydoc !! .. py:method:: add(*args) -> Bundler Add content to this Bundler. :Parameters: **args** : accepts an OSCMessage or Bundler, a timetag with an OSCMessage or Bundler, or Bundler arguments like (timetag, msg_addr, msg_params), (timetag, msg_addr), (timetag, msg) :Returns: Bundler self for chaining .. !! processed by numpydoc !! .. py:method:: messages(start_time: Optional[float] = 0.0, delay: Optional[float] = None) -> Dict[float, List[OSCMessage]] Generate a dict with all messages in this Bundler. The dict key is the time tag of the messages. :Parameters: **start_time** : Optional[float], optional start time when using relative timing, by default 0.0 :Returns: Dict[float, List[OSCMessage]] dict containing all OSCMessages .. !! processed by numpydoc !! .. py:method:: send(server: Optional[OSCCommunication] = None, receiver: Tuple[str, int] = None, bundle: bool = True) Send this Bundler. :Parameters: **server** : OSCCommunication, optional Server instance for sending the bundle. 
If None it will use the server from init or try to use sc3nb.SC.get_default().server, by default None **receiver** : Tuple[str, int], optional Address (ip, port) to send to, if None it will send the bundle to the default receiver of the Bundler **bundle** : bool, optional If True this is allowed to be bundled, by default True :Raises: RuntimeError When no server could be found. .. !! processed by numpydoc !! .. py:method:: to_raw_osc(start_time: Optional[float] = None, delay: Optional[float] = None) -> bytes Create a raw OSC Bundle from this bundler. :Parameters: **start_time** : Optional[float], optional used as start time when using relative timing, by default time.time() **delay** : float, optional used to delay the timing. :Returns: bytes raw OSC representation of the bundle for sending .. !! processed by numpydoc !! .. py:method:: to_pythonosc(start_time: Optional[float] = None, delay: Optional[float] = None) -> pythonosc.osc_bundle.OscBundle Build this bundle. :Parameters: **start_time** : Optional[float], optional used as start time when using relative timing, by default time.time() **delay** : float, optional used to delay the timing. :Returns: OscBundle bundle instance for sending .. !! processed by numpydoc !! .. py:method:: _calc_timetag(start_time: Optional[float]) .. py:method:: __deepcopy__(memo) -> Bundler .. py:method:: __enter__() .. py:method:: __exit__(exc_type, exc_value, exc_traceback) .. py:method:: __repr__() -> str .. py:function:: convert_to_sc3nb_osc(data: Union[OSCMessage, Bundler, pythonosc.osc_message.OscMessage, pythonosc.osc_bundle.OscBundle, bytes]) -> Union[OSCMessage, Bundler] Convert OSC data to its sc3nb representation :Parameters: **data** : Union[OSCMessage, Bundler, OscMessage, OscBundle, bytes] OSC data to convert :Returns: OSCMessage or Bundler sc3nb representation of the OSC data :Raises: ValueError If data is not supported .. !! processed by numpydoc !! .. class:: MessageHandler **Bases:** :class:`abc.ABC` Base class for Message Handling .. !! processed by numpydoc !! 
**Overview:** .. autoapisummary:: :nosignatures: sc3nb.osc.osc_communication.MessageHandler.put .. py:method:: put(address: str, *args) -> None :abstractmethod: Add message to MessageHandler :Parameters: **address** : str Message address .. !! processed by numpydoc !! .. class:: MessageQueue(address: str, preprocess: Optional[Callable] = None) **Bases:** :class:`MessageHandler` Queue to retrieve OSC messages sent to the corresponding OSC address Create a new MessageQueue :Parameters: **address** : str OSC address for this queue **preprocess** : function, optional function that will be applied to the values before they are enqueued (Default value = None) .. !! processed by numpydoc !! .. py:attribute:: _address .. py:attribute:: process .. py:attribute:: _queue .. py:attribute:: _skips :value: 0 **Overview:** .. autoapisummary:: :nosignatures: sc3nb.osc.osc_communication.MessageQueue.put sc3nb.osc.osc_communication.MessageQueue.skipped sc3nb.osc.osc_communication.MessageQueue.get sc3nb.osc.osc_communication.MessageQueue.show sc3nb.osc.osc_communication.MessageQueue._repr_pretty_ .. py:method:: put(address: str, *args) -> None Add a message to MessageQueue :Parameters: **address** : str message address .. !! processed by numpydoc !! .. py:method:: skipped() Skip one queue value .. !! processed by numpydoc !! .. py:method:: get(timeout: float = 5, skip: bool = True) -> Any Returns a value from the queue :Parameters: **timeout** : float, optional Time in seconds to wait on the queue, by default 5 **skip** : bool, optional If True the queue will skip as many values as `skips`, by default True :Returns: obj value from queue :Raises: Empty If the queue has no value .. !! processed by numpydoc !! .. py:method:: show() -> None Print the content of the queue. .. !! processed by numpydoc !! .. py:method:: _repr_pretty_(printer, cycle) -> None .. 
class:: MessageQueueCollection(address: str, sub_addrs: Optional[Sequence[str]] = None) **Bases:** :class:`MessageHandler` A collection of MessageQueues that are all sent to one and the same first address. Create a collection of MessageQueues under the same first address :Parameters: **address** : str first message address that is the same for all MessageQueues **sub_addrs** : Optional[Sequence[str]], optional second message addresses with separate queues, by default None Additional MessageQueues will be created on demand. .. !! processed by numpydoc !! .. py:attribute:: _address **Overview:** .. autoapisummary:: :nosignatures: sc3nb.osc.osc_communication.MessageQueueCollection.put sc3nb.osc.osc_communication.MessageQueueCollection.__contains__ sc3nb.osc.osc_communication.MessageQueueCollection.__getitem__ .. py:method:: put(address: str, *args) -> None Add a message to the corresponding MessageQueue :Parameters: **address** : str first message address .. !! processed by numpydoc !! .. py:method:: __contains__(item) -> bool .. py:method:: __getitem__(key) .. exception:: OSCCommunicationError(message, send_message) **Bases:** :class:`Exception` Exception for OSCCommunication errors. Initialize self. See help(type(self)) for accurate signature. .. !! processed by numpydoc !! .. py:attribute:: message .. py:attribute:: send_message .. class:: OSCCommunication(server_ip: str, server_port: int, default_receiver_ip: str, default_receiver_port: int) Class to send and receive OSC messages and bundles. Create an OSC communication server :Parameters: **server_ip** : str IP address to use for this server **server_port** : int port to use for this server **default_receiver_ip** : str IP address used for sending by default **default_receiver_port** : int port used for sending by default .. !! processed by numpydoc !! .. py:attribute:: _receivers :type: Dict[Tuple[str, int], str] .. py:attribute:: _default_receiver :type: Tuple[str, int] .. py:attribute:: _bundling_lock .. 
py:attribute:: _bundling_bundles :value: [] .. py:attribute:: _osc_server_thread .. py:attribute:: _max_udp_packet_size :value: None .. py:attribute:: _msg_queues :type: Dict[str, MessageQueue] .. py:attribute:: _reply_addresses :type: Dict[str, str] **Overview:** .. autoapisummary:: :nosignatures: sc3nb.osc.osc_communication.OSCCommunication.add_msg_pairs sc3nb.osc.osc_communication.OSCCommunication.add_msg_queue sc3nb.osc.osc_communication.OSCCommunication.add_msg_queue_collection sc3nb.osc.osc_communication.OSCCommunication._check_sender sc3nb.osc.osc_communication.OSCCommunication.lookup_receiver sc3nb.osc.osc_communication.OSCCommunication.connection_info sc3nb.osc.osc_communication.OSCCommunication.add_receiver sc3nb.osc.osc_communication.OSCCommunication.send sc3nb.osc.osc_communication.OSCCommunication._handle_outgoing_message sc3nb.osc.osc_communication.OSCCommunication.get_reply_address sc3nb.osc.osc_communication.OSCCommunication.msg sc3nb.osc.osc_communication.OSCCommunication.bundler sc3nb.osc.osc_communication.OSCCommunication.quit .. py:method:: add_msg_pairs(msg_pairs: Dict[str, str]) -> None Add the provided pairs for message receiving. :Parameters: **msg_pairs** : dict[str, str], optional dict containing user specified message pairs. {msg_addr: reply_addr} .. !! processed by numpydoc !! .. py:method:: add_msg_queue(msg_queue: MessageQueue, out_addr: Optional[str] = None) -> None Add a MessageQueue to this server's dispatcher :Parameters: **msg_queue** : MessageQueue new MessageQueue **out_addr** : Optional[str], optional The outgoing message address that belongs to this MessageQueue, by default None .. !! processed by numpydoc !! .. py:method:: add_msg_queue_collection(msg_queue_collection: MessageQueueCollection) -> None Add a MessageQueueCollection :Parameters: **msg_queue_collection** : MessageQueueCollection MessageQueueCollection to be added .. !! processed by numpydoc !! .. 
py:method:: _check_sender(sender: Tuple[str, int]) -> Union[str, Tuple[str, int]] .. py:method:: lookup_receiver(receiver: Union[str, Tuple[str, int]]) -> Tuple[str, int] Reverse lookup the address of a specific receiver :Parameters: **receiver** : str Receiver name. :Returns: Tuple[str, int] Receiver address (ip, port) :Raises: KeyError If receiver is unknown. ValueError If the type of the receiver argument is wrong. .. !! processed by numpydoc !! .. py:method:: connection_info(print_info: bool = True) -> Tuple[Tuple[str, int], Dict[Tuple[str, int], str]] Get information about the known addresses :Parameters: **print_info** : bool, optional If True print connection information (Default value = True) :Returns: tuple containing the address of this sc3nb OSC Server and known receivers' addresses in a dict with their names as values .. !! processed by numpydoc !! .. py:method:: add_receiver(name: str, ip_address: str, port: int) Adds a receiver with the specified address. :Parameters: **name** : str Name of receiver. **ip_address** : str IP address of receiver (e.g. "127.0.0.1") **port** : int Port of the receiver .. !! processed by numpydoc !! .. py:method:: send(package: Union[OSCMessage, Bundler], *, receiver: Optional[Union[str, Tuple[str, int]]] = None, bundle: bool = False, await_reply: bool = True, timeout: float = 5) -> Any Sends an OSC packet :Parameters: **package** : OSCMessage or Bundler Object with `dgram` attribute. **receiver** : str or Tuple[str, int], optional Where to send the packet, by default send to default receiver **bundle** : bool, optional If True the package is allowed to be bundled, by default False. **await_reply** : bool, optional If True ask for reply from the server and return it, otherwise send the message and return None directly, by default True. If the package is bundled None will be returned. 
**timeout** : float, optional timeout in seconds for reply, by default 5 :Returns: None or reply None if no reply was received or awaited else reply. :Raises: ValueError When the provided package is not supported. OSCCommunicationError When the handling of a package fails. .. !! processed by numpydoc !! .. py:method:: _handle_outgoing_message(message: OSCMessage, receiver_address: Tuple[str, int], await_reply: bool, timeout: float) -> Any .. py:method:: get_reply_address(msg_address: str) -> Optional[str] Get the corresponding reply address for the given address :Parameters: **msg_address** : str outgoing message address :Returns: str or None Corresponding reply address if available .. !! processed by numpydoc !! .. py:method:: msg(msg_addr: str, msg_params: Optional[Sequence] = None, *, bundle: bool = False, receiver: Optional[Tuple[str, int]] = None, await_reply: bool = True, timeout: float = 5) -> Optional[Any] Creates and sends OSC message over UDP. :Parameters: **msg_addr** : str SuperCollider address of the OSC message **msg_params** : Optional[Sequence], optional List of parameters of the OSC message, by default None **bundle** : bool, optional If True the content is allowed to be bundled, by default False **receiver** : tuple[str, int], optional (IP address, port) to send the message, by default send to default receiver **await_reply** : bool, optional If True send message and wait for reply otherwise send the message and return directly, by default True **timeout** : float, optional timeout in seconds for reply, by default 5 :Returns: obj reply if await_reply is True and there is a reply for this message .. !! processed by numpydoc !! .. py:method:: bundler(timetag: float = 0, msg: Optional[Union[OSCMessage, str]] = None, msg_params: Optional[Sequence[Any]] = None, send_on_exit: bool = True) -> Bundler Generate a Bundler. This allows the user to easily add messages/bundles and send them. 
:Parameters: **timetag** : float, optional Time at which bundle content should be executed. If timetag <= 1e6 it is added to time.time(), by default 0. **msg** : OSCMessage or str, optional OSCMessage or message address, by default None **msg_params** : sequence of any type, optional Parameters for the message, by default None **send_on_exit** : bool, optional Whether the bundle is sent when used as a context manager, by default True :Returns: Bundler bundler for OSC bundling. .. !! processed by numpydoc !! .. py:method:: quit() -> None Shuts down the sc3nb OSC server .. !! processed by numpydoc !!
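
The ``timetag`` parameter of ``Bundler`` and ``OSCCommunication.bundler`` is documented as absolute POSIX time when it is greater than 1e6, and as an offset in seconds added to ``time.time()`` otherwise. The following is a minimal sketch of that rule only, not the sc3nb implementation. The names ``resolve_timetag``, ``POSIX_THRESHOLD`` and the ``now`` argument are hypothetical and exist only for this illustration.

```python
import time
from typing import Optional

# Threshold taken from the parameter description above; the constant name is hypothetical.
POSIX_THRESHOLD = 1e6


def resolve_timetag(timetag: float, now: Optional[float] = None) -> float:
    """Return an absolute POSIX time for a Bundler-style timetag.

    Hypothetical helper: it mirrors the documented rule and does not call sc3nb.
    """
    if now is None:
        # Assumption: relative timetags are measured from the current time.
        now = time.time()
    if timetag > POSIX_THRESHOLD:
        # Large values are already absolute POSIX timestamps.
        return timetag
    # Small values are offsets in seconds relative to `now`.
    return now + timetag


if __name__ == "__main__":
    reference = 1_700_000_000.0
    print(resolve_timetag(0.5, now=reference))    # relative: half a second after reference
    print(resolve_timetag(reference + 2.0, now=0.0))  # absolute: returned unchanged
```

Passing ``now`` explicitly keeps the sketch deterministic. A default ``timetag`` of 0 resolves to the reference time itself, which matches the documented meaning of "now".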