-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathaudio.py
More file actions
126 lines (97 loc) · 3.82 KB
/
audio.py
File metadata and controls
126 lines (97 loc) · 3.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import sounddevice as sd
import numpy as np
#import threading as mp
import billiard as mp
mp.forking_enable(False)
import queue
import shepard as shepard
import tones
# --- Output-device discovery (runs once at import time) ---------------------
# Index of the default output device reported by host API 0.
default_device = sd.query_hostapis(0)['default_output_device']
devices = sd.query_devices()

# Parallel lists describing every device that offers at least two output
# channels (the stream opened by Audio is stereo).
device_names = []
device_indices = []
devices_defaultsamplerate = []
for i, d in enumerate(devices):
    if d['max_output_channels'] > 1:
        device_names.append(d['name'])
        device_indices.append(i)
        devices_defaultsamplerate.append(d['default_samplerate'])

# Sample rate of the default output device.
try:
    default_samplerate = devices_defaultsamplerate[device_indices.index(default_device)]
except ValueError:
    # The default device was filtered out above (fewer than two output
    # channels), so it has no entry in the parallel lists.  Ask PortAudio for
    # that device directly instead of failing the whole import.
    # NOTE(review): if there is no default output device at all this lookup is
    # expected to raise a PortAudio error, which is clearer than ValueError.
    default_samplerate = sd.query_devices(default_device)['default_samplerate']

# Number of frames per audio block; used both for the output stream's block
# size and for the length of each generated buffer.
blocksize = 2048
class Buffer(object):
    """Producer that renders Shepard-tone blocks in a background process.

    A ``billiard`` process runs :meth:`_generate_buffer`, which keeps
    ``buffer_queue`` topped up with rendered blocks of ``blocksize`` samples
    and applies parameter updates received on ``param_queue``.

    Args:
        param_queue: Queue of 7-tuples ``(starting_freq, n, envelope_width,
            envelope_x0, volume, low_cutoff, high_cutoff)`` that are forwarded
            to ``ShepardTone.set``.
        sample_rate: Sample rate in frames per second used to convert frame
            indices to seconds.
    """

    # Number of rendered blocks kept queued ahead of the consumer.
    _QUEUE_DEPTH = 5
    # Seconds to wait for a parameter update once the queue is full.  Waiting
    # here (instead of polling with get_nowait) keeps the producer from
    # spinning at 100% CPU; it is far shorter than one block of audio.
    _PARAM_POLL_TIMEOUT = 0.005

    def __init__(self, param_queue, sample_rate):
        self.sample_rate = sample_rate
        self.param_queue = param_queue
        self.buffer_queue = mp.Queue()
        # NOTE(review): constructor arguments presumably follow the same order
        # as ShepardTone.set (starting_freq, n, envelope_width, envelope_x0,
        # volume, low_cutoff, high_cutoff) -- confirm against shepard.py.
        self.shepard = shepard.ShepardTone(tones.freqs[0]*2**5, 1, 100, 0, 0.5, 20.0, 100000.0)
        # Index of the first frame of the next block to render.
        self.currentframe = 0
        self.buffer_process = mp.Process(target=self._generate_buffer)
        self.buffer_process.start()

    @staticmethod
    def _block_times(start_frame, n_frames, sample_rate):
        """Return the sample times (seconds, float32) of one block.

        The block covers frames ``start_frame`` .. ``start_frame + n_frames - 1``.
        ``endpoint=False`` is essential: with the endpoint included the step
        would be ``n_frames / (n_frames - 1)`` frames and the last sample of
        each block would coincide with the first sample of the next one,
        producing a phase discontinuity at every block boundary.
        """
        # NOTE(review): float32 cannot represent large frame counts exactly,
        # so timing precision degrades on long runs -- consider float64 if
        # ShepardTone.get_waveform accepts it.
        t = np.linspace(start_frame, start_frame + n_frames, n_frames,
                        endpoint=False, dtype=np.float32)
        t /= sample_rate
        return t

    def _generate_buffer(self):
        """Process target: render blocks forever and apply parameter updates."""
        while True:
            # Refill until the desired number of blocks is queued.
            while self.buffer_queue.qsize() < self._QUEUE_DEPTH:
                t = self._block_times(self.currentframe, blocksize, self.sample_rate)
                self.buffer_queue.put(self.shepard.get_waveform(t))
                self.currentframe += blocksize
            # Queue is full: wait briefly for at most one parameter update,
            # then go back to checking the queue level.
            try:
                params = self.param_queue.get(timeout=self._PARAM_POLL_TIMEOUT)
            except queue.Empty:
                continue
            starting_freq, n, envelope_width, envelope_x0, volume, low_cutoff, high_cutoff = params
            self.shepard.set(starting_freq, n, envelope_width, envelope_x0, volume, low_cutoff, high_cutoff)

    def __del__(self):
        # __init__ may have failed before the process was created, in which
        # case there is nothing to terminate.
        process = getattr(self, "buffer_process", None)
        if process is not None:
            process.terminate()
class Audio(object):
    """Stereo output stream that plays blocks pulled from ``buffer_queue``.

    Args:
        buffer_queue: Queue yielding one block of samples per audio callback;
            the same block is written to both output channels.
        sample_rate: Stream sample rate; defaults to the module-level
            ``default_samplerate``.
        device_index: Output device index; defaults to the module-level
            ``default_device``.

    The stream is created in ``__init__`` but not started; call :meth:`on`.
    """

    def __init__(self, buffer_queue, sample_rate=None, device_index=None):
        self.buffer_queue = buffer_queue
        if device_index is None:
            device_index = default_device
        if sample_rate is None:
            sample_rate = default_samplerate
        self.sample_rate = sample_rate
        self.device_index = device_index
        self.stream = sd.OutputStream(
            channels=2,
            device=self.device_index,
            samplerate=self.sample_rate,
            blocksize=blocksize,
            dtype='float32',
            latency=0.1,
            callback=self._callback,
        )

    def _callback(self, outdata, frames, time, status):
        """Stream callback: copy the next queued block into both channels.

        Any non-empty ``status`` reported by the stream is printed.
        """
        if status:
            print(status)
        # NOTE(review): this blocks the audio callback until a block is
        # available, and assumes each block has exactly ``frames`` samples.
        y = self.buffer_queue.get()
        outdata[:, 0] = y
        outdata[:, 1] = y

    def on(self):
        """Start playback."""
        self.stream.start()

    def off(self):
        """Stop playback and discard up to ten already-queued blocks."""
        self.stream.stop()
        for _ in range(10):
            try:
                self.buffer_queue.get_nowait()
            except queue.Empty:
                break
        # NOTE(review): nothing in this class reads ``currentframe``; kept so
        # the attribute still exists for any external reader.
        self.currentframe = 0

    def close(self):
        """Close the output stream and release it.

        Safe to call more than once, and safe if ``__init__`` failed before
        the stream was created.  The object cannot be restarted afterwards.
        """
        stream = getattr(self, "stream", None)
        self.stream = None
        if stream is not None:
            stream.close()

    def __del__(self):
        # Dropping the reference alone does not close the underlying stream,
        # so close it explicitly before letting go.
        self.close()
        sd.stop()