Looking at the CSV data of SBI SECURITIES'transaction history, The buying and selling of stocks are lined up individually, and the profit and loss is a little difficult to understand.
So I wrote the code because I wanted it to be a set.
The CSV data is as follows This is inconvenient when you want to see the profit and loss of each stock individually.
When you run the program,
It looks like this.
The directory looks like this
After saving the CSV data in a folder called CSV, When you start sbiAggregater.py, it will generate a summary Excel data. By the way, if you have CSV file integration and already summarized Excel data. It will create a past folder and throw it into it.
There are two programs, but the code only needs to be started by sbiAggregater.py.
The code is below
csv_uniter_sbi.py
#! Python3
# -*- coding: utf-8 -*-
# csv_uniter_sbi.py -Integrate csv of transaction history of sbi SECURITIES
import csv, os, sys, datetime, shutil
import logging
#logging.disable(logging.CRITICAL)
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
logging.debug("Searching for csv files...")
f_list = []
for filename in os.listdir(".\\csv"):
if filename.lower().endswith(".csv"):
f_list.append(filename)
def cPath(filename):
dirname = os.path.join(".\\csv", filename)
return dirname
# {date:File name}Push the data into the list for
csv_data = []
if len(f_list) <= 1:
logging.debug("There was less than one csv file. Does not integrate csv files")
else:
for f in f_list:
f = open(cPath(f), "r")
f_reader = csv.reader(f)
f_data = list(f_reader)
#Be careful why the columns to be imported are from the middle
for i in range(9, len(f_data)):
if f_data[i] in csv_data:
continue
elif f_data[i] == []:
continue
csv_data.append(f_data[i])
# csv_Make all data a sortable datetime object
for i in range(len(csv_data)):
date = datetime.datetime.strptime(csv_data[i][0], "%Y/%m/%d")
csv_data[i][0] = date
#Sort by time, descend by reverse so that the new date is on top
csv_data.sort(key=lambda x: x[0], reverse=True)
#Revert to original format
for i in range(len(csv_data)):
csv_data[i][0] = csv_data[i][0].strftime("%Y/%m/%d")
#Paste the data
new_f = open(cPath("sbi_united_{}.csv".format(datetime.datetime.now().strftime("%Y%m%d%H%M%S"))), "w", newline="")
new_f_writer = csv.writer(new_f)
new_f_writer.writerow(f_data[8])
new_f_writer.writerows(csv_data)
f.close()
new_f.close()
#Finally put the reference file in the past folder
for file in f_list:
shutil.move(cPath(file), ".\\csv\\past\\{}".format(file))
logging.debug("csv processing completed")
The main is below
sbiAggregater.py
#! Python3
# -*- coding: utf-8 -*-
# matuiAggregater.py -A program that reads csv of Matsui Securities' transaction history and aggregates it
import csv, os, shutil
from openpyxl.utils import get_column_letter, column_index_from_string
from openpyxl.styles import Font
import logging, openpyxl, datetime
logging.disable(logging.CRITICAL)
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
#Launch the csv integration program
import csv_uniter_sbi
logging.debug("Paste the old summary file_Go to the xlsx folder and generate a new file")
#Old xlsx data.\\past_Put in xlsx
os.makedirs("past_xlsx", exist_ok=True)
for xlsxfile in os.listdir("."):
if xlsxfile.lower().endswith(".xlsx"):
logging.debug("Paste the old xlsx file in the same folder_Go to xlsx")
try:
#Overwrite and save
shutil.move(xlsxfile, ".\\past_xlsx\\{}".format(xlsxfile))
except Exception as exc:
print("{}Is{}Could not move because".format(xlsxfile, exc))
continue
for filename in os.listdir(".\\csv"):
if filename.lower().endswith(".csv"):
csvfilename = filename
def cPath(file):
return os.path.join(".\\csv", file)
savename = "sbi_matome_{}.xlsx".format(datetime.datetime.now().strftime("%Y%m%d%H%M%S"))
f = open(cPath(csvfilename))
f_reader = csv.reader(f)
f_data = list(f_reader)
#Two perspectives, setter and checker. The setter is subjective. The checker is the perspective of finding a pair. Checked if it matches the checker_row_Put it in list and the setter skips that line
checked_row_list = []
only_list = []
#Paste to xlsx file
wb = openpyxl.Workbook()
sheet = wb.active
#The data type is{Transaction category 0,Brand 1,Code 2,Purchase date 3,Bid price 4,Sale date 5,Selling price 6,Holding days 7,P & L 8,Profit and loss 9}
data_type_order = ["Transaction classification", "Brand", "code", "Purchase date", "Sale date", "Number of days held","Buy quantity", "Selling quantity", "Bid price", "Selling price", "Profit and loss ratio","Profit and loss", "comment"]
#Since I created a new excel sheet, data_type_Fill the first column according to the order list
for Col in range(1, len(data_type_order)+ 1):
sheet.cell(1, Col).value = data_type_order[Col - 1]
#The first row is fixed to the window
sheet.freeze_panes = "A2"
#Adjust width
sheet.column_dimensions["A"].width = 11.44
sheet.column_dimensions["B"].width = 28.22
sheet.column_dimensions["C"].width = 5.8
sheet.column_dimensions["D"].width = 12
sheet.column_dimensions["E"].width = 12
sheet.column_dimensions["F"].width = 8.67
sheet.column_dimensions["G"].width = 7.11
sheet.column_dimensions["H"].width = 7.11
sheet.column_dimensions["I"].width = 8.67
sheet.column_dimensions["J"].width = 8.67
sheet.column_dimensions["K"].width = 7.56
sheet.column_dimensions["L"].width = 8.67
sheet.column_dimensions["M"].width = 19.33
#As a block of columns organized by contract date, it is searched in order from the oldest date.{Delivery date: row}
day_row_dict = {}
for row in range(9, len(f_data)):
#Skip if there is free data
if len(f_data[row]) == 0:
continue
day_row_dict.setdefault(datetime.datetime.strptime(f_data[row][0], "%Y/%m/%d"), [])
day_row_dict[datetime.datetime.strptime(f_data[row][0], "%Y/%m/%d")].append(row)
#Sort dates just in case.The old day comes first reverse=False
day_row_key = sorted(day_row_dict, reverse=False)
def pasteExcel(data):
"""Receive dictionary data and paste it in the first line of excel"""
#Paste on excel sheet
sheet.insert_rows(2)
#Refer to the first column of Excel and paste
for Col in range(1, sheet.max_column + 1):
try:
sheet.cell(2, Col).value = data[sheet.cell(1, Col).value]
#Skip if a key that is not in data appears
except KeyError:
continue
def pareset(ROW1, ROW2):
"""ROW1 is bought by rearranging the data, and ROW2 is sold. Call this for both sell only and buy only"""
data = {}
data["comment"] = ""
if ROW1 == [] and ROW2:
#When only for sale
data["Transaction classification"] = f_data[ROW2[0]][4]
data["code"] = int(f_data[ROW2[0]][2])
data["Brand"] = f_data[ROW2[0]][1]
data["Sale date"] = f_data[ROW2[-1]][0]
sell_sum = 0
sell_num = 0
#Delivery amount, number of shares
for i in range(len(ROW2)):
#If you buy or sell a new item, the delivery money will be"--"So
if f_data[ROW2[i]][4][2:4] == "New":
sell_sum += 0
#In repayment+-Because it comes out normally.
elif f_data[ROW2[i]][4][2:4] == "repayment":
sell_sum += int(f_data[ROW2[i]][13])
else:
sell_sum += int(f_data[ROW2[i]][13])
sell_num += int(f_data[ROW2[i]][8])
data["Selling price"] = sell_sum
data["Selling quantity"] = sell_num
data["Profit and loss"] = sell_sum
data["comment"] += "Not enough buying data"
elif ROW1 and ROW2 == []:
#When only buying
data["Transaction classification"] = f_data[ROW1[0]][4]
data["code"] = int(f_data[ROW1[0]][2])
data["Brand"] = f_data[ROW1[0]][1]
data["Purchase date"] = f_data[ROW1[0]][0]
buy_sum = 0
buy_num = 0
for i in range(len(ROW1)):
if f_data[ROW1[i]][4][2:4] == "New":
buy_sum += 0
#In repayment+-Because it comes out normally.
elif f_data[ROW1[i]][4][2:4] == "repayment":
buy_sum += int(f_data[ROW1[i]][13])
#In the case of the actual item, make the purchase negative
else:
buy_sum -= int(f_data[ROW1[i]][13])
buy_num += int(f_data[ROW1[i]][8])
data["Bid price"] = buy_sum
data["Buy quantity"] = buy_num
data["Profit and loss"] = buy_sum
#When it is a buy / sell pair
elif ROW1 and ROW2:
data["Transaction classification"] = f_data[ROW2[0]][4]
data["code"] = int(f_data[ROW2[0]][2])
data["Brand"] = f_data[ROW2[0]][1]
#In case of multiple, the last sale date is the sale
data["Sale date"] = f_data[ROW2[-1]][0]
#Selling price,Bid price and number of shares add up the delivery amount of the list
sell_sum = 0
sell_num = 0
buy_sum = 0
buy_num = 0
for i in range(len(ROW2)):
#If you buy or sell a new item, the delivery money will be"--"So
if f_data[ROW2[i]][4][2:4] == "New":
sell_sum += 0
#In repayment+-Because it comes out normally.
elif f_data[ROW2[i]][4][2:4] == "repayment":
sell_sum += int(f_data[ROW2[i]][13])
else:
sell_sum += int(f_data[ROW2[i]][13])
sell_num += int(f_data[ROW2[i]][8])
data["Selling price"] = sell_sum
data["Selling quantity"] = sell_num
data["Purchase date"] = f_data[ROW1[0]][0]
for i in range(len(ROW1)):
if f_data[ROW1[i]][4][2:4] == "New":
buy_sum += 0
#In repayment+-Because it comes out normally.
elif f_data[ROW1[i]][4][2:4] == "repayment":
buy_sum += int(f_data[ROW1[i]][13])
#In the case of the actual item, make the purchase negative
else:
buy_sum += -int(f_data[ROW1[i]][13])
buy_num += int(f_data[ROW1[i]][8])
data["Bid price"] = buy_sum
data["Buy quantity"] = buy_num
#The number of days held is calculated using datetime.Subtracting datetime creates a timedelta object
date1 = datetime.datetime.strptime(f_data[ROW1[0]][0], "%Y/%m/%d")
date2 = datetime.datetime.strptime(f_data[ROW2[-1]][0], "%Y/%m/%d")
data["Number of days held"] = int((date2 - date1).days)
# +-Has already been added, so be careful
data["Profit and loss"] = int(data["Bid price"]) + int(data["Selling price"])
#The profit / loss ratio is the profit / loss amount/Bid price (absolute value)) *100 units%
if data["Bid price"] == 0:
data["Profit and loss ratio"] = round(int(data["Profit and loss"]) / abs(int(data["Selling price"])) * 100, 2)
else:
data["Profit and loss ratio"] = round(int(data["Profit and loss"]) / abs(int(data["Bid price"])) * 100, 2)
if len(ROW2) > 1:
data["comment"] += "Sold separately. "
if len(ROW1) > 1:
data["comment"] += "Additional purchase. "
if sell_num > buy_num:
data["comment"] += "Not enough buying data"
else:
raise Exception("pareset()There is an exception in")
pasteExcel(data)
for date in day_row_key:
for Row in day_row_dict[date]:
#Skip free data
if len(f_data[Row]) == 0:
continue
#Main viewpoint Row:Scan one by one by date
#Skip if the pair search checker described below is checked
if Row in checked_row_list:
continue
checked_row_list.append(Row)
#Since it is in-kind and bought, the checker looks for a pair for sale. Set the quantity of checkers to sell
#Continue to search for the same stock that you searched for in the checker. Until it catches up with the first number of shares. Also, if you buy before the number of shares you bought becomes 0, it will be added to the number of remaining shares and you will look for further sales.
#If there is a remaining stock until the end, you have to insert it on Excel at the very end
#Initialize buy and sell
multiple_checker_sell_rows = []
multiple_checker_buy_rows = []
if f_data[Row][4].endswith("Buy"):
multiple_checker_buy_rows.append(Row)
#Initial setting of the number of remaining shares
num_stocks_remaining = int(f_data[Row][8])
elif f_data[Row][4].endswith("Sell"):
multiple_checker_sell_rows.append(Row)
#Initial setting of the number of remaining shares
num_stocks_remaining = -int(f_data[Row][8])
else:
raise Exception
#checker: checker_row looks for a pair. Make the pair checked
for checker_date in day_row_key[day_row_key.index(date):]:
for checker_row in day_row_dict[checker_date]:
logging.debug("Row: {}, f_data[Row][2]: {}, f_data[checker_row][2]: {}, f_data[checker_row][4]: {}".format(Row, f_data[Row][2], f_data[checker_row][2], f_data[checker_row][4]))
#Skip free data
if len(f_data[checker_row]) == 0:
continue
#Skip pairs or xl reflected columns
elif checker_row in checked_row_list:
continue
#Judge whether it is credit or in-kind, skip if not
#Judge whether it is credit or in-kind stock. Look for a physical pair for physical and a credit pair for credit."Stock sale in kind(Buy)"Or"Credit new sale"You can get the value of
elif f_data[checker_row][4][:2] != f_data[Row][4][:2]:
continue
#Find a pair that meets your criteria
elif (f_data[Row][2] == f_data[checker_row][2]) and f_data[checker_row][4].endswith("Sell"):
#Matches are the main Row and checker checkers_Allow row to skip
checked_row_list.append(checker_row)
multiple_checker_sell_rows.append(checker_row)
num_stocks_remaining -= int(f_data[checker_row][8])
if num_stocks_remaining == 0:
#Consider it as a pair, pass the pair data to Excel data, and mark it as checked
pareset(ROW1=multiple_checker_buy_rows, ROW2=multiple_checker_sell_rows)
logging.debug("Pair found! ROW1: {}, ROW2: {}".format(multiple_checker_buy_rows, multiple_checker_sell_rows))
break
else:
continue
elif (f_data[Row][2] == f_data[checker_row][2]) and f_data[checker_row][4].endswith("Buy"):
checked_row_list.append(checker_row)
multiple_checker_buy_rows.append(checker_row)
#If there is another purchase before the number of remaining shares reaches 0, increase the number of remaining shares.
num_stocks_remaining += int(f_data[checker_row][8])
if num_stocks_remaining == 0:
#Pair formation
pareset(ROW1=multiple_checker_buy_rows, ROW2=multiple_checker_sell_rows)
logging.debug("Pair found! ROW1: {}, ROW2: {}".format(multiple_checker_buy_rows, multiple_checker_sell_rows))
break
else:
continue
else:
logging.debug("This was not a pair. (checker_row: {})".format(checker_row))
continue
else:
#If you finish within one date without a break, move on to the next date
continue
#When the checker breaks and finishes, break out of the checker's loop and look for the next main view Row
break
else:
#If the checker has finished looking at all the columns, but there are still shares
if num_stocks_remaining != 0:
#Bring it to the top at the very end,I want to display the number of remaining shares
only_list.append((multiple_checker_buy_rows, multiple_checker_sell_rows))
#Bring up what you haven't sold yet
for row1, row2 in only_list:
logging.debug("Some have no pair. ROW1: {}, ROW2: {}".format(row1, row2))
pareset(ROW1=row1, ROW2=row2)
#If the profit / loss ratio is negative, make it in the red
font_st_red = Font(color="FF0000")
for row in range(2, sheet.max_row + 1):
if sheet["K{}".format(row)].value == None or sheet["K{}".format(row)].value == []:
continue
elif sheet["K{}".format(row)].value < 0:
sheet["K{}".format(row)].font = font_st_red
wb.save(savename)
f.close()
print("xlsx processing completed")
It's a brute force, but please use it if you like.
Recommended Posts