Traitement du signal acoustique à partir de Python - Faisons un système acoustique en trois dimensions

Aperçu

Récemment, le nombre de distributeurs utilisant des modèles 3D tels que VTuber est en augmentation. De nombreux distributeurs utilisent des informations acoustiques monocanaux pour la distribution, mais afin de rendre la distribution plus immersive, un son stéréoscopique est généré en utilisant un réseau de microphones tel que 3Dio. Lorsqu'il existe plusieurs sources sonores, il est recommandé d'utiliser un tel réseau de microphones. Cependant, dans une situation telle que l'ASMR, lorsqu'une seule source sonore (personne) est transformée en son stéréoscopique, un son stéréoscopique peut être généré par le traitement du signal. Bien sûr, plusieurs sources sonores sont possibles, mais ...

Ce qui est important ici, c'est que la partie microphone du réseau de microphones utilisée pour l'ASMR comme 3Dio, qui est la fonction de transmission de la tête, a la forme d'une oreille. C'est parce qu'il imite la transmission du son de la source sonore à la membrane tympanique de l'oreille. Cette fonction de transfert est appelée fonction de transfert de tête (HRTF). Dans cet article, je décrirai comment implémenter un système qui génère un son stéréophonique par traitement du signal en utilisant HRTF pour les oreilles droite et gauche en Python.

Remarque: les HRTF varient d'une personne à l'autre et les estimations de la position de l'image sonore avant et après sont souvent incohérentes. En fait, même lors de l'utilisation de 3Dio etc., il n'imite que la forme d'oreille standard et ne peut pas résoudre les différences individuelles.

Le programme étant long, je n'expliquerai que les grandes lignes. Pour plus d'informations, veuillez lire le programme Git ci-dessous. De plus, son utilisation est décrite dans git. Git: k-washi/stereophonic-Sound-System

Veuillez vous reporter à l'article Qiita suivant, le cas échéant, pour la journalisation, la lecture du fichier de configuration et la communication gRPC pour obtenir les informations de localisation décrites dans le programme.

Exemple de son tridimensionnel (écouteurs requis)

Veuillez cliquer sur l'image ci-dessous. Vous pouvez écouter le son tridimensionnel réellement créé en créant un lien sur Youtube. Puisque le microphone est attaché au mac, le son d'origine est mauvais, mais vous pouvez voir qu'il s'agit d'un son en trois dimensions.

Youtube Link

Installation de la bibliothèque


pip install PyAudio==0.2.11
pip install grpcio-tools
pip install numpy

Obtention de HRTF

Ici, la base de données HRTF est lue, convertie dans la région de fréquence et enregistrée à l'aide de pickle.

Pour le HRTF, les données HRTF (2) de Nagoya University HRTF ont été utilisées.

Le programme est décrit ci-dessous. Les détails sont omis car ... Pour voir le programme complet, voir acoustique / spacialSound.py.

acoustic/spacialSound.py


...

class HRTF():
  def __init__(self):
    ...
  
  def checkModel(self):
    #Obtenir le nom du fichier de modèle
    ...
      self.getModelNameList()
    ...

  def getModelNameList(self):
    #Analyser le nom de fichier HRTF pour chaque azimut et élévation
    ...
  
  def openData(self, path, dataType = np.int16):
    #H chaque direction,Le nombre d'échantillons du signal HRTF à l'angle d'élévation est de 512 points, et ces données sont lues et converties au format numpy.
    with open(path, 'r') as rData:
      temp = rData.read().split("\n")
      data = []
      for item in temp:
        if item != '':
          data.append(float(item))
      return np.array(data)
  
  def convHRTF2Np(self):
    #Lire les données HRTF pour chaque orientation et élévation, et 512 points en fonction de la méthode Add de chevauchement+0 remplissage(512 points)Est FFT.(La FFT est décrite dans le chapitre suivant)
    #La FFT ci-dessus est effectuée sur les deux oreilles et toutes les données sont enregistrées par pickle. La fonction saveData est utilisée pour la sauvegarde.
    #Pour la lecture, les données de pickle enregistrées peuvent être lues avec readData.
    for e, _ in enumerate(self.elev):
      LaziData = []
      RaziData = []
      for a, _ in enumerate(self.azimuth[e]):
        
        Lpath = self.Lpath[e][a]
        Rpath  = self.Rpath[e][a]

        Ldata = spec.overlapAdderFFT(self.openData(Lpath))
        Rdata = spec.overlapAdderFFT(self.openData(Rpath))
        
        LaziData.append(Ldata)
        RaziData.append(Rdata)

      self.hrtf[self.left].append(LaziData)
      self.hrtf[self.right].append(RaziData)

    self.saveData(self.hrtf, Conf.HRTFpath)
    self.saveData(self.elev, Conf.Elevpath)
    self.saveData(self.azimuth, Conf.Azimuthpath)

  def saveData(self, data, path):
    #stockage de données pickle
    try:
      with open(path, 'wb') as hrtf:
        pickle.dump(data, hrtf)
    except Exception as e:
      logger.critical(e)

  def readData(self, path):
    #lecture de données pickle
    try:
      with open(path, 'rb') as hrtf:
        data = pickle.load(hrtf)
    except Exception as e:
      logger.critical(e)

    return data

Transformée de Fourier rapide (FFT)

Ici, l'implémentation liée à la FFT sera décrite.

Pour plus d'informations, voir acoustique / acoustiqueSignalProc.py

Lors du traitement des informations acoustiques, une intégration convolutionnelle du HRTF et de l'entrée microphone est nécessaire. Cependant, lorsqu'il est traité avec un signal acoustique brut, le traitement prend du temps et les données sont difficiles à gérer. Par conséquent, une transformée de Fourier qui convertit le signal en informations dans la région de fréquence est nécessaire. La FFT est celle qui effectue cette transformation de Fourier à grande vitesse.

La méthode OverLap Add (OLA) est souvent utilisée lors de la convolution du HRTF avec l'entrée microphone. Par exemple, si le nombre d'échantillons de HRTF et d'entrée microphone est de 512 points, 512 points sont remplis avec 0 en plus de chaque donnée. Autrement dit, 512 + 512 (0) données sont créées. Après cela, la fréquence positive de FFT est calculée par la fonction rfft de numpy. Dans le cas de fft de numpy, 512/2 = 256 points de composantes de fréquence positives et 256 points de composantes de fréquence négatives sont calculés. Cependant, rfft est utilisé dans de nombreuses applications d'ingénierie car seules des composantes à fréquence positive sont souvent suffisantes. De plus, bien que les algorithmes de calcul de numpy rfft et fft soient différents, l'erreur du résultat est assez petite, donc rfft est utilisé cette fois. Ensuite, après l'exécution de FFT, HRTF et l'entrée microphone sont multipliées pour chaque fréquence. Cette astuce sera expliquée dans le chapitre suivant.

Dans le programme suivant, deux FFT, overlapAdderFFT et spacializeFFT, sont préparés. Quelle est la différence? Si la fenêtre (fonction de fenêtre) est multipliée ou non. Puisque la fonction de fenêtre assume la périodicité de la plage coupée par la transformée de Fourier, une fonction qui rend la fin plus petite est appliquée aux données de sorte que les extrémités soient connectées. Cependant, HRTF n'a que 512 points par données, et lorsque la fonction de fenêtre est appliquée, les données d'origine ne peuvent pas être restaurées, elles sont donc utilisées sans appliquer la fonction de fenêtre. D'autre part, l'entrée microphone a une fonction de fenêtre. Comme cela sera expliqué dans le chapitre suivant, lorsque la fonction de fenêtre est appliquée, les informations de fin sont perdues, de sorte que chaque donnée est utilisée en décalant de 128 points.

acoustic/acousticSignalProc.py


import pyaudio
import numpy as np


class SpectrogramProcessing():
  def __init__(self, freq = Conf.SamplingRate):
    self.window = np.hamming(Conf.SysChunk)
    self.overlapFreq = np.fft.rfftfreq(Conf.SysChunk * 2, d=1./freq)
    self.overlapData = np.zeros((int(Conf.SysChunk * 2)), dtype = np.float32)
  
  def overlapAdderFFT(self, data):
    #Remplissez 0 et FFT
    self.overlapData[:Conf.SysChunk] = data
    return np.fft.rfft(self.overlapData)
  
  def spacializeFFT(self, data):
    #Remplissez 0 et appliquez la fenêtre de suspension.
    self.overlapData[:Conf.SysChunk] = data * self.window
    return np.fft.rfft(self.overlapData)


  def ifft(self, data):
    #in: chanel_num x freq num (if 1.6kHz, 0,...,7984.375 Hz) 
    #out: chanel_num x frame num(Conf.SysChunk = 512)
   
    return np.fft.irfft(data)

Entrée microphone, sortie, programme de traitement du son

Ensuite, le programme réel d'entrée, de sortie et de conversion du microphone en son stéréophonique sera décrit. Le traitement est effectué en utilisant les fonctions et analogues décrites ci-dessus. En outre, concernant les paramètres et gRPC, veuillez vous référer à mes articles précédents comme mentionné au début.

acoustic/audioStreamOverlapAdder.py



...

from acoustic.acousticSignalProc import AudioDevice, SpectrogramProcessing, WaveProcessing, convNp2pa, convPa2np
from acoustic.spacialSound import spacialSound
# ------------

import pyaudio
import numpy as np
import time

class MicAudioStream():
  def __init__(self):
    self.pAudio = pyaudio.PyAudio()
    self.micInfo = AudioDevice(Conf.MicID)
    self.outInfo = AudioDevice(Conf.OutpuID)

    #Traitement de restriction de périphérique de sortie
    if self.outInfo.micOutChannelNum < 2:
      self.left = 0
      self.right = 0
    else:
      self.left = 0
      self.right = 1
      if self.outInfo.micOutChannelNum > 2:
        self.outInfo.micChannelNum = 2
        logger.info("Je l'ai limité à 2 canaux car le nombre de micros de sortie est excessif.")

    self.startTime = time.time()

    #Actuellement, seule une largeur de bits de 16 bits est prise en charge.(Parce que nous n'avons pas confirmé l'opération dans d'autres cas)
    if Conf.SysSampleWidth == 2:
      self.format = pyaudio.paInt16
      self.dtype = np.int16
    else:
      logger.critical("Actuellement non pris en charge")
      exec(-1)

    self.fft = SpectrogramProcessing()

    #Si vous créez des données au format tableau numpy à chaque fois et allouez de la mémoire, cela prendra du temps, alors créez-le à l'avance.
    self.data = np.zeros((int(Conf.StreamChunk * 2)), dtype=self.dtype)
    self.npData = np.zeros((int(Conf.StreamChunk * 2)) , dtype=self.dtype)

    self.overlapNum = int(Conf.StreamChunk / Conf.SysFFToverlap) 

    self.freqData = np.zeros((self.overlapNum, self.outInfo.micOutChannelNum, self.fft.overlapFreq.shape[0]), dtype=np.complex)
    self.convFreqData = np.zeros((self.outInfo.micOutChannelNum, int(Conf.StreamChunk*3)) , dtype=self.dtype)
    self.outData = np.zeros((self.outInfo.micOutChannelNum * Conf.StreamChunk), dtype=self.dtype)
    
    self.Aweight = self.fft.Aweight() #La caractéristique A est appliquée, mais comme elle n'a pratiquement pas changé, il n'y a pas lieu de s'en inquiéter. (Peut être effacé)

    #Valeur initiale des informations de position
    self.x = 0.2
    self.y = 10
    self.z  = 0.2

    #Lecture HRTF (acoustique/spacialSound.py)
    #Vous pouvez exécuter le processus de renvoi de HRTF pour les informations de position.
    self.hrft = spacialSound()
    
    #Lors de l'enregistrement d'un son stéréophonique
    if Conf.Record:
      #test/listOrNumpy.Comparaison de vitesse avec py
      #Pour le format Array de numpy, il est plus rapide de convertir numpy en liste et d'étendre la liste que de combiner les formats de tableau.
      self.recordList = []

  def spacialSoundConvering(self, freqData):
    #Renvoie HRTF pour la position
    lhrtf, rhrtf = self.hrft.getHRTF(self.x, self.y, self.z)
   
    #Les données d'entrée du microphone HRTF sont alambiquées comme indiqué ci-dessous pour générer un son stéréophonique.
    freqData[self.left] = freqData[self.left] * lhrtf 
    freqData[self.right] = freqData[self.right] * rhrtf 
    return freqData * self.Aweight

  def callback(self, in_data, frame_count, time_info, status):
    #Une fonction qui traite les données sonores dans le traitement de flux de pyAudio.
    #in_les données sont entrées, le retour est sorti_Les données sonores sont sorties sous forme de données.

    if time.time() - self.startTime > Conf.SysCutTime:
      #Conversion de l'entrée au format pyAudio au format numpy.
      self.npData[Conf.StreamChunk:] = convPa2np(np.fromstring(in_data, self.dtype), channelNum=self.micInfo.micChannelNum)[0, :] #ch1 input
      
      #Chevauchement de la largeur des données ci-dessous(128)Générez un son tridimensionnel tout en les décalant un par un.
      for i in range(self.overlapNum):
        #512 points(SysChunk)La FFT est réalisée avec une largeur de.
        self.freqData[i, :, :] = self.fft.spacializeFFT(self.npData[Conf.SysFFToverlap * i : Conf.SysChunk + Conf.SysFFToverlap * i])
        
        #Le HRTF et l'entrée microphone sont repliés.
        self.freqData[i, :, :] = self.spacialSoundConvering(self.freqData[i]) 
        
        #La région de fréquence est convertie en région de temps par transformation de Fourier inverse.
        self.convFreqData[:, Conf.SysFFToverlap * i  : Conf.SysChunk * 2 + Conf.SysFFToverlap * i] += self.fft.ifft(self.freqData[i]).real.astype(self.dtype)#[:,:Conf.SysChunk]

      #Conversion du format numpy au format de sortie pyAudio.
      self.outData[:] = convNp2pa(self.convFreqData[:,:Conf.StreamChunk]) 

      #L'atténuation de distance du son est calculée. De plus, le son est trop fort, je le divise donc par SysAttenuation.
      self.outData[:] = self.hrft.disanceAtenuation(self.outData[:], self.x, self.y, self.z) / Conf.SysAttenuation
      
      if Conf.Record:
        self.recordList += self.outData.tolist()
      
      #Initialiser pour la prochaine entrée du microphone
      self.npData[:Conf.StreamChunk] = self.npData[Conf.StreamChunk:]
      self.convFreqData[:, :Conf.StreamChunk*2] = self.convFreqData[:, Conf.StreamChunk:]
      self.convFreqData[:,Conf.StreamChunk*2:] = 0
      
    #Convertir au format de sortie de données au format pyAudio
    out_data = self.outData.tostring()
    
    return (out_data, pyaudio.paContinue)
  

  
  def start(self):
    #Réglez le périphérique d'entrée / sortie et le format au format suivant et démarrez le traitement.
    """
    rate – Sampling rate
    channels – Number of channels
    format – Sampling size and format. See PortAudio Sample Format.
    input – Specifies whether this is an input stream. Defaults to False.
    output – Specifies whether this is an output stream. Defaults to False.
    input_device_index – Index of Input Device to use. Unspecified (or None) uses default device. Ignored if input is False.
    output_device_index – Index of Output Device to use. Unspecified (or None) uses the default device. Ignored if output is False.
    frames_per_buffer – Specifies the number of frames per buffer.
    start – Start the stream running immediately. Defaults to True. In general, there is no reason to set this to False.
    input_host_api_specific_stream_info – Specifies a host API specific stream information data structure for input.
    output_host_api_specific_stream_info – Specifies a host API specific stream information data structure for output.
    stream_callback –Specifies a callback function for non-blocking (callback) operation. Default is None, which indicates blocking operation (i.e., Stream.read() and Stream.write()). To use non-blocking operation, specify a callback that conforms to the following signature:
    callback(in_data,      # recorded data if input=True; else None
            frame_count,  # number of frames
            time_info,    # dictionary
            status_flags) # PaCallbackFlags
    time_info is a dictionary with the following keys: input_buffer_adc_time, current_time, and output_buffer_dac_time; see the PortAudio documentation for their meanings. status_flags is one of PortAutio Callback Flag.
    The callback must return a tuple:
    (out_data, flag)
    out_data is a byte array whose length should be the (frame_count * channels * bytes-per-channel) if output=True or None if output=False. flag must be either paContinue, paComplete or paAbort (one of PortAudio Callback Return Code). When output=True and out_data does not contain at least frame_count frames, paComplete is assumed for flag.
    """
    self.stream = self.pAudio.open(
      format = self.format,
      rate = Conf.SamplingRate,#self.micInfo.samplingRate,
      channels = self.micInfo.micChannelNum,
      input = True,
      output = True,
      input_device_index = Conf.MicID,
      output_device_index = Conf.OutpuID,
      stream_callback = self.callback,
      frames_per_buffer = Conf.StreamChunk
    )

    self.stream.start_stream()

  def stop(self):
    #Le traitement nécessaire est exécuté séparément du traitement du son. Enfin, le processus de fermeture est également exécuté lorsque le système est arrêté.
    #Ici, gRPC est utilisé pour mettre à jour les informations de position de la source sonore.

    from proto.client import posClient
    
    grpcPosGetter = posClient()
    grpcPosGetter.open()

    while self.stream.is_active():
      time.sleep(0.1)
      try:
        ok = grpcPosGetter.posRequest()
        if ok:
          self.x, self.y, self.z = grpcPosGetter.getPos()
      except Exception as e:
        logger.error("pos getter error {0}".format(e))

      if time.time() - self.startTime > Conf.RecordTime + Conf.SysCutTime:
        break

    if Conf.Record:
      record = WaveProcessing()
      record.SaveFlatteData(self.recordList, channelNum=self.outInfo.micOutChannelNum)


    self.stream.start_stream()
    self.stream.close()
    self.close()
    grpcPosGetter.close()
    

  
  def close(self):
    self.pAudio.terminate()
    logger.debug("Close proc")
    exit(0)

if __name__ == "__main__":
  st = MicAudioStream()
  st.start()
  try:
    pass
  finally:
    st.stop()
  

Vérifiez les périphériques d'entrée et de sortie

Afin d'essayer réellement le traitement du son, il est nécessaire de confirmer l'ID du périphérique d'entrée / sortie.

Exemple d'informations sur l'appareil

2020-01-16 03:46:49,436 [acousticSignalProc.py:34] INFO     Index: 0 | Name: Built-in Microphone | ChannelNum: in 2 out 0 | SampleRate: 44100.0
2020-01-16 03:46:49,436 [acousticSignalProc.py:34] INFO     Index: 1 | Name: Built-in Output | ChannelNum: in 0 out 2 | SampleRate: 44100.0
2020-01-16 03:46:49,436 [acousticSignalProc.py:34] INFO     Index: 2 | Name: DisplayPort | ChannelNum: in 0 out 2 | SampleRate: 48000.0
...

Ce qui suit montre un programme qui génère des informations sur les périphériques d'entrée / sortie à l'aide de PyAudio.

acoustic/acousticSignalProc.py


...

import pyaudio
import numpy as np

class AudioDevice():
  def __init__(self, devId = Conf.MicID):
    self.pAudio = pyaudio.PyAudio()

    self.setAudioDeviceInfo(devId)
    self.samplingRate = Conf.SamplingRate
    

  def getAudioDeviceInfo(self):
    #Sortez les informations du périphérique à l'aide de PyAudio.
    for i in range(self.pAudio.get_device_count()):
      tempDic = self.pAudio.get_device_info_by_index(i)
      text = 'Index: {0} | Name: {1} | ChannelNum: in {2} out {3} | SampleRate: {4}'.format(tempDic['index'], tempDic['name'], tempDic['maxInputChannels'], tempDic['maxOutputChannels'], tempDic['defaultSampleRate'])
      logger.info(text)

  def setAudioDeviceInfo(self, micId = 0):
    #Vérifiez si l'ID d'appareil défini existe et conservez les informations de cet ID
    micInfoDic = {}
    for i in range(self.pAudio.get_device_count()):
      micInfoDic = self.pAudio.get_device_info_by_index(i)
      if micInfoDic['index'] == micId:

        self.micName = micInfoDic['name']
        self.micChannelNum = micInfoDic['maxInputChannels']
        self.micOutChannelNum = micInfoDic['maxOutputChannels'] 
        self.micSamplingRate = int(micInfoDic['defaultSampleRate'])
        text = 'Set Audio Device Info || Index: {0} | Name: {1} | ChannelNum: {2}, {3} | SampleRate: {4}'.format(micId, self.micName, self.micChannelNum,self.micOutChannelNum, self.micSamplingRate)
        logger.info(text)

        if self.micChannelNum > 2:
          logger.critical("Il ne prend pas en charge 3 entrées microphone ou plus.")
          exit(-1)

        break

      if self.pAudio.get_device_count() == i + 1:
        logger.critical("Il n'y a pas de microphone d'identification correspondant.")

Résumé

Ce qui précède est l'explication du système de son tridimensionnel utilisant Python. Cela peut être compliqué car il y a certaines parties que j'ai beaucoup omises et je dois me référer à d'autres articles, mais j'espère que cela sera utile. On dit que python est lent, mais si vous utilisez numpy efficacement, par exemple en allouant de la mémoire à l'avance, il peut souvent être exécuté à une vitesse suffisante. J'ai écrit divers autres articles, veuillez donc vous y référer.

Recommended Posts

Traitement du signal acoustique à partir de Python - Faisons un système acoustique en trois dimensions
Traitement du signal acoustique avec Python (2)
Traitement du signal acoustique avec Python
Créer un système de recommandation avec python
Faites une loterie avec Python
Faire un feu avec kdeplot
Faisons une interface graphique avec python.
Faites un son avec le notebook Jupyter
Essayez le traitement du signal audio avec librosa-Beginner
Faisons une rupture de bloc avec wxPython
Créer un filtre avec un modèle django
Faisons un graphe avec python! !!
Faisons un spacon avec xCAT
Créer un itérateur de modèle avec PySide
Faire un joli graphique avec plotly
Module de traitement du signal acoustique qui peut être utilisé avec Python-Sounddevice ASIO [Application]
Python-Sound device Module de traitement du signal acoustique ASIO [Basic]
Commerce système à partir de Python3: investissement à long terme
Faisons un jeu de shiritori avec Python
Créer un lecteur vidéo avec PySimpleGUI + OpenCV
"Commerce du système à partir de Python3" lecture du mémo
Créez un simulateur de gacha rare avec Flask
Faire une figure partiellement zoomée avec matplotlib
Créez un quiz de dessin avec kivy + PyTorch
Faisons la voix lentement avec Python
Créez un système stellaire avec le script Blender 2.80
Créez un classificateur en cascade avec Google Colaboratory
Faisons un langage simple avec PLY 1
Faire un circuit logique avec Perceptron (Perceptron multicouche)
Faire Oui Non Popup avec Kivy
Faire une minuterie de lavage-séchage avec Raspberry Pi
Créer une animation GIF avec surveillance des dossiers
Créez un framework Web avec Python! (1)
"Première recherche élastique" commençant par un client python
Faisons une IA à trois yeux avec Pylearn 2
Créez une application de bureau avec Python avec Electron
Faisons un bot Twitter avec Python!
Créez un framework Web avec Python! (2)
Traitement du signal acoustique haute résolution (1) - Comment lire un fichier wav 24 bits avec Python