Acoustic signal processing starting with Python: Let's make a stereophonic sound system

Overview

Recently, the number of streamers using 3D models, such as VTubers, has been increasing. Most of them stream with single-channel audio, but to make a stream more immersive, stereophonic sound can be produced with a microphone array such as the 3Dio. Such a microphone array is the recommended approach when there are multiple sound sources. In a situation like ASMR, however, where there is a single sound source (a person), stereophonic sound can also be generated purely by signal processing. Of course, multiple sound sources are possible too, but ...

The important point here is that the microphone parts of arrays used for ASMR, such as the 3Dio, are shaped like ears. This is because they mimic how sound travels from the source to the eardrum. The transfer function that describes this path is called the head-related transfer function (HRTF). In this article, I describe how to implement, in Python, a system that generates stereophonic sound by signal processing with the HRTFs to the left and right ears.

Note: HRTFs vary from person to person, and front/back sound image localization is often inconsistent. In fact, even devices such as the 3Dio only imitate a standard ear shape and cannot resolve these individual differences.

Since the program is long, I will explain only the outline. For details, please read the repository below; it also describes how to use the system. Git: k-washi/stereophonic-Sound-System

For the logging, configuration file loading, and the gRPC communication used to acquire position information in the program, please refer to my previous Qiita articles as needed.

3D sound example (earphones required)

Please click the image below. You can listen to the actual 3D sound via the linked YouTube video. Since the microphone is the one built into my Mac, the raw sound quality is poor, but you can tell that the result is stereophonic.

Youtube Link

Library installation


pip install PyAudio==0.2.11
pip install grpcio-tools
pip install numpy

Obtaining HRTF

Here, the HRTF database is read, converted to the frequency domain, and saved using pickle.

For the HRTFs, I used the HRTF data (2) from the Nagoya University HRTF database.

The program is described below, with details omitted as .... For the full program, see acoustic/spacialSound.py.

acoustic/spacialSound.py


...

class HRTF():
  def __init__(self):
    ...
  
  def checkModel(self):
    #Get model file name
    ...
      self.getModelNameList()
    ...

  def getModelNameList(self):
    #Analyze HRTF file names for each azimuth and elevation
    ...
  
  def openData(self, path, dataType = np.int16):
    #The HRTF for each azimuth and elevation has 512 samples; read the file and convert it to a numpy array.
    with open(path, 'r') as rData:
      temp = rData.read().split("\n")
      data = []
      for item in temp:
        if item != '':
          data.append(float(item))
      return np.array(data)
  
  def convHRTF2Np(self):
    #Read the HRTF data for each azimuth and elevation and FFT the 512 points plus 512 zero-filled points, following the overlap-add method. (The FFT is described in the next chapter.)
    #This FFT is performed for both ears, and all of the data is saved with pickle using the saveData function.
    #The saved pickle data can later be loaded with readData.
    for e, _ in enumerate(self.elev):
      LaziData = []
      RaziData = []
      for a, _ in enumerate(self.azimuth[e]):
        
        Lpath = self.Lpath[e][a]
        Rpath  = self.Rpath[e][a]

        Ldata = spec.overlapAdderFFT(self.openData(Lpath))
        Rdata = spec.overlapAdderFFT(self.openData(Rpath))
        
        LaziData.append(Ldata)
        RaziData.append(Rdata)

      self.hrtf[self.left].append(LaziData)
      self.hrtf[self.right].append(RaziData)

    self.saveData(self.hrtf, Conf.HRTFpath)
    self.saveData(self.elev, Conf.Elevpath)
    self.saveData(self.azimuth, Conf.Azimuthpath)

  def saveData(self, data, path):
    #pickle data storage
    try:
      with open(path, 'wb') as hrtf:
        pickle.dump(data, hrtf)
    except Exception as e:
      logger.critical(e)

  def readData(self, path):
    #Load pickled data
    try:
      with open(path, 'rb') as hrtf:
        data = pickle.load(hrtf)
    except Exception as e:
      logger.critical(e)
      return None

    return data
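
As a usage sketch (the constructor and several details are elided as ... above, so the exact calls below are assumptions based on the method names and the config keys that appear in the code):

#Minimal usage sketch, assuming the elided constructor takes no arguments
#and that Conf.HRTFpath etc. are set in the configuration.
hrtf = HRTF()
hrtf.checkModel()   #resolve the HRTF file names per azimuth/elevation
hrtf.convHRTF2Np()  #FFT all HRTFs and save them with pickle

#Later runs can restore the pickled spectra without re-running the FFT:
hrtfData = hrtf.readData(Conf.HRTFpath)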

Fast Fourier Transform (FFT)

Here, the implementation related to FFT will be described.

For more information, see acoustic/acousticSignalProc.py.

Processing the acoustic information requires the convolution of the HRTF with the microphone input. However, convolving raw time-domain signals is slow and the data is hard to handle, so a Fourier transform is used to convert the signal into the frequency domain. The FFT is an algorithm that computes this Fourier transform quickly.

The overlap-add method (OLA) is often used when convolving HRTFs with the microphone input. For example, if the HRTF and the microphone input each have 512 samples, 512 zeros are appended to each, creating 512 + 512 = 1024-point blocks. The positive-frequency components are then computed with numpy's rfft function. With numpy's plain fft, both the positive and the negative frequency components are computed, but for a real signal the negative-frequency components are just complex conjugates of the positive ones, so rfft, which returns only the positive frequencies, is sufficient and is widely used in engineering. Although numpy's rfft and fft use different internal algorithms, the difference in their results is negligible, so rfft is used here. After the FFT, the HRTF and the microphone input are multiplied frequency by frequency; how the resulting blocks are recombined is explained in the next chapter.
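
As a quick sanity check of this procedure, the following self-contained sketch (using random stand-in signals, not the actual HRTF data) confirms that multiplying the zero-padded rfft spectra is equivalent to linear convolution in the time domain:

import numpy as np

#Stand-ins for a 512-point HRTF and a 512-point mic frame.
rng = np.random.default_rng(0)
h = rng.standard_normal(512)
x = rng.standard_normal(512)

#Zero-pad both to 512 + 512 = 1024 points, FFT, multiply, inverse FFT.
H = np.fft.rfft(h, n=1024)
X = np.fft.rfft(x, n=1024)
y_fft = np.fft.irfft(H * X)  #1024 points

#Direct linear convolution gives 512 + 512 - 1 = 1023 points.
y_direct = np.convolve(h, x)

print(np.allclose(y_fft[:1023], y_direct))  #True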

The program below defines two FFT functions, overlapAdderFFT and spacializeFFT. The difference between them is whether a window function is applied. Because the Fourier transform assumes that the extracted segment is periodic, a window function tapers the data toward zero at both ends so that the ends connect smoothly. The HRTF, however, has only 512 points per direction, and the original data could not be recovered if it were windowed, so it is used without a window. The microphone input, on the other hand, is windowed. As explained in the next chapter, windowing attenuates the information at the segment edges, so successive segments are processed while shifting by 128 points.
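
The 128-point shift can be illustrated with a small sketch: length-512 Hamming windows shifted by 128 points sum to a nearly constant value, so the information attenuated at the edges of one frame is recovered from the neighboring frames (the lengths below simply mirror the numbers used in this article):

import numpy as np

window = np.hamming(512)
hop = 128  #the 128-point shift used for the mic input

#Sum shifted copies of the window, as the overlap-add loop effectively does.
total = np.zeros(512 + hop * 8)
for i in range(9):
    total[hop * i : hop * i + 512] += window

#Away from the very ends, the summed window is almost flat,
#so no part of the input stays attenuated.
print(total[512:-512].min(), total[512:-512].max())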

acoustic/acousticSignalProc.py


import pyaudio
import numpy as np


class SpectrogramProcessing():
  def __init__(self, freq = Conf.SamplingRate):
    self.window = np.hamming(Conf.SysChunk)
    self.overlapFreq = np.fft.rfftfreq(Conf.SysChunk * 2, d=1./freq)
    self.overlapData = np.zeros((int(Conf.SysChunk * 2)), dtype = np.float32)
  
  def overlapAdderFFT(self, data):
    #Zero-pad to twice the length (1024 points) and FFT
    self.overlapData[:Conf.SysChunk] = data
    return np.fft.rfft(self.overlapData)
  
  def spacializeFFT(self, data):
    #Apply the Hamming window, zero-pad, and FFT.
    self.overlapData[:Conf.SysChunk] = data * self.window
    return np.fft.rfft(self.overlapData)


  def ifft(self, data):
    #in: channel_num x freq_num (e.g. at a 16 kHz sampling rate: 0, ..., 7984.375 Hz)
    #out: channel_num x frame_num (Conf.SysChunk * 2 = 1024 points, due to the zero padding)

    return np.fft.irfft(data)
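
A minimal round trip through the class above might look like the following sketch (it assumes Conf.SysChunk = 512 and Conf.SamplingRate = 16000 are set in the configuration before the class is defined):

import numpy as np

proc = SpectrogramProcessing()
frame = np.random.randn(512).astype(np.float32)

spec = proc.spacializeFFT(frame)  #Hamming window, zero-pad, rfft
restored = proc.ifft(spec)        #irfft back to 1024 time samples

print(restored.shape)                                    #(1024,)
print(np.allclose(restored[:512], frame * proc.window))  #True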

Microphone input, output, and sound processing program

Next, I describe the program that handles the microphone input and output and the conversion to stereophonic sound, using the functions described above. For the settings and gRPC, please refer to my past articles mentioned at the beginning.

acoustic/audioStreamOverlapAdder.py



...

from acoustic.acousticSignalProc import AudioDevice, SpectrogramProcessing, WaveProcessing, convNp2pa, convPa2np
from acoustic.spacialSound import spacialSound
# ------------

import pyaudio
import numpy as np
import time

class MicAudioStream():
  def __init__(self):
    self.pAudio = pyaudio.PyAudio()
    self.micInfo = AudioDevice(Conf.MicID)
    self.outInfo = AudioDevice(Conf.OutpuID)

    #Restrict the output device channels
    if self.outInfo.micOutChannelNum < 2:
      self.left = 0
      self.right = 0
    else:
      self.left = 0
      self.right = 1
      if self.outInfo.micOutChannelNum > 2:
        self.outInfo.micChannelNum = 2
        logger.info("I limited it to 2 channels because the number of output microphones is excessive.")

    self.startTime = time.time()

    #Currently only a 16-bit sample width is supported (other widths have not been tested).
    if Conf.SysSampleWidth == 2:
      self.format = pyaudio.paInt16
      self.dtype = np.int16
    else:
      logger.critical("Currently not supported")
      exit(-1)

    self.fft = SpectrogramProcessing()

    #Creating numpy arrays and allocating memory on every callback is slow, so allocate them in advance.
    self.data = np.zeros((int(Conf.StreamChunk * 2)), dtype=self.dtype)
    self.npData = np.zeros((int(Conf.StreamChunk * 2)) , dtype=self.dtype)

    self.overlapNum = int(Conf.StreamChunk / Conf.SysFFToverlap) 

    self.freqData = np.zeros((self.overlapNum, self.outInfo.micOutChannelNum, self.fft.overlapFreq.shape[0]), dtype=np.complex128)
    self.convFreqData = np.zeros((self.outInfo.micOutChannelNum, int(Conf.StreamChunk*3)) , dtype=self.dtype)
    self.outData = np.zeros((self.outInfo.micOutChannelNum * Conf.StreamChunk), dtype=self.dtype)
    
    self.Aweight = self.fft.Aweight() #A-weighting is applied, but it made little difference, so you need not worry about it. (It may be removed.)

    #Initial value of location information
    self.x = 0.2
    self.y = 10
    self.z  = 0.2

    #HRTF loading (acoustic/spacialSound.py)
    #Provides processing that returns the HRTF for a given position
    self.hrft = spacialSound()
    
    #When recording the stereophonic sound
    if Conf.Record:
      #See test/listOrNumpy.py for a speed comparison.
      #For numpy arrays, converting to a list and extending a Python list is faster than repeatedly concatenating arrays.
      self.recordList = []

  def spacialSoundConvering(self, freqData):
    #Returns HRTFs for position
    lhrtf, rhrtf = self.hrft.getHRTF(self.x, self.y, self.z)
   
    #The HRTF is convolved with the microphone input (multiplied in the frequency domain), as below, to generate stereophonic sound.
    freqData[self.left] = freqData[self.left] * lhrtf 
    freqData[self.right] = freqData[self.right] * rhrtf 
    return freqData * self.Aweight

  def callback(self, in_data, frame_count, time_info, status):
    #Callback that processes the sound data in PyAudio's stream processing.
    #in_data is the input; the data returned as out_data is played back.

    if time.time() - self.startTime > Conf.SysCutTime:
      #Converting pyAudio format input to numpy format.
      self.npData[Conf.StreamChunk:] = convPa2np(np.frombuffer(in_data, self.dtype), channelNum=self.micInfo.micChannelNum)[0, :] #ch1 input
      
      #Generate stereophonic sound while shifting through the data by the overlap width (128 points).
      for i in range(self.overlapNum):
        #FFT over a 512-point (SysChunk) window.
        self.freqData[i, :, :] = self.fft.spacializeFFT(self.npData[Conf.SysFFToverlap * i : Conf.SysChunk + Conf.SysFFToverlap * i])
        
        #Multiply the mic input spectrum by the HRTF (frequency-domain convolution).
        self.freqData[i, :, :] = self.spacialSoundConvering(self.freqData[i]) 
        
        #The frequency domain is converted to the time domain by the inverse Fourier transform.
        self.convFreqData[:, Conf.SysFFToverlap * i  : Conf.SysChunk * 2 + Conf.SysFFToverlap * i] += self.fft.ifft(self.freqData[i]).real.astype(self.dtype)#[:,:Conf.SysChunk]

      #Converting from numpy format to output format with pyAudio.
      self.outData[:] = convNp2pa(self.convFreqData[:,:Conf.StreamChunk]) 

      #Calculate the distance attenuation of the sound. Also, the sound is too loud, so it is divided by SysAttenuation.
      self.outData[:] = self.hrft.disanceAtenuation(self.outData[:], self.x, self.y, self.z) / Conf.SysAttenuation
      
      if Conf.Record:
        self.recordList += self.outData.tolist()
      
      #Initialize for next mic input
      self.npData[:Conf.StreamChunk] = self.npData[Conf.StreamChunk:]
      self.convFreqData[:, :Conf.StreamChunk*2] = self.convFreqData[:, Conf.StreamChunk:]
      self.convFreqData[:,Conf.StreamChunk*2:] = 0
      
    #Convert to PyAudio's output byte format
    out_data = self.outData.tobytes()
    
    return (out_data, pyaudio.paContinue)
  

  
  def start(self):
    #Set the input / output device and format in the following format and start processing.
    """
    rate – Sampling rate
    channels – Number of channels
    format – Sampling size and format. See PortAudio Sample Format.
    input – Specifies whether this is an input stream. Defaults to False.
    output – Specifies whether this is an output stream. Defaults to False.
    input_device_index – Index of Input Device to use. Unspecified (or None) uses default device. Ignored if input is False.
    output_device_index – Index of Output Device to use. Unspecified (or None) uses the default device. Ignored if output is False.
    frames_per_buffer – Specifies the number of frames per buffer.
    start – Start the stream running immediately. Defaults to True. In general, there is no reason to set this to False.
    input_host_api_specific_stream_info – Specifies a host API specific stream information data structure for input.
    output_host_api_specific_stream_info – Specifies a host API specific stream information data structure for output.
    stream_callback –Specifies a callback function for non-blocking (callback) operation. Default is None, which indicates blocking operation (i.e., Stream.read() and Stream.write()). To use non-blocking operation, specify a callback that conforms to the following signature:
    callback(in_data,      # recorded data if input=True; else None
            frame_count,  # number of frames
            time_info,    # dictionary
            status_flags) # PaCallbackFlags
    time_info is a dictionary with the following keys: input_buffer_adc_time, current_time, and output_buffer_dac_time; see the PortAudio documentation for their meanings. status_flags is one of PortAudio Callback Flag.
    The callback must return a tuple:
    (out_data, flag)
    out_data is a byte array whose length should be the (frame_count * channels * bytes-per-channel) if output=True or None if output=False. flag must be either paContinue, paComplete or paAbort (one of PortAudio Callback Return Code). When output=True and out_data does not contain at least frame_count frames, paComplete is assumed for flag.
    """
    self.stream = self.pAudio.open(
      format = self.format,
      rate = Conf.SamplingRate,#self.micInfo.samplingRate,
      channels = self.micInfo.micChannelNum,
      input = True,
      output = True,
      input_device_index = Conf.MicID,
      output_device_index = Conf.OutpuID,
      stream_callback = self.callback,
      frames_per_buffer = Conf.StreamChunk
    )

    self.stream.start_stream()

  def stop(self):
    #Processing needed outside the audio callback runs here; on shutdown, the closing steps are executed at the end.
    #Here, gRPC is used to update the sound source position information.

    from proto.client import posClient
    
    grpcPosGetter = posClient()
    grpcPosGetter.open()

    while self.stream.is_active():
      time.sleep(0.1)
      try:
        ok = grpcPosGetter.posRequest()
        if ok:
          self.x, self.y, self.z = grpcPosGetter.getPos()
      except Exception as e:
        logger.error("pos getter error {0}".format(e))

      if time.time() - self.startTime > Conf.RecordTime + Conf.SysCutTime:
        break

    if Conf.Record:
      record = WaveProcessing()
      record.SaveFlatteData(self.recordList, channelNum=self.outInfo.micOutChannelNum)


    self.stream.stop_stream()
    self.stream.close()
    self.close()
    grpcPosGetter.close()
    

  
  def close(self):
    self.pAudio.terminate()
    logger.debug("Close proc")
    exit(0)

if __name__ == "__main__":
  st = MicAudioStream()
  st.start()
  try:
    pass
  finally:
    st.stop()
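
The disanceAtenuation method is not shown in this excerpt. As a rough sketch of the idea only (an assumed 1/r point-source model, not necessarily the exact formula used in the repository):

import numpy as np

def distanceAttenuationSketch(data, x, y, z, minDist=0.1):
  #Assumed model: point-source amplitude falls off as 1/r.
  #minDist avoids dividing by (near) zero when the source is at the head.
  r = max(np.sqrt(x * x + y * y + z * z), minDist)
  return data / r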
  

Check input and output devices

To actually try the sound processing, you need to check the IDs of the input and output devices.

Device information example

2020-01-16 03:46:49,436 [acousticSignalProc.py:34] INFO     Index: 0 | Name: Built-in Microphone | ChannelNum: in 2 out 0 | SampleRate: 44100.0
2020-01-16 03:46:49,436 [acousticSignalProc.py:34] INFO     Index: 1 | Name: Built-in Output | ChannelNum: in 0 out 2 | SampleRate: 44100.0
2020-01-16 03:46:49,436 [acousticSignalProc.py:34] INFO     Index: 2 | Name: DisplayPort | ChannelNum: in 0 out 2 | SampleRate: 48000.0
...

The following program outputs the input/output device information using PyAudio.

acoustic/acousticSignalProc.py


...

import pyaudio
import numpy as np

class AudioDevice():
  def __init__(self, devId = Conf.MicID):
    self.pAudio = pyaudio.PyAudio()

    self.setAudioDeviceInfo(devId)
    self.samplingRate = Conf.SamplingRate
    

  def getAudioDeviceInfo(self):
    #Output device information using PyAudio.
    for i in range(self.pAudio.get_device_count()):
      tempDic = self.pAudio.get_device_info_by_index(i)
      text = 'Index: {0} | Name: {1} | ChannelNum: in {2} out {3} | SampleRate: {4}'.format(tempDic['index'], tempDic['name'], tempDic['maxInputChannels'], tempDic['maxOutputChannels'], tempDic['defaultSampleRate'])
      logger.info(text)

  def setAudioDeviceInfo(self, micId = 0):
    #Check if the set device ID exists and retain the information of that ID
    micInfoDic = {}
    for i in range(self.pAudio.get_device_count()):
      micInfoDic = self.pAudio.get_device_info_by_index(i)
      if micInfoDic['index'] == micId:

        self.micName = micInfoDic['name']
        self.micChannelNum = micInfoDic['maxInputChannels']
        self.micOutChannelNum = micInfoDic['maxOutputChannels'] 
        self.micSamplingRate = int(micInfoDic['defaultSampleRate'])
        text = 'Set Audio Device Info || Index: {0} | Name: {1} | ChannelNum: {2}, {3} | SampleRate: {4}'.format(micId, self.micName, self.micChannelNum,self.micOutChannelNum, self.micSamplingRate)
        logger.info(text)

        if self.micChannelNum > 2:
          logger.critical("It does not support 3 or more microphone inputs.")
          exit(-1)

        break

      if self.pAudio.get_device_count() == i + 1:
        logger.critical("There is no microphone with the corresponding id.")

Summary

The above is the explanation of the stereophonic sound system using Python. It may seem complicated because some parts have been omitted and other articles need to be consulted, but I hope it is helpful. Python is said to be slow, but if you use numpy efficiently, for example by allocating memory in advance, it can often run at a sufficient speed. I have written various other articles, so please refer to them as well.
