Ici, nous traitons de l'analyse des fluctuations temporelles à l'aide de python, en particulier des données chronologiques (appelées courbe de lumière) de l'intensité des photons du trou noir observés par les rayons X à titre d'exemple. Si seul le format de fichier est préparé, il peut être utilisé pour l'analyse de toutes les données de séries chronologiques. Même si vous ne le codez pas vous-même, vous pouvez faire la plupart des choses avec les outils fournis par la NASA appelés XRONOS. Cependant, c'est pour ces personnes, car il est plus étudiant de l'écrire par vous-même pour obtenir des détails et une nouvelle analyse des fluctuations de temps. L'environnement d'exploitation est la série anaconda python3, qui se trouve sur le terminal mac. On suppose que ls peut être utilisé. Linux est correct, mais Windows peut avoir besoin d'être peaufiné. (Mise à jour pour python3 le 2020.04.24 avec la version = 1.2, et le calcul de la phase de déroulement a été inclus avec 2020-05-12 ver 1.3.)
Comment télécharger des exemples de fichiers et de code
Le facteur de normalisation du spectre de puissance (PSD) (norme), lorsqu'il est intégré à toutes les fréquences, Veuillez normaliser à RMS ^ 2. Je pense que c'est le plus utilisé. Si vous voulez vérifier Voir Comment confirmer le théorème de Persival en utilisant matplotlib et scipy Fourier transform (FFT).
Comment réduire le temps d'appliquer le spectre de puissance une fois
Considérez les trois paramètres de et utilisez le paramètre optimal.
Par exemple, si la luminosité est d'environ plusieurs c / s,
--Une longueur FFT T = 256sec à 8096sec --Temps bin dT = 1 sec à 64 sec --Nombre moyen de fois N = 1 à 5 fois
Je pense que c'est un niveau raisonnable. On estime que la fluctuation statistique est d'environ plusieurs% à 10% avec des centaines de rayons X par FFT sur commande. Cela dépend de la science et de la physique ciblées.
Le fichier (xis3_src_0p1sec_6000.qdp) fait partie de la courbe de lumière du trou noir. Le fichier fit est converti en texte avec flc2ascii et seules 6000 lignes sont extraites.
python
$cat xis3_src_0p1sec_6000.qdp
7.05204864025116e+02 3.20020450000000e+01 1.60010220000000e+01 1.00000010000000e+00
7.05329855978489e+02 8.00051120000000e+00 8.00051120000000e+00 1.00000010000000e+00
7.05454847991467e+02 8.00051120000000e+00 8.00051120000000e+00 1.00000010000000e+00
....
Comme Temps (sec) Taux de comptage (c / s) Erreur de taux de comptage (c / s) L'exposition fractionnaire (le taux auquel les données sont emballées par bac) est obstruée, 4 lignes x heures. Faites-en un fichier. Non limité à cela, préparez un code simple afin que l'efficacité du débogage puisse être améliorée avec une petite quantité de données.
python
/bin/ls xis3_src_0p1sec_6000.qdp > test1lc.list
Préparez un fichier avec le nom du fichier écrit dessus. Ceci est similaire à l'utilisation des imbéciles pour transmettre un fichier avec un nom de fichier lorsque vous souhaitez traiter plusieurs fichiers à la fois.
Exécutez Ana_Timing_do_FFT.py.
python
python Ana_Timing_do_FFT.py test1lc.list -a 10 -l 128
ici, Je l'ai utilisé comme "python Ana_Timing_do_FFT.py test1lc.list -un nombre moyen de fois -l nombre de bacs utilisés pour une FFT".
Il existe d'autres options.
python
$python Ana_Timing_do_FFT.py -h
Usage: Ana_Timing_do_FFT.py fileList [-d] [-p] [-e 1] [-c 4] [-a 12] [-l 512]
Options:
--version show program's version number and exit
-h, --help show this help message and exit
-d, --debug The flag to show detailed information (Flag)
-p, --pltflag to plot time-series data
-m HLEN, --hlen=HLEN Header length
-c CLEN, --clen=CLEN Column length
-a ANUM, --anum=ANUM Average number
-l LENGTH, --length=LENGTH
FFT bin number (e.g., 1024, 2048, ..)
-t TCOL, --tcol=TCOL column number for time
-s VCOL, --vcol=VCOL column number for value
-e VECOL, --vecol=VECOL
column number for value error (if negative, it skips
error column)
Si vous ajoutez -p dans les options, le chiffre du résultat de chaque FFT sera sauvegardé. Si vous voulez voir l'état de FFT avant de calculer la moyenne, ajoutez l'option -p et exécutez-la.
Si cela fonctionne et fonctionne,
fig_fft/Graphique du spectre de puissance
fig_phase/Développer la phase du spectre de puissance
fig_lc/Tracé de la courbe de lumière
text_fft/Dump de texte du spectre de puissance(Time vs. Power^2 [rms^2/Hz])
Le répertoire est généré et une figure est générée.
Le spectre de puissance unidimensionnel généré est stocké dans text_fft, et son nom de fichier est mis dans test1fft.list. Ana_Timing_plot2d_FFT.py génère un spectre de puissance bidimensionnel avec la liste de fichiers comme argument.
python
$ /bin/ls text_fft/xis3_src_0p1sec_6000_l128_a10_fft_00000* > test1fft.list
$ python Ana_Timing_plot2d_FFT.py test1fft.list
Ensuite, un histogramme bidimensionnel est généré.
Ensuite, les données de l'étoile étoile trou noir J1550 acquises par le satellite avec une grande surface effective appelée RXTE [j1550_lowF_50ms_forqiita.txt](http://www-x.phys.se.tmu.ac.jp/%7Esyamada/qiita /v1_python3/j1550_lowF_50ms_forqiita.txt) Essayons un exemple où la fluctuation quasi-périodique (QPO) du trou noir peut être bien vue.
python
/bin/ls j1550_lowF_50ms_forqiita.txt > rxtelc.list
python Ana_Timing_do_FFT.py rxtelc.list -a 10 -l 512 -d
/bin/ls text_fft/j1550_lowF_50ms_forqiita_l512_a10_fft_0000* > rxtefft.list
python Ana_Timing_plot2d_FFT.py rxtefft.list
--Courbe de lumière
Ce qui peut être vu autour de ce ~ 0,5 Hz est la fluctuation quasi-périodique du trou noir.
--Résultats pour chaque FFT
Avec l'option -p, le résultat de chaque FFT est tracé et placé dans un répertoire appelé scipy_fft_check.
python
python Ana_Timing_do_FFT.py rxtelc.list -a 10 -l 512 -d -p
Si vous utilisez la commande convert pour convertir en animation gif,
python
convert -delay 50 -loop 5 *.png j1550_fftcheck.gif
Comme indiqué, la série chronologique, la PSD et la phase par intervalle sont tracées.
Si l'extension est .lc ou .fits, elle sera reconnue comme format FITS et le fichier sera automatiquement lu par astropy.
python
/bin/ls ae905005010hxd_0_pinno_cl_pha25-160.lc > xislc.list
python Ana_Timing_do_FFT.py xislc.list -a 400 -l 64
/bin/ls text_fft/ae905005010hxd_0_pinno_cl_pha25-160_l64_a400_fft_0000* > xisfft.list
python Ana_Timing_plot2d_FFT.py xisfft.list
--Courbe de lumière
Le temps d'observation est d'environ 70 k, mais en réalité, il n'y a pas de temps d'observation efficace à environ la moitié de celui en raison de l'arrière de la terre et de la limitation de l'angle du soleil. Dans le cas des satellites en orbite, c'est la situation dans la plupart des cas. Dans ce programme, s'il n'y a pas assez de données pour effectuer une FFT, elle sera ignorée. Dans xronos, un fichier de paramètres appelé fichier de fenêtre est utilisé, et seul le fuseau horaire lorsque plusieurs conditions sont effacées est utilisé pour le calcul.
Le QPO du trou noir est visible autour de ~ 3Hz. C'est souvent faible comme ça, et il est nécessaire de concevoir des statistiques et des analyses afin d'obtenir des paramètres avec précision.
Tout ce dont tu as besoin c'est
--Création d'un spectre unidimensionnel et d'une courbe de lumière Ana_Timing_do_FFT.py --Générer un spectre de puissance bidimensionnel Ana_Timing_plot2d_FFT.py
Seuls deux d'entre eux sont OK. Cela correspond au système python3. Si vous entrez des ajustements, vous avez besoin d'une copie astronomique.
Même si vous ne disposez pas d'un environnement python ou avez des difficultés à créer un environnement, vous pouvez utiliser google Colab tant que vous disposez d'un compte Google. Comment utiliser Google Colab
Prière de se référer à.
Tout d'abord, mettons l'ensemble des fichiers sur Google Drive. Alors montons-le pour que google Colab puisse accéder aux fichiers sur google drive.
from google import colab
colab.drive.mount('/content/gdrive')
Obtenez un mot de passe temporaire et copiez-le. Puis
Depuis google Colab, vous pouvez parcourir les fichiers placés sur google drive. Ensuite, copions l'ensemble de fichiers sur Google Drive.
python
!cp -rf /content/gdrive/My\ Drive/lié à python/202004/v1_python3 .
Vous avez maintenant copié l'ensemble de fichiers sur google Colab. Les experts peuvent l'ignorer, mais assurez-vous de vérifier où vous vous trouvez et de voir correctement les fichiers.
Utilisez cd pour vous déplacer, pwd pour vérifier votre emplacement actuel et ls pour afficher une liste de fichiers dans votre emplacement actuel.
En outre, cela devrait fonctionner comme décrit dans cet article. (J'utilise la série python3 et je ne devrais utiliser que des modules de base. Veuillez signaler si cela ne fonctionne pas.)
/ bin / ls et ls doivent être identiques. Cependant, en fonction de l'environnement, il existe un paramètre pour afficher des informations autres que le nom de fichier avec ls, nous écrivons donc ici pour appeler / bin / ls sans options.
Ana_Timing_do_FFT.py
#!/usr/bin/env python
#-*- coding: utf-8 -*-
""" Ana_Timing_do_FFT.py is to plot lightcurves.
This is a python script for timing analysis.
History:
2016-09-13 ; ver 1.0; first draft from scratch
2020-04-24 ; ver 1.2; updated for python3
2020-05-12 ; ver 1.3; unwrap phase included
"""
__author__ = 'Shinya Yamada (syamada(at)tmu.ac.jp'
__version__= '1.2'
############# scipy ############################################################
import scipy as sp # scipy :Un module incontournable pour les opérations numériques
import scipy.fftpack as sf #Module FFT. Utilise la bibliothèque FFT de fortan.
################################################################################
############# numpy ############################################################
import numpy as np # numpy :Un module indispensable pour le calcul de tableaux
from numpy import linalg as LA
################################################################################
############# matplotlib #######################################################
import matplotlib.pylab as plab
import matplotlib.pyplot as plt
from matplotlib.font_manager import fontManager, FontProperties
import matplotlib.colors as colors
import matplotlib.cm as cm
import matplotlib.mlab as mlab
from matplotlib.ticker import MultipleLocator, FormatStrFormatter, FixedLocator
# For 3D plot
from mpl_toolkits.mplot3d import Axes3D
from matplotlib.collections import PolyCollection
from matplotlib.colors import colorConverter
#Redimensionnement des étiquettes
params = {'xtick.labelsize': 13, # x ticks
'ytick.labelsize': 13, # y ticks
'legend.fontsize': 10
}
plt.rcParams['font.family'] = 'serif' #Spécifiez la police.(serif est dans tous les environnements.)
plt.rcParams.update(params)
#################################################################################
import random #Module de génération de nombres aléatoires(no use)
import array # (no use)
import os #Modules liés au système d'exploitation
import sys #modules liés au système
import math #Module de fonction mathématique
#import commands #Module d'utilisation des commandes shell pour python2
import subprocess #Module d'utilisation des commandes shell pour python3
# import pyfits #adapte le module
import optparse #Relation d'option
#import yaml
import re #Prise en charge des expressions régulières
import csv #fichier csv
import datetime #Prise en charge du temps
import pytz # timezoze
# jst = pytz.timezone('Asia/Tokyo')
# PyROOT
#import ROOT
#ROOT.gROOT.SetBatch(1)
def average_scipy_fft( allinputarray, dt, filtname=None, pltflag=False, outname = "default", anum = 10, length = 'default', debug = False):
"""
Reçoit les données de séries chronologiques, les moyennes pour le nombre de fois spécifié, la fréquence, la partie réelle^2, partie imaginaire^2, partie réelle^2+Partie imaginaire^2, renvoie le nombre moyen de fois.
[Paramètres d'entrée]
allinputarray :Tableau unidimensionnel de données chronologiques en FFT
dt :Intervalle de temps d'entrée des données dans tous les tableaux d'entrée(sec)
filtname=None :Avec ou sans filtre. la valeur par défaut est Aucun. Seul le hanning est pris en charge.
pltflag=False :Indique s'il faut tracer les données de séries temporelles à utiliser par FFT. la valeur par défaut est False
outname = "default" :Un nom pour distinguer le fichier de sortie. la valeur par défaut est"default"。
anum = 10 :Le nombre moyen de fois. la valeur par défaut est 10 fois.
length = 'default':La longueur des données utilisées pour une FFT. la valeur par défaut est"default"Dans ce cas, longueur=La longueur du tableau d'entrée/Calculé par anum.
debug = False :Drapeau pour le débogage.
[Paramètres de sortie]
la fréquence(Hz), Partie réelle^2, partie imaginaire^2, Partie réelle^2+Partie imaginaire^2, nombre moyen de fois
"""
alllen = len(allinputarray) #Obtenez la longueur du tableau
print("..... length of all array = ", alllen)
if length == 'default': #Obtenez automatiquement la longueur d'une FFT.
print("..... averaging is done with the number of the cycle. = ", cnum)
length = int (alllen / anum)
print("..... length = ", length)
else: #Obtenez la longueur d'une FFT à partir de la longueur.
print("..... averaging is done with the length of the record = ", length)
print("..... the number of average = ", anum)
averagenum = 0 #Une variable pour stocker le nombre moyen de fois. Si les données sont parfaitement remplies, elles correspondent à anum.
#Calculer le nombre de séquences après FFT
if (length%2==0):
num_of_fftoutput = int(length/2) + 1 #S'il est pair, divisez par 2 et ajoutez 1.
else:
print("[ERROR] Please choose even number for length ", length)
sys.exit() #Si la longueur est impaire, elle n'est normalement pas utilisée, elle génère donc une erreur et se termine.
#Obtenez un tableau vide. Préparez un tableau à partir du tableau à deux dimensions à ajouter en tant que tableau à deux dimensions avec numpy.
reals = np.empty((0, num_of_fftoutput), float)
imags = np.empty((0, num_of_fftoutput), float)
psds = np.empty((0, num_of_fftoutput), float)
unwrap_phases = np.empty((0, num_of_fftoutput), float)
for num in np.arange( 0, anum, 1): #Effectuez une FFT un nombre moyen de fois.
onerecord = allinputarray[num * length : (num + 1 ) * length]
oneoutname = outname + str(num)
if debug : print("..... ..... num = ", num, " len(onerecord) = ", len(onerecord))
freq, spreal, spimag, sppower, spunwrap_phases = scipy_fft(onerecord - np.mean(onerecord), dt, filtname=filtname, pltflag=pltflag, outname = oneoutname, debug = debug)
reals = np.append(reals, [spreal], axis=0) # add power^2/Hz
imags = np.append(imags, [spimag], axis=0) # add power^2/Hz
psds = np.append(psds, [sppower], axis=0) # add power^2/Hz
unwrap_phases = np.append(unwrap_phases, [spunwrap_phases], axis=0) # add radians
averagenum = averagenum + 1
real2 = np.mean(reals, axis=0)
# sum(power^2 + power^2 + .....), and then sqrt{sum(power^2 + power^2 + .....)} --> power/sqrt(Hz)
imag2 = np.mean(imags, axis=0)
# sum(power^2 + power^2 + .....), and then sqrt{sum(power^2 + power^2 + .....)} --> power/sqrt(Hz)
psd2 = np.mean(psds, axis=0)
# sum(power^2 + power^2 + .....), and then sqrt{sum(power^2 + power^2 + .....)} --> power/sqrt(Hz)
phase = np.mean(unwrap_phases, axis=0)
# sum(power^2 + power^2 + .....), and then sqrt{sum(power^2 + power^2 + .....)} --> power/sqrt(Hz)
return(freq,real2,imag2,psd2,averagenum,phase)
def scipy_fft(inputarray,dt,filtname=None, pltflag=False, outname = "default", debug = False):
# updated, 2015/01/25, freq is adjusted to PSP output.
#Exécutez FFT.
bin = len(inputarray) #Obtenez la longueur des données de séries chronologiques. 1024(default)
#Paramètres de filtre
if filtname == None:
filt = np.ones(len(inputarray))
cornorm = 1.0
elif filtname == 'hanning':
filt = sp.hanning(len(inputarray))
# print "filt =" + str(filt)
cornorm = 8./3. #Un terme qui corrige la puissance atténuée par le filtre Hanning.
else:
print('No support for filtname=%s' % filtname)
exit()
###########################################################
# freq = np.arange(0, nyquist, nyquist/adclength)
# This means freq = [12.2, .. 6250.], len(freq) = 512
# because PSP discard 0 freq component.
# freq = sf.fftfreq(bin,dt)[0:bin/2 +1]
# scipy fftfreq return [0, 12.2 .... , -6250, ...], Nyquist is negative.
nyquist = 0.5/dt
adclength = int(0.5 * bin) # 512 (default)
# freq = np.linspace(0, nyquist, adclength + 1)[1:] # this is only for PSP
freq = np.linspace(0, nyquist, adclength + 1)[0:]
############################################################
df = freq[1]-freq[0]
# fft = sf.fft(inputarray*filt,bin)[1:bin/2+1] # this is only for PSP
fft = sf.fft(inputarray*filt,bin)[0:int(bin/2+1)]
# scipy fftfreq return [0, 12.2 .... , -6250, ...], Nyquist is negative.
# so [1:bin/2 + 1] = [1:513] = [12.2 ,,, -6250]
###########################################################################
real = fft.real
imag = fft.imag
# psd = np.abs(fft)
############ this is used for to adjust PSP unit.
# psd2 = np.abs(fft) * np.abs(fft) * cornorm
############ this is used for to adjust matplotlib norm = rms definition.
# psd = np.abs(fft)/np.sqrt(df)/(bin/np.sqrt(2.))
fftnorm = (np.sqrt(df) * bin) / np.sqrt(2.) #Le terme requis pour que l'intégration de fréquence devienne RMS.
psd = np.abs(fft) * np.sqrt(cornorm) / fftnorm
psd2 = psd * psd # Power^2
rms_from_ft = np.sqrt(np.sum(psd2) * df) #Montant intégré dans l'espace de fréquences. Correspond au RMS spatio-temporel.
phase = np.arctan2(np.array(real),np.array(imag)) # radians, * 180. / np.pi # dig
phase_unwrap = np.unwrap(phase)
if pltflag: #Tracez les données de la série chronologique pour confirmation.
binarray = np.arange(0,bin,1) * dt
F = plt.figure(figsize=(10,8.))
plt.subplots_adjust(wspace = 0.1, hspace = 0.3, top=0.9, bottom = 0.08, right=0.92, left = 0.1)
ax = plt.subplot(3,1,1)
plt.title("check scipy_fft")
plt.xscale('linear')
plt.grid(True)
plt.xlabel(r'Time (s)')
plt.ylabel('FFT input')
plt.errorbar(binarray, inputarray, fmt='r', label="Raw data")
fnorm = np.abs(np.amax(inputarray))
plt.errorbar(binarray, fnorm * filt, fmt='b--', label="Filter")
plt.errorbar(binarray, inputarray * filt, fmt='g', label="Filtered Raw data")
plt.legend(numpoints=1, frameon=False, loc=1)
ax = plt.subplot(3,1,2)
plt.xscale('linear')
plt.yscale('linear')
plt.grid(True)
plt.xlabel(r'Frequency (Hz)')
plt.ylabel(r'Power ($rms/\sqrt{Hz}$)')
plt.errorbar(freq, psd, fmt='r', label="PSD")
plt.legend(numpoints=1, frameon=False, loc=1)
ax = plt.subplot(3,1,3)
plt.xscale('linear')
plt.grid(True)
plt.xlabel(r'Frequency (Hz)')
plt.ylabel('UnWrap Phase (radians)')
plt.errorbar(freq, phase_unwrap, fmt='r', label="phase")
plt.legend(numpoints=1, frameon=False, loc=1)
# plt.show()
outputfiguredir = "scipy_fft_check"
subprocess.getoutput('mkdir -p ' + outputfiguredir)
outfile = outputfiguredir + "/" + "scipy_fftcheck" + "_t" + str(dt).replace(".","p") + "_" + outname + ".png "
plt.savefig(outfile)
if os.path.isfile(outfile):
print("saved as ", outfile)
else:
print("[ERROR] couldn't save as ", outfile)
if debug:
print("=====> please make sure the three RMSs below are almost same. ")
print(". . . . [check RMS] std(input), np.std(input * filt), rms_from_ft = " + str(np.std(inputarray)) + " " + str(np.std(inputarray * filt)) +" " + str(rms_from_ft))
return(freq,real,imag,psd2,phase_unwrap)
class lcfile():
"""
Une classe pour une courbe de lumière.
"""
def __init__ (self, eventfile, debug, hlen = -1, clen = 4, anum = 10, length = 'default', pltflag = False, tcol = 0, vcol = 1, vecol = 2):
"""Lisez le contenu du fichier avec la fonction d'initialisation init de cette classe."""
eventfile = eventfile.strip() #Exclut le code de saut de ligne.
self.eventfilename = eventfile
self.ext = eventfile.split(".")[-1]
self.filenametag = os.path.basename(self.eventfilename).replace(".txt","").replace(".text","").replace(".csv","").replace(".qdp","").replace(".lc","").replace(".fits","")
self.filenametag = self.filenametag + "_l" + str(length) + "_a" + str(anum)
self.debug = debug
#extract header information
if os.path.isfile(eventfile):
self.file = open(eventfile)
if self.ext == "lc" or self.ext == "fits":
print(".... file is considered as FITS format ", self.ext)
from astropy.io import fits
hdu = fits.open(eventfile)
self.t = hdu[1].data["TIME"]
self.v = hdu[1].data["RATE"]
self.ve = hdu[1].data["ERROR"]
else:
print(".... file is considered as text format ", self.ext)
self.t = np.array([]) #temps
self.v = np.array([]) #Valeur, e.g., count/sec
self.ve = np.array([]) #Erreur
for i, oneline in enumerate(self.file):
if i > (hlen - 1) : # skip header
list = oneline.split()
if list[0][0] is not "!": # skip a line starting with !
if len(list) == clen: # check column number of the input file
self.t = np.append(self.t, float(list[tcol]))
self.v = np.append(self.v, float(list[vcol]))
if vecol > 0:
self.ve = np.append(self.ve, float(list[vecol]))
else:
self.ve = np.append(self.ve, float(0.)) # padding 0.
if debug : print(len(self.t), len(self.v), len(self.ve))
if len(self.t) < 1:
print("[ERROR] no data are stored, ", self.t)
print("Please check the length of column: column of your data = ", len(list), " input param of clen = ", clen)
print("Both should be the same.")
sys.exit()
self.tstart = self.t[0]
self.tstop = self.t[-1]
self.timebin = self.t[1] - self.t[0]
# created dt = t[i+1] - t[i], and add 0 to make it have the same array length as t.
self.dt = self.t[1:] - self.t[0:-1]
self.dt_minus_timebin = self.dt - self.timebin #Jugez le saut du temps. S'il n'y a pas de vol, il sera nul.
np.append(self.dt_minus_timebin,0)
print("timebin (sec) = ", self.timebin)
print("start (sec) = ", self.tstart)
print("stop (sec) = ", self.tstop)
print("expected maxinum number of bins = ", int((self.tstop - self.tstart)/self.timebin))
else:
print("ERROR : file does not exist. ", eventfile)
def plot_lc_entire(self):
"""
plot entire lightcurves
"""
F = plt.figure(figsize=(12,8.))
# plt.subplots_adjust(wspace = 0.1, hspace = 0.3, top=0.9, bottom = 0.08, right=0.92, left = 0.1)
ax = plt.subplot(1,1,1)
plt.title(self.filenametag)
plt.xscale('linear')
plt.yscale('linear')
plt.grid(True)
plt.xlabel(r'Time (sec)')
plt.ylabel(r'Count Rate ($cs^{-1}$)')
plt.errorbar(self.t, self.v, self.ve, fmt='ro-', label=self.filenametag, alpha = 0.9)
plt.legend(numpoints=1, frameon=False, loc="best")
outputfiguredir = "fig_lc"
subprocess.getoutput('mkdir -p ' + outputfiguredir)
outfile = outputfiguredir + "/" + self.filenametag + "_lc_entire.png "
plt.savefig(outfile)
if os.path.isfile(outfile):
print("saved as ", outfile)
else:
print("[ERROR] couldn't save as ", outfile)
def create_good_lc(self, anum = 5, length = 512, debug = False):
"""
S'il n'y a aucune donnée manquante pendant la longueur, elle est considérée comme bonne.
S'il y a un saut dans les données, les données jusqu'à ce point sont supprimées et le saut dans les données
Dès la fin, collectez de nouvelles longueurs de données.
"""
self.good_lc_t = np.array([]) #bon temps jugé
self.good_lc_v = np.array([]) #bonne valeur jugée
self.good_lc_ve = np.array([]) #bonne erreur jugée
tmp_t = []
tmp_v = []
tmp_ve = []
num_of_good_interval = 0 #Le nombre d'intervalles jugé bon.
num_of_discarded_events = 0 #Le nombre d'événements qui n'ont pas été jugés bons.
for i, (t, v, ve, tdiff) in enumerate(zip(self.t, self.v, self.ve, self.dt_minus_timebin)):
if np.abs(tdiff) > 0.5 * self.timebin: # 0.5 is not important. very small number in tdiff means time is contineous.
"""Détecte les sauts dans les données. Jeter les données collectées jusqu'à présent."""
print(".-.-.-.-.-. time jump has occurred. ", i, t, v, tdiff)
num_of_discarded_events = num_of_discarded_events + len(tmp_t)
# initialize buffer arrays
tmp_t = []
tmp_v = []
tmp_ve = []
else:
tmp_t.append(t)
tmp_v.append(v)
tmp_ve.append(ve)
if len(tmp_t) == length:
"""Comme il n'y a pas de saut dans les données et que la longueur est accumulée, enregistrez-la dans un tableau."""
# store data
self.good_lc_t = np.append(self.good_lc_t, tmp_t)
self.good_lc_v = np.append(self.good_lc_v, tmp_v)
self.good_lc_ve = np.append(self.good_lc_ve, tmp_ve)
num_of_good_interval = num_of_good_interval + 1
# initialize buffer arrays
tmp_t = []
tmp_v = []
tmp_ve = []
# print status
print("def create_good_lc() ")
print("num_of_good_interval (event x length) = ", num_of_good_interval)
print("num_of_discarded_events (event) = ", num_of_discarded_events, "\n")
if debug:
print("self.good_lc_t", len(self.good_lc_t))
print("self.good_lc_v", len(self.good_lc_v))
print("self.good_lc_ve", len(self.good_lc_ve))
if num_of_good_interval < anum:
print("[ERROR] number of good interval", num_of_good_interval, " is smaller than one average number ", anum )
sys.exit()
def calc_fft(self, filtname=None, pltflag = False, anum = 5, length = 512, dccut = True, debug = False):
print("start : in calc_fft()")
#Effectuer des temps anum FFT pour une série chronologique de longueur de longueur et prendre la moyenne.
#Un tableau unidimensionnel pour que le tampon stocke le tableau requis pour la moyenne.
tmp_tlist = np.array([])
tmp_vlist = np.array([])
num_of_averaged_fft = 0 #Comptez le nombre de moyennes réussies
#Calculer le nombre de séquences après FFT(Après la coupure DC)
if (length%2==0):
num_of_fftoutput = int(length/2) #S'il est pair, divisez par 2.
else:
print("[ERROR] Please choose even number for length ", length)
sys.exit() #Si la longueur est impaire, elle n'est normalement pas utilisée, elle génère donc une erreur et se termine.
#Obtenez un tableau vide. Préparez un tableau à partir du tableau à deux dimensions à ajouter en tant que tableau à deux dimensions avec numpy.
self.freq_ave = np.empty((0, num_of_fftoutput), float)
self.real_ave = np.empty((0, num_of_fftoutput), float)
self.imag_ave = np.empty((0, num_of_fftoutput), float)
self.power_ave = np.empty((0, num_of_fftoutput), float)
self.phase_ave = np.empty((0, num_of_fftoutput), float)
self.power_avenum = np.array([])
self.time_ave = np.array([])
for i, (tlist, vlist) in enumerate(zip(self.good_lc_t, self.good_lc_v)):
#Ajoutez le tableau par longueur.
tmp_tlist = np.append(tmp_tlist, tlist)
tmp_vlist = np.append(tmp_vlist, vlist)
#Si la longueur est égale ou supérieure à 1 et que le nombre d'éléments est divisible par le nombre moyen de fois x longueur
if len(tmp_tlist) > 0 and (len(tmp_tlist)%(anum*length) == 0):
#Calculez FFT.
# freq_fréquence moyenne(Hz) real_ave (Le carré de la partie réelle), imag_ave(Le carré de la partie imaginaire), power_ave(Absolu au carré), avenum(Nombre moyen de fois)
freq_ave, real_ave, imag_ave, power_ave, avenum, phase_ave = \
average_scipy_fft( tmp_vlist, self.timebin, filtname=filtname, \
pltflag=pltflag, outname = self.filenametag, anum = anum, length = length, debug = debug)
if dccut: #Normalement, le composant DC n'est pas nécessaire, alors coupez-le.
freq_ave = freq_ave[1:]
real_ave = real_ave[1:]
imag_ave = imag_ave[1:]
power_ave = power_ave[1:]
phase_ave = phase_ave[1:]
num_of_averaged_fft = num_of_averaged_fft + 1 #Ajouter le nombre moyen de fois
self.time_ave = np.append(self.time_ave, 0.5 * (tmp_tlist[0] + tmp_tlist[-1]) ) #Calculer le centre du temps
self.power_avenum = np.append(self.power_avenum, avenum) #Économisez un nombre moyen de fois
self.freq_ave = np.append(self.freq_ave, [freq_ave], axis=0)
self.real_ave = np.append(self.real_ave, [real_ave], axis=0)
self.imag_ave = np.append(self.imag_ave, [imag_ave], axis=0)
self.power_ave = np.append(self.power_ave, [power_ave], axis=0)
self.phase_ave = np.append(self.phase_ave, [phase_ave], axis=0)
# init buffer arrays
tmp_tlist = np.array([])
tmp_vlist = np.array([])
else:
pass #Il n'y a pas assez d'éléments pour faire la moyenne.
# print status
print("num_of_averaged_fft = ", num_of_averaged_fft)
print("finish : in calc_fft()")
def plot_fft(self):
"""
plot fft
"""
F = plt.figure(figsize=(12,8.))
# plt.subplots_adjust(wspace = 0.1, hspace = 0.3, top=0.9, bottom = 0.08, right=0.92, left = 0.1)
ax = plt.subplot(1,1,1)
plt.title(self.filenametag)
plt.xscale('log')
plt.yscale('log')
plt.grid(True)
plt.xlabel(r'Frequency(Hz)')
plt.ylabel(r'Power ($rms/\sqrt{Hz}$)')
for fre, power, time in zip(self.freq_ave, self.power_ave, self.time_ave):
plt.errorbar(fre, np.sqrt(power), fmt='-', label=" t = " + str(time) + "" , alpha = 0.8) #la puissance est le carré de la puissance et s'affiche en empruntant l'itinéraire.
plt.legend(numpoints=1, frameon=False, loc="best")
outputfiguredir = "fig_fft"
subprocess.getoutput('mkdir -p ' + outputfiguredir)
outfile = outputfiguredir + "/" + self.filenametag + "_fft.png "
plt.savefig(outfile)
if os.path.isfile(outfile):
print("saved as ", outfile)
else:
print("[ERROR] couldn't save as ", outfile)
def plot_phase(self):
"""
plot phase
"""
F = plt.figure(figsize=(12,8.))
ax = plt.subplot(1,1,1)
plt.title(self.filenametag)
plt.xscale('linear')
plt.yscale('linear')
plt.grid(True)
plt.xlabel(r'Frequency (Hz)')
plt.ylabel(r'unwarp phase (radians)')
for fre, phase, time in zip(self.freq_ave, self.phase_ave, self.time_ave):
plt.errorbar(fre, phase, fmt='-', label=" t = " + str(time) + "" , alpha = 0.8) #la puissance est le carré de la puissance et s'affiche en empruntant l'itinéraire.
plt.legend(numpoints=1, frameon=False, loc="best")
outputfiguredir = "fig_phase"
subprocess.getoutput('mkdir -p ' + outputfiguredir)
outfile = outputfiguredir + "/" + self.filenametag + "_phase.png "
plt.savefig(outfile)
if os.path.isfile(outfile):
print("saved as ", outfile)
else:
print("[ERROR] couldn't save as ", outfile)
def dump_text_fft(self):
"""
dump fft data into text file
"""
odir = "text_fft"
subprocess.getoutput('mkdir -p ' + odir)
for i, (fre, power, time) in enumerate(zip(self.freq_ave, self.power_ave, self.time_ave)):
outfile = odir + "/" + self.filenametag + "_fft_" + str("%06d" % i) + "_" + str(time) + ".txt"
fout = open(outfile, "w")
for f, p in zip(fre,power):
outstr=str(f) + " " + str(p) + "\n" #Fréquence, puissance au carré
fout.write(outstr)
fout.close()
if os.path.isfile(outfile):
print("saved as ", outfile)
else:
print("[ERROR] couldn't save as ", outfile)
def main():
"""Lorsque vous exécutez le script, il commence ici."""
curdir = os.getcwd() #Obtenez le répertoire actuel
"""Options de réglage"""
#paramètres optparse
usage = u'%prog fileList [-d] [-p] [-e 1] [-c 4] [-a 12] [-l 512]'
version = __version__
parser = optparse.OptionParser(usage=usage, version=version)
#Paramètres des options.--Le caractère après est la variable d'option.
parser.add_option('-d', '--debug', action='store_true', help='The flag to show detailed information (Flag)', metavar='DEBUG', default=False)
parser.add_option('-p', '--pltflag', action='store_true', help='to plot time-series data', metavar='PLTFLAG', default=False)
parser.add_option('-m', '--hlen', action='store', type='int', help='Header length', metavar='HLEN', default=-1)
parser.add_option('-c', '--clen', action='store', type='int', help='Column length', metavar='CLEN', default=4)
parser.add_option('-a', '--anum', action='store', type='int', help='Average number', metavar='ANUM', default=12)
parser.add_option('-l', '--length', action='store', type='int', help='FFT bin number (e.g., 1024, 2048, ..)', metavar='LENGTH', default=256)
# for setting column number for time, value, and error of value
parser.add_option('-t', '--tcol', action='store', type='int', help='column number for time', metavar='TCOL', default=0)
parser.add_option('-s', '--vcol', action='store', type='int', help='column number for value', metavar='VCOL', default=1)
parser.add_option('-e', '--vecol', action='store', type='int', help='column number for value error (if negative, it skips error column)', metavar='VECOL', default=2)
options, args = parser.parse_args() #Obtenir des arguments et des options
print("=========================================================================")
print("Start " + __file__ + " " + version )
print("=========================================================================")
debug = options.debug
pltflag = options.pltflag
hlen = options.hlen
clen = options.clen
anum = options.anum
length = options.length
# for setting column number for time, value, and error of value
tcol = options.tcol
vcol = options.vcol
vecol = options.vecol;
print(" (OPTIONS)")
print("-- debug ", debug)
print("-- pltflag ", pltflag)
print("-- hlen (header number) ", hlen)
print("-- clen (column number) ", clen)
print("-- anum (average number) ", anum)
print("-- length (FFT bin number) ", length)
# for setting column number for time, value, and error of value
print("-- column number for time ", tcol)
print("-- column number for value ", vcol)
print("-- column number for value error ", vecol, " (if negative, it skips.)")
print("do FFT for one length of ", length, " by averaging ", anum, " times" )
print("=========================================================================" )
#Si l'argument n'est pas 1, il lancera une erreur et quittera.
argc = len(args)
if (argc < 1):
print('| STATUS : ERROR ')
print(parser.print_help())
quit()
listname = args[0] # get a file name which contains file names
filelistfile=open(listname, 'r')
"""Créer une liste de fichiers"""
filelist = []
for i, filename in enumerate(filelistfile):
print("\n...... reading a file (", i, ") ", filename )
filelist.append(filename)
"""Créer une liste de classe"""
evelist = []
for i, filename in enumerate(filelist):
print("\n...... creating a class for a file (", i, ") ", filename)
eve = lcfile(filename, debug, hlen = hlen, clen = clen, anum = anum, length = length, pltflag = pltflag,\
tcol = tcol, vcol = vcol, vecol = vecol)
evelist.append(eve)
"""Tracez toute la courbe de lumière des données d'entrée.(Vous pouvez l'ignorer car c'est pour confirmation.) """
for i, eve in enumerate(evelist):
print("...... creating an entire lightcurve (", i, ") ", eve.eventfilename)
eve.plot_lc_entire()
""" Create good interval.Une longueur(intervalle xronos)Collectez uniquement les données satisfaites."""
for i, eve in enumerate(evelist):
print("\n...... creating good interval (", i, ") ", eve.eventfilename)
eve.create_good_lc(anum = anum, length = length, debug = debug)
""" Create calc fft.Exécutez FFE."""
for i, eve in enumerate(evelist):
print("\n...... calculating fft (", i, ") ", eve.eventfilename)
eve.calc_fft(filtname=None, pltflag = pltflag, anum = anum, length = length, debug=debug)
""" Create plot fft.Tracez le spectre de puissance."""
for i, eve in enumerate(evelist):
print("\n...... calculating fft (", i, ") ", eve.eventfilename)
eve.plot_fft()
""" Create plot unwrap phase"""
for i, eve in enumerate(evelist):
print("\n...... calculating phase (", i, ") ", eve.eventfilename)
eve.plot_phase()
""" Dump FFT results into text files .Dump dans un fichier texte."""
for i, eve in enumerate(evelist):
print("\n...... text dump of fft (", i, ") ", eve.eventfilename)
eve.dump_text_fft()
print("=================== Finish ==========================")
if __name__ == '__main__':
main()
Ana_Timing_plot2d_FFT.py
#!/usr/bin/env python
#-*- coding: utf-8 -*-
""" Ana_Timing_plot2d_FFT.py is to plot lightcurves.
This is a python script for timing analysis.
History:
2016-09-13 ; ver 1.0; first draft from scratch
2020-04-24 ; ver 1.2; updated for python3
"""
__author__ = 'Shinya Yamada (syamada(at)tmu.ac.jp'
__version__= '1.2'
############# scipy ############################################################
import scipy as sp # scipy :Un module incontournable pour les opérations numériques
import scipy.fftpack as sf #Module FFT. Utilise la bibliothèque FFT de fortan.
################################################################################
############# numpy ############################################################
import numpy as np # numpy :Un module indispensable pour le calcul de tableaux
from numpy import linalg as LA
################################################################################
############# matplotlib #######################################################
import matplotlib.pylab as plab
import matplotlib.pyplot as plt
from matplotlib.font_manager import fontManager, FontProperties
import matplotlib.colors as colors
import matplotlib.cm as cm
import matplotlib.mlab as mlab
from matplotlib.ticker import MultipleLocator, FormatStrFormatter, FixedLocator
from mpl_toolkits.axes_grid import AxesGrid
# For 3D plot
from mpl_toolkits.mplot3d import Axes3D
from matplotlib.collections import PolyCollection
from matplotlib.colors import colorConverter
#Redimensionnement des étiquettes
params = {'xtick.labelsize': 13, # x ticks
'ytick.labelsize': 13, # y ticks
'legend.fontsize': 10
}
plt.rcParams['font.family'] = 'serif' #Spécifiez la police.(serif est dans tous les environnements.)
plt.rcParams.update(params)
from matplotlib.colors import LogNorm
#################################################################################
import random #Module de génération de nombres aléatoires(no use)
import array # (no use)
import os #Modules liés au système d'exploitation
import sys #modules liés au système
import math #Module de fonction mathématique
#import commands #Module d'utilisation des commandes shell pour python2
import subprocess #Module d'utilisation des commandes shell pour python3
# import pyfits #adapte le module
import optparse #Relation d'option
#import yaml
import re #Prise en charge des expressions régulières
import csv #fichier csv
import datetime #Prise en charge du temps
import pytz # timezoze
# jst = pytz.timezone('Asia/Tokyo')
# PyROOT
#import ROOT
#ROOT.gROOT.SetBatch(1)
class fftfile():
"""
Une classe pour une FFT.
"""
def __init__ (self, eventfile, debug, hlen = -1, clen = 2):
"""Lisez le contenu du fichier avec la fonction d'initialisation init de cette classe."""
eventfile = eventfile.strip() #Exclut le code de saut de ligne.
self.eventfilename = eventfile
self.filenametag_full = eventfile.replace(".txt","")
self.filenametag = os.path.basename(self.filenametag_full).replace(".txt","").replace(".text","").replace(".csv","").replace(".qdp","")
self.filenametag = self.filenametag
self.debug = debug
#extract header information
if os.path.isfile(eventfile):
self.file = open(eventfile)
self.f = np.array([]) #la fréquence(Hz)
self.v = np.array([]) #Puissance au carré(rms^2/Hz)
for i, oneline in enumerate(self.file):
if i > hlen: # skip header
list = oneline.split()
if list[0][0] is not "!": # skip a line starting with !
if len(list) == clen: # check column number of the input file
self.f = np.append(self.f, float(list[0]))
self.v = np.append(self.v, float(list[1]))
if debug : print(len(self.f), len(self.v))
self.df = self.f[1] - self.f[0]
self.rms_from_ft = np.sqrt(np.sum(self.v) * self.df) #Montant intégré dans l'espace de fréquences. Correspond au RMS spatio-temporel.
self.nbin = len(self.f)
print("Maximum frequency (Hz) = ", self.f[-1] )
print("frequency bin (Hz) = ", self.df)
print("Number of bins = ", self.nbin)
print("RMS over all frequency = ", self.rms_from_ft)
self.file.close()
else:
print("ERROR : file does not exist. ", eventfile)
exit
def plot_2d_FFT(evelist, outname = "defaultoutname", pltflag = False):
"""
plot 2d plot
Prenez une liste de classes fftfile comme arguments et tracez les résultats combinés.
"""
power2d = []
for eve in evelist:
power2d.append(eve.v)
power2d = np.array(power2d)
x = range(len(evelist)) #Créer un tableau unidimensionnel dans le sens du temps
y = evelist[0].f #Tableau unidimensionnel de fréquences
X, Y = np.meshgrid(x,y) #Effectuez un tableau bidimensionnel comme un tracé bidimensionnel.(spécifications propriétaires de matplotlib)
Z = power2d.T #Une fois transposé, l'axe horizontal devient l'axe du temps.
F = plt.figure(figsize=(12,8.))
# plt.subplots_adjust(wspace = 0.1, hspace = 0.3, top=0.9, bottom = 0.08, right=0.92, left = 0.1)
ax = plt.subplot(1,1,1)
plt.title("Time vs. Frequency")
plt.xscale('linear')
plt.yscale('linear')
plt.grid(True)
plt.xlabel(r'Number of FFT in order of time')
plt.ylabel(r'Frequency ($Hz$)')
ax.set_xlim(x[0],x[-1])
ax.set_ylim(y[0],y[-1])
if (X.shape == Y.shape) and (X.shape == Z.shape) : #Le tracé bidimensionnel de matplotlib doit avoir le même nombre d'éléments.
plt.pcolormesh( X, Y, Z, norm=LogNorm(vmin=Z.min(), vmax=Z.max()))
cl = plt.colorbar()
cl.set_label(r"Power ($rms^2/Hz$)")
# plt.legend(numpoints=1, frameon=False, loc="best")
outputfiguredir = "fft2d_fig"
subprocess.getoutput('mkdir -p ' + outputfiguredir)
outfile = outputfiguredir + "/" + outname + "_log.png "
plt.savefig(outfile)
if os.path.isfile(outfile):
print("saved as ", outfile)
else:
print("[ERROR] couldn't save as ", outfile)
if pltflag:
plt.show()
else:
print("\n[ERROR] Please make sure all the following shape are the same.")
print("The same shape is necessary for matplotlib 2D plot.")
print(X.shape, Y.shape, Z.shape)
def main():
"""Lorsque vous exécutez le script, il commence ici."""
curdir = os.getcwd() #Obtenez le répertoire actuel
"""Options de réglage"""
#paramètres optparse
usage = u'%prog fileList [-d] [-p] [-e 1] [-c 4] [-a 12] [-l 512]'
version = __version__
parser = optparse.OptionParser(usage=usage, version=version)
#Paramètres des options.--Le caractère après est la variable d'option.
parser.add_option('-d', '--debug', action='store_true', help='The flag to show detailed information (Flag)', metavar='DEBUG', default=False)
parser.add_option('-p', '--pltflag', action='store_true', help='to plot time-series data', metavar='PLTFLAG', default=False)
parser.add_option('-e', '--hlen', action='store', type='int', help='Header length', metavar='HLEN', default=-1)
parser.add_option('-c', '--clen', action='store', type='int', help='Column length', metavar='CLEN', default=2)
parser.add_option('-o', '--outname', action='store', type='string', help='Output file name', metavar='OUTNAME', default='fft2dplot')
options, args = parser.parse_args() #Obtenir des arguments et des options
print("=========================================================================")
print("Start " + __file__ + " " + version )
print("=========================================================================")
debug = options.debug
pltflag = options.pltflag
hlen = options.hlen
clen = options.clen
outname = options.outname
print(" (OPTIONS)")
print("-- debug ", debug)
print("-- pltflag ", pltflag)
print("-- hlen (header number) ", hlen)
print("-- clen (column number) ", clen)
print("-- outname ", outname)
print("=========================================================================" )
#Si l'argument n'est pas 1, il lancera une erreur et quittera.
argc = len(args)
if (argc < 1):
print('| STATUS : ERROR ')
print(parser.print_help())
quit()
listname = args[0] # get a file name which contains file names
listnametag = listname.replace(".list","")
outname = outname + "_" + listnametag
filelistfile=open(listname, 'r')
"""Créer une liste de fichiers"""
filelist = []
for i, filename in enumerate(filelistfile):
print("\n...... reading a file (", i, ") ", filename )
filelist.append(filename)
"""Créer une liste de classe"""
evelist = []
for i, filename in enumerate(filelist):
print("\n...... creating a class for a file (", i, ") ", filename)
eve = fftfile(filename, debug, hlen = hlen, clen = clen)
evelist.append(eve)
""" Time vs.Faites un tracé bidimensionnel de FFT. Remarque)Pour être précis, l'axe horizontal est le numéro du fichier qui a été FFT dans l'ordre chronologique, pas l'heure."""
print("\n...... creating 2D FFT ")
plot_2d_FFT(evelist, outname = outname, pltflag = pltflag)
print("=================== Finish ==========================")
if __name__ == '__main__':
main()