sc3nb.timed_queue

Classes to register functions for execution at certain timepoints and to run them asynchronously

Module Contents

Class List

Event

Stores a timestamp, function and arguments for that function.

TimedQueue

Accumulates events as timestamps and functions.

TimedQueueSC

Timed queue with OSC communication.

Content

class sc3nb.timed_queue.Event(timestamp: float, function: Callable[..., None], args: Iterable[Any], spawn: bool = False)

Stores a timestamp, a function and the arguments for that function. Long-running functions can be run in their own thread.

Parameters:
timestamp : float

Time at which the event should be executed

function : Callable[..., None]

Function to be executed

args : Iterable[Any]

Arguments for the function

spawn : bool, optional

If True, run the function in a new thread, by default False

Overview:

execute

Executes function

__eq__

Return self==value.

__lt__

Return self<value.

__le__

Return self<=value.

__repr__

Return repr(self).

execute() -> None

Executes function

__eq__(other)

Return self==value.

__lt__(other)

Return self<value.

__le__(other)

Return self<=value.

__repr__()

Return repr(self).
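The behaviour described for Event can be illustrated with a small standard-library sketch. The class name SketchEvent is hypothetical, and the assumption that comparisons depend only on the timestamp is ours; this is not the sc3nb implementation.

```python
import threading
from typing import Any, Callable, Iterable


class SketchEvent:
    """Illustrative stand-in for an event; not the sc3nb class itself."""

    def __init__(self, timestamp: float, function: Callable[..., None],
                 args: Iterable[Any] = (), spawn: bool = False) -> None:
        self.timestamp = timestamp
        self.function = function
        self.args = tuple(args)
        self.spawn = spawn

    def execute(self) -> None:
        if self.spawn:
            # Run in a separate thread so a slow function does not block the caller.
            threading.Thread(target=self.function, args=self.args).start()
        else:
            self.function(*self.args)

    # Assumption: ordering and equality look at the timestamp only.
    def __eq__(self, other: object) -> bool:
        if not isinstance(other, SketchEvent):
            return NotImplemented
        return self.timestamp == other.timestamp

    def __lt__(self, other: "SketchEvent") -> bool:
        return self.timestamp < other.timestamp

    def __le__(self, other: "SketchEvent") -> bool:
        return self.timestamp <= other.timestamp

    def __repr__(self) -> str:
        return f"SketchEvent(timestamp={self.timestamp}, args={self.args})"


results = []
events = [SketchEvent(2.0, results.append, ("late",)),
          SketchEvent(1.0, results.append, ("early",))]
for event in sorted(events):  # sorted() relies on __lt__
    event.execute()
print(results)  # ['early', 'late']
```

Defining the comparison methods is what lets a container keep events in time order without knowing anything about the wrapped function.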

class sc3nb.timed_queue.TimedQueue(relative_time: bool = False, thread_sleep_time: float = 0.001, drop_time_threshold: float = 0.5)

Accumulates events as timestamps and functions.

Executes the given functions according to their timestamps.

Parameters:
relative_time : bool, optional

If True, use relative time, by default False

thread_sleep_time : float, optional

Sleep time in seconds for the worker thread, by default 0.001

drop_time_threshold : float, optional

Threshold in seconds for the execution time of events; an event that exceeds it is dropped. By default 0.5
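One plausible reading of drop_time_threshold is "maximum tolerated lateness": an event that is already overdue by at least the threshold is discarded instead of executed. The helper below is a hypothetical sketch of that reading, not code taken from sc3nb.

```python
def classify_event(timestamp: float, now: float,
                   drop_time_threshold: float = 0.5) -> str:
    """Return 'wait', 'execute' or 'drop' for an event due at `timestamp`."""
    if timestamp > now:
        return "wait"      # not due yet
    if now - timestamp < drop_time_threshold:
        return "execute"   # due, and still within the tolerated lateness
    return "drop"          # overdue by the threshold or more (assumed rule)


print(classify_event(10.0, now=9.9))   # wait
print(classify_event(10.0, now=10.2))  # execute
print(classify_event(10.0, now=11.0))  # drop
```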

Overview:

close

Closes event processing without waiting for pending events

join

Closes event processing after waiting for pending events

complete

Blocks until all pending events have completed

put

Adds event to queue

get

Get latest event from queue and remove event

peek

Look up latest event from queue

empty

Checks if queue is empty

pop

Removes latest event from queue

__worker

Worker function to process events

__repr__

Return repr(self).

elapse

Add time delta to the current queue time.

close() -> None

Closes event processing without waiting for pending events

join() -> None

Closes event processing after waiting for pending events

complete() -> None

Blocks until all pending events have completed
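The difference between close, join and complete can be sketched with a worker thread and a stop flag. MiniTimedQueue is a hypothetical, standard-library-only illustration; its internals (a heap, an RLock, a threading.Event) are assumptions and may differ from what sc3nb does.

```python
import heapq
import threading
import time


class MiniTimedQueue:
    """Illustrative stdlib sketch; not the sc3nb implementation."""

    def __init__(self, thread_sleep_time: float = 0.001) -> None:
        self._heap = []      # entries: (timestamp, counter, function, args)
        self._counter = 0    # tie-breaker for equal timestamps
        self._lock = threading.RLock()
        self._close_event = threading.Event()
        self._thread = threading.Thread(
            target=self._worker, args=(thread_sleep_time,), daemon=True)
        self._thread.start()

    def put(self, timestamp, function, args=()):
        if not callable(function):
            raise TypeError("function must be callable")
        with self._lock:
            heapq.heappush(
                self._heap, (timestamp, self._counter, function, tuple(args)))
            self._counter += 1

    def empty(self) -> bool:
        with self._lock:
            return not self._heap

    def _worker(self, sleep_time: float) -> None:
        while not self._close_event.is_set():
            with self._lock:
                if self._heap and self._heap[0][0] <= time.time():
                    _, _, function, args = heapq.heappop(self._heap)
                    function(*args)  # runs under the lock, so empty() waits for it
            time.sleep(sleep_time)

    def complete(self) -> None:
        while not self.empty():      # block until nothing is pending
            time.sleep(0.001)

    def close(self) -> None:
        self._close_event.set()      # stop now; pending events never run
        self._thread.join()

    def join(self) -> None:
        self.complete()              # wait for pending events first
        self.close()                 # then stop the worker


log = []
queue = MiniTimedQueue()
now = time.time()
queue.put(now + 0.05, log.append, ("b",))
queue.put(now + 0.01, log.append, ("a",))
queue.join()
print(log)  # ['a', 'b']

dropped = []
queue2 = MiniTimedQueue()
queue2.put(time.time() + 60, dropped.append, ("never",))
queue2.close()
print(dropped)  # []
```

In this sketch join is simply complete followed by close, which matches the wording of the three descriptions above.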

put(timestamp: float, function: Callable[..., None], args: Iterable[Any] = (), spawn: bool = False) -> None

Adds event to queue

Parameters:
timestamp : float

Time (POSIX) at which the event should be executed

function : Callable[..., None]

Function to be executed

args : Iterable[Any], optional

Arguments to be passed to the function, by default ()

spawn : bool, optional

If True, run the function in a new sub-thread, by default False

Raises:
TypeError

Raised if function is not callable

get() -> Event

Get latest event from queue and remove event

Returns:
Event

Latest event

peek() -> Event

Look up latest event from queue

Returns:
Event

Latest event

empty() -> bool

Checks if queue is empty

Returns:
bool

True if the queue is empty

pop() -> None

Removes latest event from queue
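How put, peek, pop, get and empty relate to one another can be shown with a sorted list. SortedEvents is a hypothetical container, and it assumes that the event handed out first is the one with the smallest timestamp; the page itself does not spell out the ordering.

```python
import bisect
from typing import Callable, List, Tuple

Entry = Tuple[float, int, Callable[..., None], tuple]


class SortedEvents:
    """Illustrative container; assumes the smallest timestamp comes out first."""

    def __init__(self) -> None:
        self._entries: List[Entry] = []
        self._counter = 0  # unique tie-breaker, so functions are never compared

    def put(self, timestamp: float, function: Callable[..., None],
            args: tuple = ()) -> None:
        if not callable(function):
            raise TypeError("function must be callable")
        entry = (timestamp, self._counter, function, tuple(args))
        bisect.insort(self._entries, entry)  # keep the list sorted by time
        self._counter += 1

    def empty(self) -> bool:
        return not self._entries

    def peek(self) -> Entry:
        return self._entries[0]   # look at the next entry, leave it queued

    def pop(self) -> None:
        del self._entries[0]      # remove the next entry without returning it

    def get(self) -> Entry:
        entry = self.peek()       # get is peek followed by pop
        self.pop()
        return entry


events = SortedEvents()
events.put(3.0, print, ("third",))
events.put(1.0, print, ("first",))
print(events.peek()[0])  # 1.0
print(events.get()[0])   # 1.0
print(events.get()[0])   # 3.0
print(events.empty())    # True
```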

__worker(sleep_time: float, close_event: threading.Event) -> NoReturn

Worker function to process events

__repr__()

Return repr(self).

elapse(time_delta: float) -> None

Add time delta to the current queue time.

Parameters:
time_delta : float

Additional time

class sc3nb.timed_queue.TimedQueueSC(server: sc3nb.osc.osc_communication.OSCCommunication = None, relative_time: bool = False, thread_sleep_time: float = 0.001)

Bases: TimedQueue

Timed queue with OSC communication.

Parameters:
server : OSCCommunication, optional

OSC server to handle the bundlers and messages, by default None

relative_time : bool, optional

If True, use relative time, by default False

thread_sleep_time : float, optional

Sleep time in seconds for the worker thread, by default 0.001

Overview:

put_bundler

Add a Bundler to queue

put_msg

Add a message to queue

put_bundler(onset: float, bundler: sc3nb.osc.osc_communication.Bundler) -> None

Add a Bundler to queue

Parameters:
onset : float

Sending timetag of the Bundler

bundler : Bundler

Bundler that will be sent

put_msg(onset: float, msg: Union[sc3nb.osc.osc_communication.OSCMessage, str], msg_params: Iterable[Any]) -> None

Add a message to queue

Parameters:
onset : float

Sending timetag of the message

msg : Union[OSCMessage, str]

OSCMessage or OSC address

msg_params : Iterable[Any]

If msg is a str, these are the parameters of the OSCMessage that is created
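The two accepted forms of msg can be illustrated without an OSC library. SketchMessage and as_message are hypothetical stand-ins, and ignoring msg_params when a ready-made message is passed is an assumption drawn from the parameter description, not confirmed behaviour.

```python
from dataclasses import dataclass, field
from typing import Any, Iterable, List, Union


@dataclass
class SketchMessage:
    """Hypothetical stand-in for an OSC message: an address plus parameters."""
    address: str
    params: List[Any] = field(default_factory=list)


def as_message(msg: Union[SketchMessage, str],
               msg_params: Iterable[Any] = ()) -> SketchMessage:
    if isinstance(msg, str):
        # A plain string is treated as the OSC address; the params fill the new message.
        return SketchMessage(msg, list(msg_params))
    return msg  # already a message; msg_params is not used (assumption)


print(as_message("/s_new", ["default", 1000]))
# SketchMessage(address='/s_new', params=['default', 1000])

ready = SketchMessage("/n_free", [1000])
print(as_message(ready) is ready)  # True
```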