:mod:`sc3nb.timed_queue` ======================== .. py:module:: sc3nb.timed_queue .. autoapi-nested-parse:: Classes to run register functions at certain timepoints and run asynchronously .. !! processed by numpydoc !! Module Contents --------------- Class List ~~~~~~~~~~ .. autoapisummary:: :nosignatures: sc3nb.timed_queue.Event sc3nb.timed_queue.TimedQueue sc3nb.timed_queue.TimedQueueSC Content ~~~~~~~ .. class:: Event(timestamp: float, function: Callable[Ellipsis, None], args: Iterable[Any], spawn: bool = False) Stores a timestamp, function and arguments for that function. Long running functions can be wrapped inside an own thread :Parameters: **timestamp** : float Time event should be executed **function** : Callable[..., None] Function to be executed **args** : Iterable[Any] Arguments for function **spawn** : bool, optional if True, create new thread for function, by default False .. !! processed by numpydoc !! .. py:attribute:: timestamp .. py:attribute:: function .. py:attribute:: args **Overview:** .. autoapisummary:: :nosignatures: sc3nb.timed_queue.Event.execute sc3nb.timed_queue.Event.__eq__ sc3nb.timed_queue.Event.__lt__ sc3nb.timed_queue.Event.__le__ sc3nb.timed_queue.Event.__repr__ .. py:method:: execute() -> None Executes function .. !! processed by numpydoc !! .. py:method:: __eq__(other) .. py:method:: __lt__(other) .. py:method:: __le__(other) .. py:method:: __repr__() .. class:: TimedQueue(relative_time: bool = False, thread_sleep_time: float = 0.001, drop_time_threshold: float = 0.5) Accumulates events as timestamps and functions. Executes given functions according to the timestamps :Parameters: **relative_time** : bool, optional If True, use relative time, by default False **thread_sleep_time** : float, optional Sleep time in seconds for worker thread, by default 0.001 **drop_time_threshold** : float, optional Threshold for execution time of events in seconds. If this is exceeded the event will be dropped, by default 0.5 .. !! processed by numpydoc !! .. py:attribute:: drop_time_thr .. py:attribute:: start .. py:attribute:: onset_idx .. py:attribute:: event_list :value: [] .. py:attribute:: close_event .. py:attribute:: lock .. py:attribute:: thread **Overview:** .. autoapisummary:: :nosignatures: sc3nb.timed_queue.TimedQueue.close sc3nb.timed_queue.TimedQueue.join sc3nb.timed_queue.TimedQueue.complete sc3nb.timed_queue.TimedQueue.put sc3nb.timed_queue.TimedQueue.get sc3nb.timed_queue.TimedQueue.peek sc3nb.timed_queue.TimedQueue.empty sc3nb.timed_queue.TimedQueue.pop sc3nb.timed_queue.TimedQueue.__worker sc3nb.timed_queue.TimedQueue.__repr__ sc3nb.timed_queue.TimedQueue.elapse .. py:method:: close() -> None Closes event processing without waiting for pending events .. !! processed by numpydoc !! .. py:method:: join() -> None Closes event processing after waiting for pending events .. !! processed by numpydoc !! .. py:method:: complete() -> None Blocks until all pending events have completed .. !! processed by numpydoc !! .. py:method:: put(timestamp: float, function: Callable[Ellipsis, None], args: Iterable[Any] = (), spawn: bool = False) -> None Adds event to queue :Parameters: **timestamp** : float Time (POSIX) when event should be executed **function** : Callable[..., None] Function to be executed **args** : Iterable[Any], optional Arguments to be passed to function, by default () **spawn** : bool, optional if True, create new sub-thread for function, by default False :Raises: TypeError raised if function is not callable .. !! processed by numpydoc !! .. py:method:: get() -> Event Get latest event from queue and remove event :Returns: Event Latest event .. !! processed by numpydoc !! .. py:method:: peek() -> Event Look up latest event from queue :Returns: Event Latest event .. !! processed by numpydoc !! .. py:method:: empty() -> bool Checks if queue is empty :Returns: bool True if queue if empty .. !! processed by numpydoc !! .. py:method:: pop() -> None Removes latest event from queue .. !! processed by numpydoc !! .. py:method:: __worker(sleep_time: float, close_event: threading.Event) -> NoReturn Worker function to process events .. !! processed by numpydoc !! .. py:method:: __repr__() .. py:method:: elapse(time_delta: float) -> None Add time delta to the current queue time. :Parameters: **time_delta** : float Additional time .. !! processed by numpydoc !! .. class:: TimedQueueSC(server: sc3nb.osc.osc_communication.OSCCommunication = None, relative_time: bool = False, thread_sleep_time: float = 0.001) **Bases:** :class:`TimedQueue` Timed queue with OSC communication. :Parameters: **server** : OSCCommunication, optional OSC server to handle the bundlers and messsages, by default None **relative_time** : bool, optional If True, use relative time, by default False **thread_sleep_time** : float, optional Sleep time in seconds for worker thread, by default 0.001 .. !! processed by numpydoc !! .. py:attribute:: server **Overview:** .. autoapisummary:: :nosignatures: sc3nb.timed_queue.TimedQueueSC.put_bundler sc3nb.timed_queue.TimedQueueSC.put_msg .. py:method:: put_bundler(onset: float, bundler: sc3nb.osc.osc_communication.Bundler) -> None Add a Bundler to queue :Parameters: **onset** : float Sending timetag of the Bundler **bundler** : Bundler Bundler that will be sent .. !! processed by numpydoc !! .. py:method:: put_msg(onset: float, msg: Union[sc3nb.osc.osc_communication.OSCMessage, str], msg_params: Iterable[Any]) -> None Add a message to queue :Parameters: **onset** : float Sending timetag of the message **msg** : Union[OSCMessage, str] OSCMessage or OSC address **msg_params** : Iterable[Any] If msg is str, this will be the parameters of the created OSCMessage .. !! processed by numpydoc !!