For implied volatility and the Black-Scholes equation, see [Black–Scholes model - Wikipedia](https://en.wikipedia.org/wiki/Black%E2%80%93Scholes_model).
For how the data used here is loaded, see the earlier post "From reading information such as Nikkei 225 option theoretical prices into Pandas to shaping it - night flight".
What is checked here:

- Calculation of implied volatility from option prices
- Comparison of calculation speed (more precisely, of execution time)
[Volatility - Wikipedia](https://ja.wikipedia.org/wiki/%E3%83%9C%E3%83%A9%E3%83%86%E3%82%A3%E3%83%AA%E3%83%86%E3%82%A3#.E3.82.A4.E3.83.B3.E3.83.97.E3.83.A9.E3.82.A4.E3.83.89.E3.83.BB.E3.83.9C.E3.83.A9.E3.83.86.E3.82.A3.E3.83.AA.E3.83.86.E3.82.A3)
> The $\sigma$ obtained by solving the equation for $\sigma$ (note that $C(K, T)$ is a function of $\sigma$)
>
> $$\text{Market price} = C(K, T)$$
>
> is called the implied volatility.
In other words, we want the $\sigma$ at which $C(K, T)$ equals the market price, but the equation cannot be solved for $\sigma$ in closed form, so the implied volatility is computed numerically with a [root-finding algorithm](https://ja.wikipedia.org/wiki/%E6%B1%82%E6%A0%B9%E3%82%A2%E3%83%AB%E3%82%B4%E3%83%AA%E3%82%BA%E3%83%A0).
One such method is the [bisection method - Wikipedia](https://en.wikipedia.org/wiki/Bisection_method).
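To make the idea concrete, here is a minimal, self-contained sketch (separate from the benchmark script below) that backs out a single implied volatility with SciPy's `brentq` root finder. It assumes the same zero-rate Black-Scholes form and the same [0.01, 0.5] volatility search range that the benchmark script uses; the post's own benchmarks use `minimize_scalar`, `root`, and bisection, so `brentq` is just a convenient stand-in here, and the numbers in the usage comment are made up.

```python
import numpy as np
from scipy.stats import norm
from scipy.optimize import brentq

def bs_call(F, K, T, vol):
    # Black-Scholes call premium in the same zero-rate form as BS_norm below
    v = vol * np.sqrt(T)
    d1 = np.log(F / K) / v + v / 2
    d2 = d1 - v
    return F * norm.cdf(d1) - K * norm.cdf(d2)

def implied_vol(price, F, K, T, lo=0.01, hi=0.5):
    # Find the vol at which model price equals market price;
    # brentq needs the objective to change sign between lo and hi
    return brentq(lambda vol: bs_call(F, K, T, vol) - price, lo, hi)

# Illustrative call with made-up numbers (not taken from the data set):
# implied_vol(price=250.0, F=19000.0, K=19125.0, T=19.75/245.)
```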
- [x] Optimization calculation using Scipy (numerical solution) / pandas.apply - Single Process
- [x] Optimization calculation using Scipy (numerical solution) / ndarray scalar function optimization x loop: function vectorization - Single Process
- [x] Optimization calculation using Scipy (numerical solution) / ndarray vector function optimization: function vectorization - Single Process
- [x] ndarray optimization Cython / ndarray vector function optimization - Single Process
- [x] ndarray optimization Cython / ndarray scalar function optimization, vectorized - Single Process
- [x] Optimization calculation using Scipy (numerical solution) / pandas.apply + Multi Process
- [x] Learning the inverse function using an ANN ⇒ use it as an inference engine (Keras)
- [x] Using Swig (C++ implementation) / single loop - Single Process
The C++ implementation was overwhelmingly faster than every piece of ingenuity on the Python side; even Cython's best effort could not come close. And the C++ version still has headroom left, such as thread parallelization, which was not even used this time...!
The Python multi-process result looks bad partly because, on Windows, every worker process re-imports the module and therefore loads the Keras library each time; but even a separately executed version without that overhead took about 35 seconds.
The ANN-based inverse function is fast but inaccurate. I suspect the limitation lies in the network design (it may be infeasible without resorting to something like WGAN / StackGAN). A sketch of what such a network might look like follows the results table.
Item | Data Handling | Optimize | BS_Vectorized | Proc | Time [sec] |
---|---|---|---|---|---|
Optimization calculation using Scipy (numerical solution) / pandas.apply - Single Process | Pd.apply | minimize_scalar (Scipy) | No | Single | 2.8061 |
Optimization calculation using Scipy (numerical solution) / ndarray scalar function optimization x loop: function vectorization - Single Process | Pd->np | minimize_scalar (Scipy) | Yes | Single | 3.2068 |
Optimization calculation using Scipy (numerical solution) / ndarray vector function optimization: function vectorization - Single Process | Pd->np | root (Scipy) | Yes | Single | 0.6706 |
ndarray optimization Cython / ndarray vector function optimization - Single Process | Pd->np (CyPy) | root (Scipy) | Yes | Single | 0.6860 |
ndarray optimization Cython / ndarray scalar function optimization, vectorized - Single Process | Pd->np (CyPy) | Vect Bisection (Cython) | Yes | Single | 0.4848 |
Optimization calculation using Scipy (numerical solution) / pandas.apply + Multi Process | Pd->Split (Pd x 8)->np (CyPy) | Vect Bisection (Cython) | Yes | Multi Proc | 128.3856 |
Learning the inverse function using an ANN ⇒ inference engine (Keras) | Pd->np->ANN | N/A | Yes | Single | 0.1526 |
Using Swig (C++ implementation) / single loop - Single Process | Pd->np->C++ (Swig) | Bisection (C++) | No | Single | 0.0010 |
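For reference on the ANN approach mentioned above, here is a minimal sketch of the kind of small dense network that could sit behind `model.h5`. The three inputs correspond to the scaled features built in the script below (strike/underlying ratio, normalized maturity, transformed option price), and the sigmoid output matches the `0.4 * prediction + 0.1` rescaling applied there. The actual architecture and training procedure are not shown in this post, so the layer sizes and the training call are assumptions.

```python
from keras.models import Sequential
from keras.layers import Dense

def build_iv_model():
    # Hypothetical architecture: three scaled inputs -> one scaled volatility in [0, 1],
    # which the script later maps to [0.1, 0.5] via 0.4 * prediction + 0.1
    model = Sequential()
    model.add(Dense(64, activation='relu', input_dim=3))
    model.add(Dense(64, activation='relu'))
    model.add(Dense(1, activation='sigmoid'))
    model.compile(optimizer='adam', loss='mse')
    return model

# model = build_iv_model()
# model.fit(X_train, y_train, ...)   # training data preparation is out of scope here
# model.save('./model.h5')
```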
# -*- coding: utf-8 -*-
import pandas as pd
import numpy as np
from keras.models import load_model
from datetime import datetime
import dateutil
import scipy.optimize
from scipy.stats import norm
# Cython version
from BS_Cy import *
# Swig version
from _BS import *
import multiprocessing as mp
import time
import matplotlib.pyplot as plt
maturity = 19.75/245.
def strip(text):
    # Trim surrounding whitespace from text cells; pass non-string values through unchanged
    try:
        return text.strip()
    except AttributeError:
        return text
def BS_norm(F225, Strike, maturity, vol, call):
    # Scalar Black-Scholes premium in zero-rate form: call price if `call` is truthy, otherwise put price
    volSQM = vol * (maturity ** .5)
    d1 = np.log(F225/Strike) / volSQM + volSQM/2
    d2 = d1 - volSQM
    return (F225 * norm.cdf(d1) - Strike*norm.cdf(d2)) if call else \
           (Strike * norm.cdf(-d2) - F225*norm.cdf(-d1))
def BS_norm_vect(F225, Strike, maturity, vol, call):
    # Vectorized Black-Scholes premium: compute call and put prices for whole arrays,
    # then pick one per element according to the boolean `call` flag
    volSQM = vol * (maturity ** .5)
    d1 = np.log(F225/Strike) / volSQM + volSQM/2
    d2 = d1 - volSQM
    Call = F225 * norm.cdf(d1) - Strike*norm.cdf(d2)
    Put = Strike * norm.cdf(-d2) - F225*norm.cdf(-d1)
    premium = Call * call + Put * np.logical_not(call)
    return premium
def myapply(x):
    # Row-wise implied volatility: minimize the squared pricing error over vol for one option
    res = scipy.optimize.minimize_scalar(
        lambda vol:
        (
            BS_norm(x['F225_PRICE'], x['STRIKE'], maturity, vol, x['CALL'])
            - x['OP_PRICE']
        )**2,
        method='Bounded', bounds=(0.01, 0.5),
        options={'maxiter': 50, 'xatol': 1e-04})
    x['vol1'] = res.x
    return x
def myapply_vect(F225, Strike, price, call):
    # Same scalar optimization as myapply, but looping over plain ndarrays instead of pandas rows
    x = []
    for i in range(len(F225)):
        res = scipy.optimize.minimize_scalar(
            lambda vol:
            (BS_norm_vect(F225[i], Strike[i], maturity, vol, call[i]) - price[i])**2,
            method='Bounded', bounds=(0.01, 0.5),
            options={'maxiter': 50, 'xatol': 1e-04})
        x.append(res.x)
    return x
def myapply_vect2(F225, Strike, price, call):
    # Solve all implied volatilities at once: find the root of the vector-valued pricing error
    res = scipy.optimize.root(
        lambda vol: (BS_norm_vect(F225, Strike, maturity, vol, call) - price),
        np.ones_like(F225)*.3)
    return res.x
def pworker(df):
    # Worker for multiprocessing: Cython vectorized bisection on one DataFrame chunk
    df['vol6'] = cy_apply2(df['F225_PRICE'].values, df['STRIKE'].values,
                           df['OP_PRICE'].values, df['CALL'].values)
    return df
if __name__ == '__main__':
    # # Data preparation
    # Closing-price data for February 10, 2017, obtained from
    # [Information such as option theoretical price | Japan Exchange Group](http://www.jpx.co.jp/markets/derivatives/option-price/01.html).
    # The file contains many instruments, but only option trades on the Nikkei 225
    # for the March 2017 contract are used.
    #
    # ## Read data into Pandas
    # 1. Supply the column names ourselves, based on the header information distributed separately from the data body
    # 2. Trim whitespace in text columns while loading
    # Column names (by variable):
    #   product code, product type, contract month, strike price, reserve
    #   put option: stock code, closing price, reserve, theoretical price, volatility
    #   call option: stock code, closing price, reserve, theoretical price, volatility
    #   underlying asset closing price, base volatility
    colName = ("CODE","TYPE","MATURITY","STRIKE", "RSV",
               "PUT_CODE", "PUT_PRICE", "PUT_RSV", "PUT_TPRICE", "PUT_VOLATILITY",
               "CALL_CODE","CALL_PRICE","CALL_RSV","CALL_TPRICE","CALL_VOLATILITY",
               "F225_PRICE", "Base_VOL")
    df = pd.read_csv('./ose20170210tp.csv', names=colName,
                     converters={'CODE': strip,
                                 'TYPE': strip})
    # Keep only the Nikkei 225 options for the March 2017 contract and drop the columns that are no longer needed
    df = df.query("MATURITY == 201703 & CODE==\"NK225E\"").drop(
        ['RSV','PUT_RSV','CALL_RSV','PUT_CODE','CALL_CODE','CODE','TYPE','MATURITY'], axis=1)
    # * PUT and CALL come as separate column groups, so normalize them into one record per option.
    # * The CALL column is True for call data and False for put data.
    # * Strikes are also narrowed down to 14000 yen or more and less than 22000 yen.
    df_p = df[["STRIKE","PUT_PRICE","PUT_TPRICE", "PUT_VOLATILITY","F225_PRICE", "Base_VOL"]].rename(
        columns={'PUT_PRICE': 'OP_PRICE', 'PUT_TPRICE': 'OP_TPRICE', 'PUT_VOLATILITY': 'OP_VOL'})
    df_p['CALL'] = False
    df_c = df[["STRIKE","CALL_PRICE","CALL_TPRICE", "CALL_VOLATILITY","F225_PRICE", "Base_VOL"]].rename(
        columns={'CALL_PRICE': 'OP_PRICE', 'CALL_TPRICE': 'OP_TPRICE', 'CALL_VOLATILITY': 'OP_VOL'})
    df_c['CALL'] = True
    df = df_p.append(df_c).query("OP_PRICE > 1.0 & STRIKE < 22000 & STRIKE >= 14000")
    del (df_p, df_c)
    tmp_df = df
    loop_num = 10
    text = 'Time elapsed: %.2f seconds'
    result_time = []
    result_Col = np.array([["Data Handling", "Optimize", "BS_Vectorized", "Proc", "Time[sec]"]])
    result_con = np.array([["Pd.apply",
                            "Pd->np",
                            "Pd->np",
                            "Pd->np(CyPy)",
                            "Pd->np(CyPy)",
                            "Pd->Split(Pd x 8)->np(CyPy)",
                            "Pd->np->ANN",
                            "Pd->np->C++(Swig)"
                            ]])
    result_opt = np.array([["minimize_scalar(Scipy)",
                            "minimize_scalar(Scipy)",
                            "root(Scipy)",
                            "root(Scipy)",
                            "Vect Bisection(Cython)",
                            "Vect Bisection(Cython)",
                            "N/A",
                            "Bisection(C++)"
                            ]])
    result_Vect = np.array([["No",
                             "Yes",
                             "Yes",
                             "Yes",
                             "Yes",
                             "Yes",
                             "Yes",
                             "No"
                             ]])
    result_Proc = np.array([["Single",
                             "Single",
                             "Single",
                             "Single",
                             "Single",
                             "Multi Proc",
                             "Single",
                             "Single"
                             ]])
    # 1. Optimization calculation using Scipy (numerical solution) / pandas.apply - Single Process
    time_start = time.time()
    for i in range(loop_num):
        tmp_df = df.apply(myapply, axis=1)
    result_time.append((time.time() - time_start))
    # 2. Optimization calculation using Scipy (numerical solution) / ndarray loop: function vectorization - Single Process
    time_start = time.time()
    for i in range(loop_num):
        tmp_df['vol2'] = myapply_vect(df['F225_PRICE'].values, df['STRIKE'].values,
                                      df['OP_PRICE'].values, df['CALL'].values)
    result_time.append((time.time() - time_start))
    # 3. Optimization calculation using Scipy (numerical solution) / ndarray vector: function vectorization - Single Process
    time_start = time.time()
    for i in range(loop_num):
        tmp_df['vol3'] = myapply_vect2(df['F225_PRICE'].values, df['STRIKE'].values,
                                       df['OP_PRICE'].values, df['CALL'].values)
    result_time.append((time.time() - time_start))
    # 4. Cython - Root
    time_start = time.time()
    for i in range(loop_num):
        tmp_df['vol4'] = cy_apply1(df['F225_PRICE'].values, df['STRIKE'].values,
                                   df['OP_PRICE'].values, df['CALL'].values)
    result_time.append((time.time() - time_start))
    # 5. Cython - My Bisection
    time_start = time.time()
    for i in range(loop_num):
        tmp_df['vol5'] = cy_apply2(df['F225_PRICE'].values, df['STRIKE'].values,
                                   df['OP_PRICE'].values, df['CALL'].values)
    result_time.append((time.time() - time_start))
    # 6. Multi Process: 8 worker processes, each running the Cython bisection on a DataFrame chunk
    time_start = time.time()
    for i in range(loop_num):
        p = mp.Pool(processes=8)
        split_dfs = np.array_split(df, 8)
        pool_results = p.map(pworker, split_dfs)
        p.close()
        p.join()
        tmp_df = pd.concat(pool_results, axis=0)
    result_time.append((time.time() - time_start))
    # 7. ANN: predict implied volatility directly from scaled features with the pre-trained Keras model
    model = load_model('./model.h5')
    a = np.array([df['STRIKE'].values/df['F225_PRICE'].values])                 # strike / underlying ratio
    b = np.array([np.ones_like(df['F225_PRICE'].values)*maturity/(40./245.)])   # maturity normalized by 40/245
    c = np.array([(df['OP_PRICE'].values/df['F225_PRICE'].values/0.25)**0.25])  # transformed normalized option price
    X = np.vstack((a, b, c)).transpose()
    time_start = time.time()
    for i in range(loop_num):
        tmp_df['vol7'] = 0.4*model.predict(X)+0.1    # map network output back to the [0.1, 0.5] volatility range
    result_time.append((time.time() - time_start))
    # 8. Swig C++: single-threaded bisection implemented in C++ and wrapped with Swig
    tmpmpmp = np.ones_like(df['F225_PRICE'].values).astype(np.float32)
    time_start = time.time()
    for i in range(loop_num):
        tmpmpmp = Swig_Apply_PY(df['F225_PRICE'].values, df['STRIKE'].values,
                                df['OP_PRICE'].values, df['CALL'].values.astype(dtype=np.int32), tmpmpmp.shape[0])
    tmp_df['vol8'] = tmpmpmp
    result_time.append((time.time() - time_start))
    result_time = np.array([result_time])
    print(np.vstack((result_con, result_opt, result_Vect, result_Proc, result_time)).transpose())