Here, we deal with time-variable analysis using python, especially time-series data (called a light curve) of the intensity of black hole photons observed by X-rays. If only the file format is prepared, it can be used for analysis of any time series data. You don't have to code it yourself, but you can do most of it with NASA's tool called XRONOS. However, it is for those people because it is more studying to write it by yourself for details and new time fluctuation analysis. The operating environment is anaconda python3 series, which is on the mac terminal. It is assumed that ls can be used. linux is okay, but windows may need some tweaking. (Updated for python3 on 2020.04.24 with version = 1.2, and included unwrap phase calculation with 2020-05-12 ver 1.3.)
How to download the sample file and code
-Download set --Code --Creating a one-dimensional spectrum and light curve Ana_Timing_do_FFT.py --Generate a two-dimensional power spectrum Ana_Timing_plot2d_FFT.py
The normalization factor (norm) of the power spectrum (PSD) is integrated at all frequencies. Please standardize to RMS ^ 2. I think this is the most used. If you want to check See How to confirm the Persival theorem using the Fourier transform (FFT) of matplotlib and scipy.
How to cut out the time to apply the power spectrum once
--Length of one FFT T (sec) (Frequency resolution is 1 / T (Hz)) --Time bin dT (sec) (Nyquist frequency becomes 1 / dT * 2 [Hz]) --Average number of times N (fluctuation becomes smaller with sqrt (N))
Consider the three parameters of, and use the optimum one.
For example, if the brightness is about several c / s,
--One FFT length T = 256sec to 8096sec --Time bin dT = 1 sec to 64 sec --Average number of times N = 1 to 5 times
I think that is a reasonable level. It is estimated that the statistical fluctuation is about several% to 10% with several hundred X-rays per FFT on the order. This depends on the target science and physics.
The file (xis3_src_0p1sec_6000.qdp) is a part of the light curve of the black hole. The fits file is converted to text with flc2ascii and only 6000 lines are extracted.
python
$cat xis3_src_0p1sec_6000.qdp
7.05204864025116e+02 3.20020450000000e+01 1.60010220000000e+01 1.00000010000000e+00
7.05329855978489e+02 8.00051120000000e+00 8.00051120000000e+00 1.00000010000000e+00
7.05454847991467e+02 8.00051120000000e+00 8.00051120000000e+00 1.00000010000000e+00
....
Like Time (sec) Count rate (c / s) Count rate error (c / s) Fractional exposure (percentage of data jammed per bin) is clogged, 4 lines x hours long Make it a file. Not limited to this, prepare simple code to improve debug efficiency with a small amount of data.
python
/bin/ls xis3_src_0p1sec_6000.qdp > test1lc.list
Prepare a file with the file name written on it. This is similar to the use of fools to pass a file with a file name when you want to process multiple files at once.
Execute Ana_Timing_do_FFT.py.
python
python Ana_Timing_do_FFT.py test1lc.list -a 10 -l 128
here, I used it as "python Ana_Timing_do_FFT.py test1lc.list -a average number of times -l number of bins used for one FFT".
There are other options.
python
$python Ana_Timing_do_FFT.py -h
Usage: Ana_Timing_do_FFT.py fileList [-d] [-p] [-e 1] [-c 4] [-a 12] [-l 512]
Options:
--version show program's version number and exit
-h, --help show this help message and exit
-d, --debug The flag to show detailed information (Flag)
-p, --pltflag to plot time-series data
-m HLEN, --hlen=HLEN Header length
-c CLEN, --clen=CLEN Column length
-a ANUM, --anum=ANUM Average number
-l LENGTH, --length=LENGTH
FFT bin number (e.g., 1024, 2048, ..)
-t TCOL, --tcol=TCOL column number for time
-s VCOL, --vcol=VCOL column number for value
-e VECOL, --vecol=VECOL
column number for value error (if negative, it skips
error column)
If -p is added in the options, the figure of the result of each FFT will be saved. If you want to see the FFT before averaging, execute it with the -p option.
If it runs and works,
fig_fft/Power spectrum plot
fig_phase/Unwrap phase of power spectrum
fig_lc/Light curve plot
text_fft/Text dump of power spectrum(Time vs. Power^2 [rms^2/Hz])
Directory is generated and the figure is generated.
The generated one-dimensional power spectrum is stored in text_fft, and its file name is put in test1fft.list. Ana_Timing_plot2d_FFT.py generates a two-dimensional power spectrum with the file list as an argument.
python
$ /bin/ls text_fft/xis3_src_0p1sec_6000_l128_a10_fft_00000* > test1fft.list
$ python Ana_Timing_plot2d_FFT.py test1fft.list
Then, a two-dimensional histogram is generated.
Next, the data of a black hole star called J1550 acquired by a satellite with a large effective area called RXTE [j1550_lowF_50ms_forqiita.txt](http://www-x.phys.se.tmu.ac.jp/%7Esyamada/qiita Let's try using (/v1_python3/j1550_lowF_50ms_forqiita.txt) with an example where the quasi-periodic variation (QPO) of a black hole can be clearly seen.
python
/bin/ls j1550_lowF_50ms_forqiita.txt > rxtelc.list
python Ana_Timing_do_FFT.py rxtelc.list -a 10 -l 512 -d
/bin/ls text_fft/j1550_lowF_50ms_forqiita_l512_a10_fft_0000* > rxtefft.list
python Ana_Timing_plot2d_FFT.py rxtefft.list
--Light curve
--Power spectrum
--Unwrapped phase spectrum
--Two-dimensional power spectrum
What you can see around this ~ 0.5 Hz is the quasi-periodic fluctuation of the black hole.
--Results for each FFT
With the -p option, the result of each FFT is plotted and put in a directory called scipy_fft_check.
python
python Ana_Timing_do_FFT.py rxtelc.list -a 10 -l 512 -d -p
If you use the command convert to convert to a gif animation,
python
convert -delay 50 -loop 5 *.png j1550_fftcheck.gif
As shown, the time series, PSD, and phase per one interval are plotted.
If the extension is .lc or .fits, it will be recognized as FITS format and the file will be automatically read by astropy.
python
/bin/ls ae905005010hxd_0_pinno_cl_pha25-160.lc > xislc.list
python Ana_Timing_do_FFT.py xislc.list -a 400 -l 64
/bin/ls text_fft/ae905005010hxd_0_pinno_cl_pha25-160_l64_a400_fft_0000* > xisfft.list
python Ana_Timing_plot2d_FFT.py xisfft.list
--Light curve
The observation time is about 70ks, but in reality, there is no effective observation time about half of that due to the backside of the earth and the limitation of the sun angle. This is the situation for most geocentric satellites. In this program, if there is not enough data to perform one FFT, it will be skipped. In xronos, a setting file called a window file is used, and it is specified that only the time zone when multiple conditions are cleared is used for calculation.
--Power spectrum
The black hole QPO is visible around ~ 3Hz. It is often weak like this, and it is necessary to devise statistics and analysis in order to obtain parameters accurately.
All you need is
--Creating a one-dimensional spectrum and light curve Ana_Timing_do_FFT.py --Generate a two-dimensional power spectrum Ana_Timing_plot2d_FFT.py
Only two of are OK. It corresponds to python3 system. If you enter fits, you need astropy.
Even if you don't have a python environment or have trouble building an environment, you can use google Colab as long as you have a google account. How to use google Colab
-Online learning using google Colab for those who start space research
Please refer to.
First, let's put the set of files on google drive. So let's mount it so that google colab can access the files on google drive.
from google import colab
colab.drive.mount('/content/gdrive')
Get a temporary password and copy it. Then
You can browse the files placed on google drive from google Colab like. Next, let's copy the set of files to google drive.
python
!cp -rf /content/gdrive/My\ Drive/python related/202004/v1_python3 .
You have now copied the set of files to google Colab. Experts can skip it, but be sure to check where you are and see the files properly.
Use cd to move, pwd to check your current location, and ls to display a list of files in your current location.
Also, it should work as described in this article. (I'm using python3 series and I should be using only basic modules. Please report if it doesn't work.)
/ bin / ls and ls should be the same. However, depending on the environment, there is a setting to output information other than the file name with ls, so here we write to call / bin / ls without options.
Ana_Timing_do_FFT.py
#!/usr/bin/env python
#-*- coding: utf-8 -*-
""" Ana_Timing_do_FFT.py is to plot lightcurves.
This is a python script for timing analysis.
History:
2016-09-13 ; ver 1.0; first draft from scratch
2020-04-24 ; ver 1.2; updated for python3
2020-05-12 ; ver 1.3; unwrap phase included
"""
__author__ = 'Shinya Yamada (syamada(at)tmu.ac.jp'
__version__= '1.2'
############# scipy ############################################################
import scipy as sp # scipy :A must-have module for numerical operations
import scipy.fftpack as sf #FFT module. Uses fortan's FFT library.
################################################################################
############# numpy ############################################################
import numpy as np # numpy :A must-have module for array calculation
from numpy import linalg as LA
################################################################################
############# matplotlib #######################################################
import matplotlib.pylab as plab
import matplotlib.pyplot as plt
from matplotlib.font_manager import fontManager, FontProperties
import matplotlib.colors as colors
import matplotlib.cm as cm
import matplotlib.mlab as mlab
from matplotlib.ticker import MultipleLocator, FormatStrFormatter, FixedLocator
# For 3D plot
from mpl_toolkits.mplot3d import Axes3D
from matplotlib.collections import PolyCollection
from matplotlib.colors import colorConverter
#Label size adjustment
params = {'xtick.labelsize': 13, # x ticks
'ytick.labelsize': 13, # y ticks
'legend.fontsize': 10
}
plt.rcParams['font.family'] = 'serif' #Specify the font.(serif is in every environment.)
plt.rcParams.update(params)
#################################################################################
import random #Random number generator module(no use)
import array # (no use)
import os #OS related modules
import sys #system related modules
import math #Mathematical function module
#import commands #Module for using shell commands for python2
import subprocess #Module for using shell commands for python3
# import pyfits #fits module
import optparse #Option relation
#import yaml
import re #Regular expression support
import csv #csv file
import datetime #Time support
import pytz # timezoze
# jst = pytz.timezone('Asia/Tokyo')
# PyROOT
#import ROOT
#ROOT.gROOT.SetBatch(1)
def average_scipy_fft( allinputarray, dt, filtname=None, pltflag=False, outname = "default", anum = 10, length = 'default', debug = False):
"""
Receives time series data, averages the specified number of times, frequency, real part^2, imaginary part^2, real part^2+Imaginary part^2, return the average number of times.
[Input parameters]
allinputarray :One-dimensional array of time series data to be FFT
dt :Time interval of data input to allinputarray(sec)
filtname=None :With or without filter. default is None. Only hanning is supported.
pltflag=False :Flag for whether to plot time series data to be FFTed. default is False
outname = "default" :A name to distinguish the output file. default is"default"。
anum = 10 :The average number of times. default is 10 times.
length = 'default':The length of data used for one FFT. default is"default"And in that case, length=Input array length/Calculated by anum.
debug = False :Flag for debugging.
[Output parameters]
frequency(Hz), Real part^2, imaginary part^2, Real part^2+Imaginary part^2, average number of times
"""
alllen = len(allinputarray) #Get the length of the array
print("..... length of all array = ", alllen)
if length == 'default': #Obtain the length of one FFT automatically.
print("..... averaging is done with the number of the cycle. = ", cnum)
length = int (alllen / anum)
print("..... length = ", length)
else: #Get the length of one FFT from length.
print("..... averaging is done with the length of the record = ", length)
print("..... the number of average = ", anum)
averagenum = 0 #A variable to store the average number of times. If the data is perfectly filled, it matches anum.
#Calculate the number of sequences after FFT
if (length%2==0):
num_of_fftoutput = int(length/2) + 1 #If it is an even number, divide by 2 and add 1.
else:
print("[ERROR] Please choose even number for length ", length)
sys.exit() #If the length is odd, it is not normally used, so it throws an error and ends.
#Get an empty array. Prepare an array from the two-dimensional array to append as a two-dimensional array with numpy.
reals = np.empty((0, num_of_fftoutput), float)
imags = np.empty((0, num_of_fftoutput), float)
psds = np.empty((0, num_of_fftoutput), float)
unwrap_phases = np.empty((0, num_of_fftoutput), float)
for num in np.arange( 0, anum, 1): #Perform the FFT an average number of anum times.
onerecord = allinputarray[num * length : (num + 1 ) * length]
oneoutname = outname + str(num)
if debug : print("..... ..... num = ", num, " len(onerecord) = ", len(onerecord))
freq, spreal, spimag, sppower, spunwrap_phases = scipy_fft(onerecord - np.mean(onerecord), dt, filtname=filtname, pltflag=pltflag, outname = oneoutname, debug = debug)
reals = np.append(reals, [spreal], axis=0) # add power^2/Hz
imags = np.append(imags, [spimag], axis=0) # add power^2/Hz
psds = np.append(psds, [sppower], axis=0) # add power^2/Hz
unwrap_phases = np.append(unwrap_phases, [spunwrap_phases], axis=0) # add radians
averagenum = averagenum + 1
real2 = np.mean(reals, axis=0)
# sum(power^2 + power^2 + .....), and then sqrt{sum(power^2 + power^2 + .....)} --> power/sqrt(Hz)
imag2 = np.mean(imags, axis=0)
# sum(power^2 + power^2 + .....), and then sqrt{sum(power^2 + power^2 + .....)} --> power/sqrt(Hz)
psd2 = np.mean(psds, axis=0)
# sum(power^2 + power^2 + .....), and then sqrt{sum(power^2 + power^2 + .....)} --> power/sqrt(Hz)
phase = np.mean(unwrap_phases, axis=0)
# sum(power^2 + power^2 + .....), and then sqrt{sum(power^2 + power^2 + .....)} --> power/sqrt(Hz)
return(freq,real2,imag2,psd2,averagenum,phase)
def scipy_fft(inputarray,dt,filtname=None, pltflag=False, outname = "default", debug = False):
# updated, 2015/01/25, freq is adjusted to PSP output.
#Perform an FFT.
bin = len(inputarray) #Get the length of time series data. 1024(default)
#Filter settings
if filtname == None:
filt = np.ones(len(inputarray))
cornorm = 1.0
elif filtname == 'hanning':
filt = sp.hanning(len(inputarray))
# print "filt =" + str(filt)
cornorm = 8./3. #A term that corrects the power attenuated by the Hanning filter.
else:
print('No support for filtname=%s' % filtname)
exit()
###########################################################
# freq = np.arange(0, nyquist, nyquist/adclength)
# This means freq = [12.2, .. 6250.], len(freq) = 512
# because PSP discard 0 freq component.
# freq = sf.fftfreq(bin,dt)[0:bin/2 +1]
# scipy fftfreq return [0, 12.2 .... , -6250, ...], Nyquist is negative.
nyquist = 0.5/dt
adclength = int(0.5 * bin) # 512 (default)
# freq = np.linspace(0, nyquist, adclength + 1)[1:] # this is only for PSP
freq = np.linspace(0, nyquist, adclength + 1)[0:]
############################################################
df = freq[1]-freq[0]
# fft = sf.fft(inputarray*filt,bin)[1:bin/2+1] # this is only for PSP
fft = sf.fft(inputarray*filt,bin)[0:int(bin/2+1)]
# scipy fftfreq return [0, 12.2 .... , -6250, ...], Nyquist is negative.
# so [1:bin/2 + 1] = [1:513] = [12.2 ,,, -6250]
###########################################################################
real = fft.real
imag = fft.imag
# psd = np.abs(fft)
############ this is used for to adjust PSP unit.
# psd2 = np.abs(fft) * np.abs(fft) * cornorm
############ this is used for to adjust matplotlib norm = rms definition.
# psd = np.abs(fft)/np.sqrt(df)/(bin/np.sqrt(2.))
fftnorm = (np.sqrt(df) * bin) / np.sqrt(2.) #The term required for the frequency integral to be RMS.
psd = np.abs(fft) * np.sqrt(cornorm) / fftnorm
psd2 = psd * psd # Power^2
rms_from_ft = np.sqrt(np.sum(psd2) * df) #Amount integrated in frequency space. Matches spatiotemporal RMS.
phase = np.arctan2(np.array(real),np.array(imag)) # radians, * 180. / np.pi # dig
phase_unwrap = np.unwrap(phase)
if pltflag: #Plot the time series data for confirmation.
binarray = np.arange(0,bin,1) * dt
F = plt.figure(figsize=(10,8.))
plt.subplots_adjust(wspace = 0.1, hspace = 0.3, top=0.9, bottom = 0.08, right=0.92, left = 0.1)
ax = plt.subplot(3,1,1)
plt.title("check scipy_fft")
plt.xscale('linear')
plt.grid(True)
plt.xlabel(r'Time (s)')
plt.ylabel('FFT input')
plt.errorbar(binarray, inputarray, fmt='r', label="Raw data")
fnorm = np.abs(np.amax(inputarray))
plt.errorbar(binarray, fnorm * filt, fmt='b--', label="Filter")
plt.errorbar(binarray, inputarray * filt, fmt='g', label="Filtered Raw data")
plt.legend(numpoints=1, frameon=False, loc=1)
ax = plt.subplot(3,1,2)
plt.xscale('linear')
plt.yscale('linear')
plt.grid(True)
plt.xlabel(r'Frequency (Hz)')
plt.ylabel(r'Power ($rms/\sqrt{Hz}$)')
plt.errorbar(freq, psd, fmt='r', label="PSD")
plt.legend(numpoints=1, frameon=False, loc=1)
ax = plt.subplot(3,1,3)
plt.xscale('linear')
plt.grid(True)
plt.xlabel(r'Frequency (Hz)')
plt.ylabel('UnWrap Phase (radians)')
plt.errorbar(freq, phase_unwrap, fmt='r', label="phase")
plt.legend(numpoints=1, frameon=False, loc=1)
# plt.show()
outputfiguredir = "scipy_fft_check"
subprocess.getoutput('mkdir -p ' + outputfiguredir)
outfile = outputfiguredir + "/" + "scipy_fftcheck" + "_t" + str(dt).replace(".","p") + "_" + outname + ".png "
plt.savefig(outfile)
if os.path.isfile(outfile):
print("saved as ", outfile)
else:
print("[ERROR] couldn't save as ", outfile)
if debug:
print("=====> please make sure the three RMSs below are almost same. ")
print(". . . . [check RMS] std(input), np.std(input * filt), rms_from_ft = " + str(np.std(inputarray)) + " " + str(np.std(inputarray * filt)) +" " + str(rms_from_ft))
return(freq,real,imag,psd2,phase_unwrap)
class lcfile():
"""
One class for one light curve.
"""
def __init__ (self, eventfile, debug, hlen = -1, clen = 4, anum = 10, length = 'default', pltflag = False, tcol = 0, vcol = 1, vecol = 2):
"""Read the contents of the file with the initialization function init of this class."""
eventfile = eventfile.strip() #Excludes line feed code.
self.eventfilename = eventfile
self.ext = eventfile.split(".")[-1]
self.filenametag = os.path.basename(self.eventfilename).replace(".txt","").replace(".text","").replace(".csv","").replace(".qdp","").replace(".lc","").replace(".fits","")
self.filenametag = self.filenametag + "_l" + str(length) + "_a" + str(anum)
self.debug = debug
#extract header information
if os.path.isfile(eventfile):
self.file = open(eventfile)
if self.ext == "lc" or self.ext == "fits":
print(".... file is considered as FITS format ", self.ext)
from astropy.io import fits
hdu = fits.open(eventfile)
self.t = hdu[1].data["TIME"]
self.v = hdu[1].data["RATE"]
self.ve = hdu[1].data["ERROR"]
else:
print(".... file is considered as text format ", self.ext)
self.t = np.array([]) #time
self.v = np.array([]) #Value, e.g., count/sec
self.ve = np.array([]) #error
for i, oneline in enumerate(self.file):
if i > (hlen - 1) : # skip header
list = oneline.split()
if list[0][0] is not "!": # skip a line starting with !
if len(list) == clen: # check column number of the input file
self.t = np.append(self.t, float(list[tcol]))
self.v = np.append(self.v, float(list[vcol]))
if vecol > 0:
self.ve = np.append(self.ve, float(list[vecol]))
else:
self.ve = np.append(self.ve, float(0.)) # padding 0.
if debug : print(len(self.t), len(self.v), len(self.ve))
if len(self.t) < 1:
print("[ERROR] no data are stored, ", self.t)
print("Please check the length of column: column of your data = ", len(list), " input param of clen = ", clen)
print("Both should be the same.")
sys.exit()
self.tstart = self.t[0]
self.tstop = self.t[-1]
self.timebin = self.t[1] - self.t[0]
# created dt = t[i+1] - t[i], and add 0 to make it have the same array length as t.
self.dt = self.t[1:] - self.t[0:-1]
self.dt_minus_timebin = self.dt - self.timebin #Judge the jump of time. If there is no flight, it will be zero.
np.append(self.dt_minus_timebin,0)
print("timebin (sec) = ", self.timebin)
print("start (sec) = ", self.tstart)
print("stop (sec) = ", self.tstop)
print("expected maxinum number of bins = ", int((self.tstop - self.tstart)/self.timebin))
else:
print("ERROR : file does not exist. ", eventfile)
def plot_lc_entire(self):
"""
plot entire lightcurves
"""
F = plt.figure(figsize=(12,8.))
# plt.subplots_adjust(wspace = 0.1, hspace = 0.3, top=0.9, bottom = 0.08, right=0.92, left = 0.1)
ax = plt.subplot(1,1,1)
plt.title(self.filenametag)
plt.xscale('linear')
plt.yscale('linear')
plt.grid(True)
plt.xlabel(r'Time (sec)')
plt.ylabel(r'Count Rate ($cs^{-1}$)')
plt.errorbar(self.t, self.v, self.ve, fmt='ro-', label=self.filenametag, alpha = 0.9)
plt.legend(numpoints=1, frameon=False, loc="best")
outputfiguredir = "fig_lc"
subprocess.getoutput('mkdir -p ' + outputfiguredir)
outfile = outputfiguredir + "/" + self.filenametag + "_lc_entire.png "
plt.savefig(outfile)
if os.path.isfile(outfile):
print("saved as ", outfile)
else:
print("[ERROR] couldn't save as ", outfile)
def create_good_lc(self, anum = 5, length = 512, debug = False):
"""
If there is no data missing during length, it is judged as good.
If there is a jump in the data, the data up to that point is discarded and the jump in the data
From the end, collect new length data.
"""
self.good_lc_t = np.array([]) #good Judged time
self.good_lc_v = np.array([]) #good Judged value
self.good_lc_ve = np.array([]) #good Judged error
tmp_t = []
tmp_v = []
tmp_ve = []
num_of_good_interval = 0 #The number of intervals determined to be good.
num_of_discarded_events = 0 #The number of events that were not judged to be good.
for i, (t, v, ve, tdiff) in enumerate(zip(self.t, self.v, self.ve, self.dt_minus_timebin)):
if np.abs(tdiff) > 0.5 * self.timebin: # 0.5 is not important. very small number in tdiff means time is contineous.
"""Detects jumps in the data. Discard the data collected so far."""
print(".-.-.-.-.-. time jump has occurred. ", i, t, v, tdiff)
num_of_discarded_events = num_of_discarded_events + len(tmp_t)
# initialize buffer arrays
tmp_t = []
tmp_v = []
tmp_ve = []
else:
tmp_t.append(t)
tmp_v.append(v)
tmp_ve.append(ve)
if len(tmp_t) == length:
"""Since there is no jump in the data and the length is accumulated, save it in an array."""
# store data
self.good_lc_t = np.append(self.good_lc_t, tmp_t)
self.good_lc_v = np.append(self.good_lc_v, tmp_v)
self.good_lc_ve = np.append(self.good_lc_ve, tmp_ve)
num_of_good_interval = num_of_good_interval + 1
# initialize buffer arrays
tmp_t = []
tmp_v = []
tmp_ve = []
# print status
print("def create_good_lc() ")
print("num_of_good_interval (event x length) = ", num_of_good_interval)
print("num_of_discarded_events (event) = ", num_of_discarded_events, "\n")
if debug:
print("self.good_lc_t", len(self.good_lc_t))
print("self.good_lc_v", len(self.good_lc_v))
print("self.good_lc_ve", len(self.good_lc_ve))
if num_of_good_interval < anum:
print("[ERROR] number of good interval", num_of_good_interval, " is smaller than one average number ", anum )
sys.exit()
def calc_fft(self, filtname=None, pltflag = False, anum = 5, length = 512, dccut = True, debug = False):
print("start : in calc_fft()")
#Perform FFT anum times for a time series of lehgth length and take the average.
#A one-dimensional array for a buffer to store the array required for averaging.
tmp_tlist = np.array([])
tmp_vlist = np.array([])
num_of_averaged_fft = 0 #Count the number of successful averaging
#Calculate the number of sequences after FFT(After DC cut)
if (length%2==0):
num_of_fftoutput = int(length/2) #If it is an even number, divide by 2.
else:
print("[ERROR] Please choose even number for length ", length)
sys.exit() #If the length is odd, it is not normally used, so it throws an error and ends.
#Get an empty array. Prepare an array from the two-dimensional array to append as a two-dimensional array with numpy.
self.freq_ave = np.empty((0, num_of_fftoutput), float)
self.real_ave = np.empty((0, num_of_fftoutput), float)
self.imag_ave = np.empty((0, num_of_fftoutput), float)
self.power_ave = np.empty((0, num_of_fftoutput), float)
self.phase_ave = np.empty((0, num_of_fftoutput), float)
self.power_avenum = np.array([])
self.time_ave = np.array([])
for i, (tlist, vlist) in enumerate(zip(self.good_lc_t, self.good_lc_v)):
#Add the array by length.
tmp_tlist = np.append(tmp_tlist, tlist)
tmp_vlist = np.append(tmp_vlist, vlist)
#If the length is 1 or more and the number of elements is divisible by the average number of times x length
if len(tmp_tlist) > 0 and (len(tmp_tlist)%(anum*length) == 0):
#Calculate the FFT.
# freq_ave frequency(Hz) real_ave (The square of the real part), imag_ave(The square of the imaginary part), power_ave(Absolute value squared), avenum(Average number of times)
freq_ave, real_ave, imag_ave, power_ave, avenum, phase_ave = \
average_scipy_fft( tmp_vlist, self.timebin, filtname=filtname, \
pltflag=pltflag, outname = self.filenametag, anum = anum, length = length, debug = debug)
if dccut: #Normally, the DC component is unnecessary, so cut it.
freq_ave = freq_ave[1:]
real_ave = real_ave[1:]
imag_ave = imag_ave[1:]
power_ave = power_ave[1:]
phase_ave = phase_ave[1:]
num_of_averaged_fft = num_of_averaged_fft + 1 #Add the average number of times
self.time_ave = np.append(self.time_ave, 0.5 * (tmp_tlist[0] + tmp_tlist[-1]) ) #Calculate the center of time
self.power_avenum = np.append(self.power_avenum, avenum) #Save average number of times
self.freq_ave = np.append(self.freq_ave, [freq_ave], axis=0)
self.real_ave = np.append(self.real_ave, [real_ave], axis=0)
self.imag_ave = np.append(self.imag_ave, [imag_ave], axis=0)
self.power_ave = np.append(self.power_ave, [power_ave], axis=0)
self.phase_ave = np.append(self.phase_ave, [phase_ave], axis=0)
# init buffer arrays
tmp_tlist = np.array([])
tmp_vlist = np.array([])
else:
pass #There are not enough elements to average.
# print status
print("num_of_averaged_fft = ", num_of_averaged_fft)
print("finish : in calc_fft()")
def plot_fft(self):
"""
plot fft
"""
F = plt.figure(figsize=(12,8.))
# plt.subplots_adjust(wspace = 0.1, hspace = 0.3, top=0.9, bottom = 0.08, right=0.92, left = 0.1)
ax = plt.subplot(1,1,1)
plt.title(self.filenametag)
plt.xscale('log')
plt.yscale('log')
plt.grid(True)
plt.xlabel(r'Frequency(Hz)')
plt.ylabel(r'Power ($rms/\sqrt{Hz}$)')
for fre, power, time in zip(self.freq_ave, self.power_ave, self.time_ave):
plt.errorbar(fre, np.sqrt(power), fmt='-', label=" t = " + str(time) + "" , alpha = 0.8) #power is the square of the power and is displayed by taking the route.
plt.legend(numpoints=1, frameon=False, loc="best")
outputfiguredir = "fig_fft"
subprocess.getoutput('mkdir -p ' + outputfiguredir)
outfile = outputfiguredir + "/" + self.filenametag + "_fft.png "
plt.savefig(outfile)
if os.path.isfile(outfile):
print("saved as ", outfile)
else:
print("[ERROR] couldn't save as ", outfile)
def plot_phase(self):
"""
plot phase
"""
F = plt.figure(figsize=(12,8.))
ax = plt.subplot(1,1,1)
plt.title(self.filenametag)
plt.xscale('linear')
plt.yscale('linear')
plt.grid(True)
plt.xlabel(r'Frequency (Hz)')
plt.ylabel(r'unwarp phase (radians)')
for fre, phase, time in zip(self.freq_ave, self.phase_ave, self.time_ave):
plt.errorbar(fre, phase, fmt='-', label=" t = " + str(time) + "" , alpha = 0.8) #power is the square of the power and is displayed by taking the route.
plt.legend(numpoints=1, frameon=False, loc="best")
outputfiguredir = "fig_phase"
subprocess.getoutput('mkdir -p ' + outputfiguredir)
outfile = outputfiguredir + "/" + self.filenametag + "_phase.png "
plt.savefig(outfile)
if os.path.isfile(outfile):
print("saved as ", outfile)
else:
print("[ERROR] couldn't save as ", outfile)
def dump_text_fft(self):
"""
dump fft data into text file
"""
odir = "text_fft"
subprocess.getoutput('mkdir -p ' + odir)
for i, (fre, power, time) in enumerate(zip(self.freq_ave, self.power_ave, self.time_ave)):
outfile = odir + "/" + self.filenametag + "_fft_" + str("%06d" % i) + "_" + str(time) + ".txt"
fout = open(outfile, "w")
for f, p in zip(fre,power):
outstr=str(f) + " " + str(p) + "\n" #Frequency, power squared
fout.write(outstr)
fout.close()
if os.path.isfile(outfile):
print("saved as ", outfile)
else:
print("[ERROR] couldn't save as ", outfile)
def main():
"""When you run the script, it starts here."""
curdir = os.getcwd() #Get the current directory
"""Setting options"""
#optparse settings
usage = u'%prog fileList [-d] [-p] [-e 1] [-c 4] [-a 12] [-l 512]'
version = __version__
parser = optparse.OptionParser(usage=usage, version=version)
#Option settings.--The character after is the option variable.
parser.add_option('-d', '--debug', action='store_true', help='The flag to show detailed information (Flag)', metavar='DEBUG', default=False)
parser.add_option('-p', '--pltflag', action='store_true', help='to plot time-series data', metavar='PLTFLAG', default=False)
parser.add_option('-m', '--hlen', action='store', type='int', help='Header length', metavar='HLEN', default=-1)
parser.add_option('-c', '--clen', action='store', type='int', help='Column length', metavar='CLEN', default=4)
parser.add_option('-a', '--anum', action='store', type='int', help='Average number', metavar='ANUM', default=12)
parser.add_option('-l', '--length', action='store', type='int', help='FFT bin number (e.g., 1024, 2048, ..)', metavar='LENGTH', default=256)
# for setting column number for time, value, and error of value
parser.add_option('-t', '--tcol', action='store', type='int', help='column number for time', metavar='TCOL', default=0)
parser.add_option('-s', '--vcol', action='store', type='int', help='column number for value', metavar='VCOL', default=1)
parser.add_option('-e', '--vecol', action='store', type='int', help='column number for value error (if negative, it skips error column)', metavar='VECOL', default=2)
options, args = parser.parse_args() #Get arguments and options
print("=========================================================================")
print("Start " + __file__ + " " + version )
print("=========================================================================")
debug = options.debug
pltflag = options.pltflag
hlen = options.hlen
clen = options.clen
anum = options.anum
length = options.length
# for setting column number for time, value, and error of value
tcol = options.tcol
vcol = options.vcol
vecol = options.vecol;
print(" (OPTIONS)")
print("-- debug ", debug)
print("-- pltflag ", pltflag)
print("-- hlen (header number) ", hlen)
print("-- clen (column number) ", clen)
print("-- anum (average number) ", anum)
print("-- length (FFT bin number) ", length)
# for setting column number for time, value, and error of value
print("-- column number for time ", tcol)
print("-- column number for value ", vcol)
print("-- column number for value error ", vecol, " (if negative, it skips.)")
print("do FFT for one length of ", length, " by averaging ", anum, " times" )
print("=========================================================================" )
#If the argument is not 1, it throws an error and exits.
argc = len(args)
if (argc < 1):
print('| STATUS : ERROR ')
print(parser.print_help())
quit()
listname = args[0] # get a file name which contains file names
filelistfile=open(listname, 'r')
"""Create a file list"""
filelist = []
for i, filename in enumerate(filelistfile):
print("\n...... reading a file (", i, ") ", filename )
filelist.append(filename)
"""Create a class list"""
evelist = []
for i, filename in enumerate(filelist):
print("\n...... creating a class for a file (", i, ") ", filename)
eve = lcfile(filename, debug, hlen = hlen, clen = clen, anum = anum, length = length, pltflag = pltflag,\
tcol = tcol, vcol = vcol, vecol = vecol)
evelist.append(eve)
"""Plot the entire light curve of the input data.(You can skip it because it is for confirmation.) """
for i, eve in enumerate(evelist):
print("...... creating an entire lightcurve (", i, ") ", eve.eventfilename)
eve.plot_lc_entire()
""" Create good interval.One length(xronos interval)Collect only the data that is satisfied."""
for i, eve in enumerate(evelist):
print("\n...... creating good interval (", i, ") ", eve.eventfilename)
eve.create_good_lc(anum = anum, length = length, debug = debug)
""" Create calc fft.Execute FFE."""
for i, eve in enumerate(evelist):
print("\n...... calculating fft (", i, ") ", eve.eventfilename)
eve.calc_fft(filtname=None, pltflag = pltflag, anum = anum, length = length, debug=debug)
""" Create plot fft.Plot the power spectrum."""
for i, eve in enumerate(evelist):
print("\n...... calculating fft (", i, ") ", eve.eventfilename)
eve.plot_fft()
""" Create plot unwrap phase"""
for i, eve in enumerate(evelist):
print("\n...... calculating phase (", i, ") ", eve.eventfilename)
eve.plot_phase()
""" Dump FFT results into text files .Dump to a text file."""
for i, eve in enumerate(evelist):
print("\n...... text dump of fft (", i, ") ", eve.eventfilename)
eve.dump_text_fft()
print("=================== Finish ==========================")
if __name__ == '__main__':
main()
Ana_Timing_plot2d_FFT.py
#!/usr/bin/env python
#-*- coding: utf-8 -*-
""" Ana_Timing_plot2d_FFT.py is to plot lightcurves.
This is a python script for timing analysis.
History:
2016-09-13 ; ver 1.0; first draft from scratch
2020-04-24 ; ver 1.2; updated for python3
"""
__author__ = 'Shinya Yamada (syamada(at)tmu.ac.jp'
__version__= '1.2'
############# scipy ############################################################
import scipy as sp # scipy :A must-have module for numerical operations
import scipy.fftpack as sf #FFT module. Uses fortan's FFT library.
################################################################################
############# numpy ############################################################
import numpy as np # numpy :A must-have module for array calculation
from numpy import linalg as LA
################################################################################
############# matplotlib #######################################################
import matplotlib.pylab as plab
import matplotlib.pyplot as plt
from matplotlib.font_manager import fontManager, FontProperties
import matplotlib.colors as colors
import matplotlib.cm as cm
import matplotlib.mlab as mlab
from matplotlib.ticker import MultipleLocator, FormatStrFormatter, FixedLocator
from mpl_toolkits.axes_grid import AxesGrid
# For 3D plot
from mpl_toolkits.mplot3d import Axes3D
from matplotlib.collections import PolyCollection
from matplotlib.colors import colorConverter
#Label size adjustment
params = {'xtick.labelsize': 13, # x ticks
'ytick.labelsize': 13, # y ticks
'legend.fontsize': 10
}
plt.rcParams['font.family'] = 'serif' #Specify the font.(serif is in every environment.)
plt.rcParams.update(params)
from matplotlib.colors import LogNorm
#################################################################################
import random #Random number generator module(no use)
import array # (no use)
import os #OS related modules
import sys #system related modules
import math #Mathematical function module
#import commands #Module for using shell commands for python2
import subprocess #Module for using shell commands for python3
# import pyfits #fits module
import optparse #Option relation
#import yaml
import re #Regular expression support
import csv #csv file
import datetime #Time support
import pytz # timezoze
# jst = pytz.timezone('Asia/Tokyo')
# PyROOT
#import ROOT
#ROOT.gROOT.SetBatch(1)
class fftfile():
"""
One class for one FFT.
"""
def __init__ (self, eventfile, debug, hlen = -1, clen = 2):
"""Read the contents of the file with the initialization function init of this class."""
eventfile = eventfile.strip() #Excludes line feed code.
self.eventfilename = eventfile
self.filenametag_full = eventfile.replace(".txt","")
self.filenametag = os.path.basename(self.filenametag_full).replace(".txt","").replace(".text","").replace(".csv","").replace(".qdp","")
self.filenametag = self.filenametag
self.debug = debug
#extract header information
if os.path.isfile(eventfile):
self.file = open(eventfile)
self.f = np.array([]) #frequency(Hz)
self.v = np.array([]) #Power squared(rms^2/Hz)
for i, oneline in enumerate(self.file):
if i > hlen: # skip header
list = oneline.split()
if list[0][0] is not "!": # skip a line starting with !
if len(list) == clen: # check column number of the input file
self.f = np.append(self.f, float(list[0]))
self.v = np.append(self.v, float(list[1]))
if debug : print(len(self.f), len(self.v))
self.df = self.f[1] - self.f[0]
self.rms_from_ft = np.sqrt(np.sum(self.v) * self.df) #Amount integrated in frequency space. Matches spatiotemporal RMS.
self.nbin = len(self.f)
print("Maximum frequency (Hz) = ", self.f[-1] )
print("frequency bin (Hz) = ", self.df)
print("Number of bins = ", self.nbin)
print("RMS over all frequency = ", self.rms_from_ft)
self.file.close()
else:
print("ERROR : file does not exist. ", eventfile)
exit
def plot_2d_FFT(evelist, outname = "defaultoutname", pltflag = False):
"""
plot 2d plot
Take a list of fftfile classes as arguments and plot the combined results.
"""
power2d = []
for eve in evelist:
power2d.append(eve.v)
power2d = np.array(power2d)
x = range(len(evelist)) #Create a one-dimensional array in the time direction
y = evelist[0].f #One-dimensional array of frequencies
X, Y = np.meshgrid(x,y) #Perform a two-dimensional array like a two-dimensional plot.(original specifications of matplotlib)
Z = power2d.T #When transposed, the horizontal axis becomes the time axis.
F = plt.figure(figsize=(12,8.))
# plt.subplots_adjust(wspace = 0.1, hspace = 0.3, top=0.9, bottom = 0.08, right=0.92, left = 0.1)
ax = plt.subplot(1,1,1)
plt.title("Time vs. Frequency")
plt.xscale('linear')
plt.yscale('linear')
plt.grid(True)
plt.xlabel(r'Number of FFT in order of time')
plt.ylabel(r'Frequency ($Hz$)')
ax.set_xlim(x[0],x[-1])
ax.set_ylim(y[0],y[-1])
if (X.shape == Y.shape) and (X.shape == Z.shape) : #Two-dimensional plots of matplotlib must have the same number of elements.
plt.pcolormesh( X, Y, Z, norm=LogNorm(vmin=Z.min(), vmax=Z.max()))
cl = plt.colorbar()
cl.set_label(r"Power ($rms^2/Hz$)")
# plt.legend(numpoints=1, frameon=False, loc="best")
outputfiguredir = "fft2d_fig"
subprocess.getoutput('mkdir -p ' + outputfiguredir)
outfile = outputfiguredir + "/" + outname + "_log.png "
plt.savefig(outfile)
if os.path.isfile(outfile):
print("saved as ", outfile)
else:
print("[ERROR] couldn't save as ", outfile)
if pltflag:
plt.show()
else:
print("\n[ERROR] Please make sure all the following shape are the same.")
print("The same shape is necessary for matplotlib 2D plot.")
print(X.shape, Y.shape, Z.shape)
def main():
"""When you run the script, it starts here."""
curdir = os.getcwd() #Get the current directory
"""Setting options"""
#optparse settings
usage = u'%prog fileList [-d] [-p] [-e 1] [-c 4] [-a 12] [-l 512]'
version = __version__
parser = optparse.OptionParser(usage=usage, version=version)
#Option settings.--The character after is the option variable.
parser.add_option('-d', '--debug', action='store_true', help='The flag to show detailed information (Flag)', metavar='DEBUG', default=False)
parser.add_option('-p', '--pltflag', action='store_true', help='to plot time-series data', metavar='PLTFLAG', default=False)
parser.add_option('-e', '--hlen', action='store', type='int', help='Header length', metavar='HLEN', default=-1)
parser.add_option('-c', '--clen', action='store', type='int', help='Column length', metavar='CLEN', default=2)
parser.add_option('-o', '--outname', action='store', type='string', help='Output file name', metavar='OUTNAME', default='fft2dplot')
options, args = parser.parse_args() #Get arguments and options
print("=========================================================================")
print("Start " + __file__ + " " + version )
print("=========================================================================")
debug = options.debug
pltflag = options.pltflag
hlen = options.hlen
clen = options.clen
outname = options.outname
print(" (OPTIONS)")
print("-- debug ", debug)
print("-- pltflag ", pltflag)
print("-- hlen (header number) ", hlen)
print("-- clen (column number) ", clen)
print("-- outname ", outname)
print("=========================================================================" )
#If the argument is not 1, it throws an error and exits.
argc = len(args)
if (argc < 1):
print('| STATUS : ERROR ')
print(parser.print_help())
quit()
listname = args[0] # get a file name which contains file names
listnametag = listname.replace(".list","")
outname = outname + "_" + listnametag
filelistfile=open(listname, 'r')
"""Create a file list"""
filelist = []
for i, filename in enumerate(filelistfile):
print("\n...... reading a file (", i, ") ", filename )
filelist.append(filename)
"""Create a class list"""
evelist = []
for i, filename in enumerate(filelist):
print("\n...... creating a class for a file (", i, ") ", filename)
eve = fftfile(filename, debug, hlen = hlen, clen = clen)
evelist.append(eve)
""" Time vs.Make a 2D plot of the FFT. note)To be precise, the horizontal axis is the number of the FFT file in chronological order, not the time."""
print("\n...... creating 2D FFT ")
plot_2d_FFT(evelist, outname = outname, pltflag = pltflag)
print("=================== Finish ==========================")
if __name__ == '__main__':
main()
Recommended Posts