Recently, the number of distributors using 3D models such as VTuber is increasing. Many distributors use monochannel acoustic information for distribution, but in order to make the distribution more immersive, stereophonic sound is generated by using a microphone array such as 3Dio. When there are multiple sound sources, it is recommended to use such a microphone array. However, in a situation such as ASMR, when a single sound source (person) is made into stereophonic sound, stereophonic sound can be generated by signal processing. Of course, multiple sound sources are possible, but ...
What is important here is that the microphone part of the microphone array used for ASMR, such as 3Dio, which is a head-related transfer function, is in the shape of an ear. This is because it mimics the transmission of sound from the sound source to the eardrum of the ear. This transfer function is called the head related transfer function (HRTF). In this article, I will describe how to implement a system that generates stereophonic sound by signal processing using HRTFs up to the right and left ears in Python.
Note: HRTFs vary from person to person, and sound image position estimates before and after are often inconsistent. In fact, even when using 3Dio etc., it only imitates the standard ear shape and cannot solve individual differences.
Since the program is long, I will explain only the outline. For more information, please read the Git program below. Also, how to use it is described in git. Git: k-washi/stereophonic-Sound-System
Please refer to the following Qiita article as appropriate for logging, reading the configuration file, and gRPC communication for acquiring location information described in the program.
Please click the image below. You can listen to the 3D sound actually created by linking on Youtube. Since the microphone is attached to the mac, the original sound is bad, but you can see that it is stereophonic.
pip install PyAudio==0.2.11
pip install grpcio-tools
pip install numpy
Here, the HRTF database is read, converted to the frequency domain, and saved using pickle.
For the HRTF, we used the HRTF data (2) from Nagoya University HRTF.
The program is described below. The details are omitted as .... To see the full program, see acoustic / spacialSound.py.
acoustic/spacialSound.py
...
class HRTF():
def __init__(self):
...
def checkModel(self):
#Get model file name
...
self.getModelNameList()
...
def getModelNameList(self):
#Analyze HRTF file names for each azimuth and elevation
...
def openData(self, path, dataType = np.int16):
#H each direction,The number of samples of HRTF signal at elevation angle is 512 points, and this data is read and converted to numpy format.
with open(path, 'r') as rData:
temp = rData.read().split("\n")
data = []
for item in temp:
if item != '':
data.append(float(item))
return np.array(data)
def convHRTF2Np(self):
#Read HRTF data for each azimuth and elevation, and 512 points based on the overlap add method+0 fill(512 points)Is FFT.(FFT is described in the next chapter)
#The above FFT is performed on both ears, and all data is saved by pickle. The saveData function is used for saving.
#For reading, the saved pickle data can be read with readData.
for e, _ in enumerate(self.elev):
LaziData = []
RaziData = []
for a, _ in enumerate(self.azimuth[e]):
Lpath = self.Lpath[e][a]
Rpath = self.Rpath[e][a]
Ldata = spec.overlapAdderFFT(self.openData(Lpath))
Rdata = spec.overlapAdderFFT(self.openData(Rpath))
LaziData.append(Ldata)
RaziData.append(Rdata)
self.hrtf[self.left].append(LaziData)
self.hrtf[self.right].append(RaziData)
self.saveData(self.hrtf, Conf.HRTFpath)
self.saveData(self.elev, Conf.Elevpath)
self.saveData(self.azimuth, Conf.Azimuthpath)
def saveData(self, data, path):
#pickle data storage
try:
with open(path, 'wb') as hrtf:
pickle.dump(data, hrtf)
except Exception as e:
logger.critical(e)
def readData(self, path):
#pickle data reading
try:
with open(path, 'rb') as hrtf:
data = pickle.load(hrtf)
except Exception as e:
logger.critical(e)
return data
Here, the implementation related to FFT will be described.
For more information, see acoustic / acousticSignalProc.py
When processing acoustic information, a convolution integral of the HRTF and microphone input is required. However, when processed with a raw acoustic signal, the processing takes time and the data is difficult to handle. Therefore, a Fourier transform that converts the signal into information in the frequency domain is required. The FFT is the one that performs this Fourier transform at high speed.
The OverLap Add method (OLA) is often used when convolving HRTFs and mic inputs. For example, if the number of HRTF and microphone input samples is 512, the 512 points are filled with 0 in addition to each data. That is, 512 + 512 (0) data is created. After that, the positive frequency of FFT is calculated by the rfft function of numpy. In the case of numpy fft, 512/2 = 256 points of positive frequency components and 256 points of negative frequency components are calculated. However, rfft is used in many engineering applications because only positive frequency components are often sufficient. Also, although the calculation algorithms of numpy's rfft and fft are different, the error of the result is quite small, so rfft is used this time. Then, after executing the FFT, the HRTF and the microphone input are multiplied for each frequency. This combination will be explained in the next chapter.
In the following program, two FFTs, overlapAdderFFT and spacializeFFT, are prepared. What is the difference? Whether or not window (window function) is multiplied. Since the window function assumes the periodicity of the range cut out by the Fourier transform, a function that makes the end smaller is applied to the data so that the ends are connected. However, the HRTF has only 512 points per data, and the original data cannot be restored when the window function is applied, so it is used without applying the window function. On the other hand, the microphone input is applied with a window function. As will be explained in the next chapter, when the window function is applied, the end information is lost, so each data is used while shifting by 128 points.
acoustic/acousticSignalProc.py
import pyaudio
import numpy as np
class SpectrogramProcessing():
def __init__(self, freq = Conf.SamplingRate):
self.window = np.hamming(Conf.SysChunk)
self.overlapFreq = np.fft.rfftfreq(Conf.SysChunk * 2, d=1./freq)
self.overlapData = np.zeros((int(Conf.SysChunk * 2)), dtype = np.float32)
def overlapAdderFFT(self, data):
#Fill in 0 and FFT
self.overlapData[:Conf.SysChunk] = data
return np.fft.rfft(self.overlapData)
def spacializeFFT(self, data):
#Fill in 0 and apply hanning window.
self.overlapData[:Conf.SysChunk] = data * self.window
return np.fft.rfft(self.overlapData)
def ifft(self, data):
#in: chanel_num x freq num (if 1.6kHz, 0,...,7984.375 Hz)
#out: chanel_num x frame num(Conf.SysChunk = 512)
return np.fft.irfft(data)
Next, the microphone input, output, and conversion program to stereophonic sound will be described. Processing is performed using the functions and the like described above. Also, regarding settings and gRPC, please refer to my past articles as mentioned at the beginning.
acoustic/audioStreamOverlapAdder.py
...
from acoustic.acousticSignalProc import AudioDevice, SpectrogramProcessing, WaveProcessing, convNp2pa, convPa2np
from acoustic.spacialSound import spacialSound
# ------------
import pyaudio
import numpy as np
import time
class MicAudioStream():
def __init__(self):
self.pAudio = pyaudio.PyAudio()
self.micInfo = AudioDevice(Conf.MicID)
self.outInfo = AudioDevice(Conf.OutpuID)
#Output device restriction processing
if self.outInfo.micOutChannelNum < 2:
self.left = 0
self.right = 0
else:
self.left = 0
self.right = 1
if self.outInfo.micOutChannelNum > 2:
self.outInfo.micChannelNum = 2
logger.info("I limited it to 2 channels because the number of output microphones is excessive.")
self.startTime = time.time()
#Currently, only 16-bit bit width is supported.(Because we have not confirmed the operation in other cases)
if Conf.SysSampleWidth == 2:
self.format = pyaudio.paInt16
self.dtype = np.int16
else:
logger.critical("Currently not supported")
exec(-1)
self.fft = SpectrogramProcessing()
#If you create numpy array format data every time and allocate memory, it will take time, so create it in advance.
self.data = np.zeros((int(Conf.StreamChunk * 2)), dtype=self.dtype)
self.npData = np.zeros((int(Conf.StreamChunk * 2)) , dtype=self.dtype)
self.overlapNum = int(Conf.StreamChunk / Conf.SysFFToverlap)
self.freqData = np.zeros((self.overlapNum, self.outInfo.micOutChannelNum, self.fft.overlapFreq.shape[0]), dtype=np.complex)
self.convFreqData = np.zeros((self.outInfo.micOutChannelNum, int(Conf.StreamChunk*3)) , dtype=self.dtype)
self.outData = np.zeros((self.outInfo.micOutChannelNum * Conf.StreamChunk), dtype=self.dtype)
self.Aweight = self.fft.Aweight() #I applied the A characteristic, but it didn't change much, so I don't have to worry about it. (May be erased)
#Initial value of location information
self.x = 0.2
self.y = 10
self.z = 0.2
#HRTF reading (acoustic/spacialSound.py)
#You can execute processing that returns HRTF for location information
self.hrft = spacialSound()
#When recording stereophonic sound
if Conf.Record:
#test/listOrNumpy.Speed comparison with py
#As for the Array format of numpy, it is faster to convert numpy to list and extend the list than to combine the array format.
self.recordList = []
def spacialSoundConvering(self, freqData):
#Returns HRTFs for position
lhrtf, rhrtf = self.hrft.getHRTF(self.x, self.y, self.z)
#The input data of the HRTF microphone is convolved as shown below to generate stereophonic sound.
freqData[self.left] = freqData[self.left] * lhrtf
freqData[self.right] = freqData[self.right] * rhrtf
return freqData * self.Aweight
def callback(self, in_data, frame_count, time_info, status):
#A function that processes sound data in the stream processing of pyAudio.
#in_data is input, return is out_Sound data is output as data.
if time.time() - self.startTime > Conf.SysCutTime:
#Converting pyAudio format input to numpy format.
self.npData[Conf.StreamChunk:] = convPa2np(np.fromstring(in_data, self.dtype), channelNum=self.micInfo.micChannelNum)[0, :] #ch1 input
#Overlap width of data below(128)Generates stereophonic sound while shifting one by one.
for i in range(self.overlapNum):
#512 points(SysChunk)FFT is performed with the width of.
self.freqData[i, :, :] = self.fft.spacializeFFT(self.npData[Conf.SysFFToverlap * i : Conf.SysChunk + Conf.SysFFToverlap * i])
#The HRTF and mic input are folded.
self.freqData[i, :, :] = self.spacialSoundConvering(self.freqData[i])
#The frequency domain is converted to the time domain by the inverse Fourier transform.
self.convFreqData[:, Conf.SysFFToverlap * i : Conf.SysChunk * 2 + Conf.SysFFToverlap * i] += self.fft.ifft(self.freqData[i]).real.astype(self.dtype)#[:,:Conf.SysChunk]
#Converting from numpy format to output format with pyAudio.
self.outData[:] = convNp2pa(self.convFreqData[:,:Conf.StreamChunk])
#The distance attenuation of the sound is calculated. Also, the sound is too loud, so I divide it by SysAttenuation.
self.outData[:] = self.hrft.disanceAtenuation(self.outData[:], self.x, self.y, self.z) / Conf.SysAttenuation
if Conf.Record:
self.recordList += self.outData.tolist()
#Initialize for next mic input
self.npData[:Conf.StreamChunk] = self.npData[Conf.StreamChunk:]
self.convFreqData[:, :Conf.StreamChunk*2] = self.convFreqData[:, Conf.StreamChunk:]
self.convFreqData[:,Conf.StreamChunk*2:] = 0
#Convert to pyAudio format data output format
out_data = self.outData.tostring()
return (out_data, pyaudio.paContinue)
def start(self):
#Set the input / output device and format in the following format and start processing.
"""
rate – Sampling rate
channels – Number of channels
format – Sampling size and format. See PortAudio Sample Format.
input – Specifies whether this is an input stream. Defaults to False.
output – Specifies whether this is an output stream. Defaults to False.
input_device_index – Index of Input Device to use. Unspecified (or None) uses default device. Ignored if input is False.
output_device_index – Index of Output Device to use. Unspecified (or None) uses the default device. Ignored if output is False.
frames_per_buffer – Specifies the number of frames per buffer.
start – Start the stream running immediately. Defaults to True. In general, there is no reason to set this to False.
input_host_api_specific_stream_info – Specifies a host API specific stream information data structure for input.
output_host_api_specific_stream_info – Specifies a host API specific stream information data structure for output.
stream_callback –Specifies a callback function for non-blocking (callback) operation. Default is None, which indicates blocking operation (i.e., Stream.read() and Stream.write()). To use non-blocking operation, specify a callback that conforms to the following signature:
callback(in_data, # recorded data if input=True; else None
frame_count, # number of frames
time_info, # dictionary
status_flags) # PaCallbackFlags
time_info is a dictionary with the following keys: input_buffer_adc_time, current_time, and output_buffer_dac_time; see the PortAudio documentation for their meanings. status_flags is one of PortAutio Callback Flag.
The callback must return a tuple:
(out_data, flag)
out_data is a byte array whose length should be the (frame_count * channels * bytes-per-channel) if output=True or None if output=False. flag must be either paContinue, paComplete or paAbort (one of PortAudio Callback Return Code). When output=True and out_data does not contain at least frame_count frames, paComplete is assumed for flag.
"""
self.stream = self.pAudio.open(
format = self.format,
rate = Conf.SamplingRate,#self.micInfo.samplingRate,
channels = self.micInfo.micChannelNum,
input = True,
output = True,
input_device_index = Conf.MicID,
output_device_index = Conf.OutpuID,
stream_callback = self.callback,
frames_per_buffer = Conf.StreamChunk
)
self.stream.start_stream()
def stop(self):
#Necessary processing is executed separately from sound processing. Finally, the closing process is also executed when the system is shut down.
#Here, gRPC is used to update the sound source position information.
from proto.client import posClient
grpcPosGetter = posClient()
grpcPosGetter.open()
while self.stream.is_active():
time.sleep(0.1)
try:
ok = grpcPosGetter.posRequest()
if ok:
self.x, self.y, self.z = grpcPosGetter.getPos()
except Exception as e:
logger.error("pos getter error {0}".format(e))
if time.time() - self.startTime > Conf.RecordTime + Conf.SysCutTime:
break
if Conf.Record:
record = WaveProcessing()
record.SaveFlatteData(self.recordList, channelNum=self.outInfo.micOutChannelNum)
self.stream.start_stream()
self.stream.close()
self.close()
grpcPosGetter.close()
def close(self):
self.pAudio.terminate()
logger.debug("Close proc")
exit(0)
if __name__ == "__main__":
st = MicAudioStream()
st.start()
try:
pass
finally:
st.stop()
In order to actually try the sound processing, it is necessary to confirm the ID of the input / output device.
Device information example
2020-01-16 03:46:49,436 [acousticSignalProc.py:34] INFO Index: 0 | Name: Built-in Microphone | ChannelNum: in 2 out 0 | SampleRate: 44100.0
2020-01-16 03:46:49,436 [acousticSignalProc.py:34] INFO Index: 1 | Name: Built-in Output | ChannelNum: in 0 out 2 | SampleRate: 44100.0
2020-01-16 03:46:49,436 [acousticSignalProc.py:34] INFO Index: 2 | Name: DisplayPort | ChannelNum: in 0 out 2 | SampleRate: 48000.0
...
The following shows a program that outputs input / output device information using PyAudio.
acoustic/acousticSignalProc.py
...
import pyaudio
import numpy as np
class AudioDevice():
def __init__(self, devId = Conf.MicID):
self.pAudio = pyaudio.PyAudio()
self.setAudioDeviceInfo(devId)
self.samplingRate = Conf.SamplingRate
def getAudioDeviceInfo(self):
#Output device information using PyAudio.
for i in range(self.pAudio.get_device_count()):
tempDic = self.pAudio.get_device_info_by_index(i)
text = 'Index: {0} | Name: {1} | ChannelNum: in {2} out {3} | SampleRate: {4}'.format(tempDic['index'], tempDic['name'], tempDic['maxInputChannels'], tempDic['maxOutputChannels'], tempDic['defaultSampleRate'])
logger.info(text)
def setAudioDeviceInfo(self, micId = 0):
#Check if the set device ID exists and retain the information of that ID
micInfoDic = {}
for i in range(self.pAudio.get_device_count()):
micInfoDic = self.pAudio.get_device_info_by_index(i)
if micInfoDic['index'] == micId:
self.micName = micInfoDic['name']
self.micChannelNum = micInfoDic['maxInputChannels']
self.micOutChannelNum = micInfoDic['maxOutputChannels']
self.micSamplingRate = int(micInfoDic['defaultSampleRate'])
text = 'Set Audio Device Info || Index: {0} | Name: {1} | ChannelNum: {2}, {3} | SampleRate: {4}'.format(micId, self.micName, self.micChannelNum,self.micOutChannelNum, self.micSamplingRate)
logger.info(text)
if self.micChannelNum > 2:
logger.critical("It does not support 3 or more microphone inputs.")
exit(-1)
break
if self.pAudio.get_device_count() == i + 1:
logger.critical("There is no microphone with the corresponding id.")
The above is the explanation about the stereophonic system using Python. It may be complicated because some parts have been omitted and other articles need to be referred to, but I hope it will be helpful. It is said that python is slow, but if you use numpy efficiently by allocating memory in advance, it can often be executed at a sufficient speed. I have written various other articles, so please refer to them.
Recommended Posts