En regardant les données CSV de l'historique des transactions de SBI Securities, L'achat et la vente d'actions sont alignés individuellement, et le profit et la perte sont un peu difficiles à comprendre.
J'ai donc écrit le code parce que je voulais que ce soit un ensemble.
Les données CSV sont les suivantes Ceci n'est pas pratique lorsque vous voulez voir le profit et la perte de chaque action individuellement.
Lorsque vous exécutez le programme,
Ça ressemble à ça.
Le répertoire ressemble à ceci
Après avoir enregistré les données CSV dans un dossier appelé CSV Lorsque vous démarrez sbiAggregater.py, il génère des données Excel récapitulatives. En passant, si vous avez l'intégration de fichiers CSV et des données Excel déjà résumées. Il créera un ancien dossier et le jettera dedans.
Il existe deux programmes, mais le code ne doit être démarré que par sbiAggregater.py.
Le code est ci-dessous
csv_uniter_sbi.py
#! Python3
# -*- coding: utf-8 -*-
# csv_uniter_sbi.py -Intégrer le CSV de l'historique des transactions des titres SBI
import csv, os, sys, datetime, shutil
import logging
#logging.disable(logging.CRITICAL)
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
logging.debug("Recherche de fichier csv...")
f_list = []
for filename in os.listdir(".\\csv"):
if filename.lower().endswith(".csv"):
f_list.append(filename)
def cPath(filename):
dirname = os.path.join(".\\csv", filename)
return dirname
# {Date:Nom de fichier}Poussez les données dans la liste pour
csv_data = []
if len(f_list) <= 1:
logging.debug("Il y avait moins d'un fichier csv. N'intègre pas les fichiers csv")
else:
for f in f_list:
f = open(cPath(f), "r")
f_reader = csv.reader(f)
f_data = list(f_reader)
#Faites attention à la raison pour laquelle les colonnes à importer proviennent du milieu
for i in range(9, len(f_data)):
if f_data[i] in csv_data:
continue
elif f_data[i] == []:
continue
csv_data.append(f_data[i])
# csv_Faire de toutes les données un objet datetime triable
for i in range(len(csv_data)):
date = datetime.datetime.strptime(csv_data[i][0], "%Y/%m/%d")
csv_data[i][0] = date
#Trier par heure, descendre en sens inverse pour que la nouvelle date soit en haut
csv_data.sort(key=lambda x: x[0], reverse=True)
#Revenir au format d'origine
for i in range(len(csv_data)):
csv_data[i][0] = csv_data[i][0].strftime("%Y/%m/%d")
#Coller les données
new_f = open(cPath("sbi_united_{}.csv".format(datetime.datetime.now().strftime("%Y%m%d%H%M%S"))), "w", newline="")
new_f_writer = csv.writer(new_f)
new_f_writer.writerow(f_data[8])
new_f_writer.writerows(csv_data)
f.close()
new_f.close()
#Enfin, placez le fichier de référence dans le dossier précédent
for file in f_list:
shutil.move(cPath(file), ".\\csv\\past\\{}".format(file))
logging.debug("traitement csv terminé")
Le principal est ci-dessous
sbiAggregater.py
#! Python3
# -*- coding: utf-8 -*-
# matuiAggregater.py -Un programme qui lit le csv de l'historique des transactions de Matsui Securities et l'agrège
import csv, os, shutil
from openpyxl.utils import get_column_letter, column_index_from_string
from openpyxl.styles import Font
import logging, openpyxl, datetime
logging.disable(logging.CRITICAL)
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
#Lancer le programme d'intégration csv
import csv_uniter_sbi
logging.debug("Collez l'ancien fichier de résumé_Allez dans le dossier xlsx et générez un nouveau fichier")
#Anciennes données xlsx.\\past_Mettre en xlsx
os.makedirs("past_xlsx", exist_ok=True)
for xlsxfile in os.listdir("."):
if xlsxfile.lower().endswith(".xlsx"):
logging.debug("Collez l'ancien fichier xlsx dans le même dossier_Aller à xlsx")
try:
#Écraser et enregistrer
shutil.move(xlsxfile, ".\\past_xlsx\\{}".format(xlsxfile))
except Exception as exc:
print("{}Est{}Impossible de bouger car".format(xlsxfile, exc))
continue
for filename in os.listdir(".\\csv"):
if filename.lower().endswith(".csv"):
csvfilename = filename
def cPath(file):
return os.path.join(".\\csv", file)
savename = "sbi_matome_{}.xlsx".format(datetime.datetime.now().strftime("%Y%m%d%H%M%S"))
f = open(cPath(csvfilename))
f_reader = csv.reader(f)
f_data = list(f_reader)
#Deux perspectives, setter et checker. Le passeur est subjectif. Le vérificateur est la perspective de trouver une paire. Vérifié s'il correspond au vérificateur_row_Mettez-le dans la liste et le passeur saute cette ligne
checked_row_list = []
only_list = []
#Coller dans un fichier xlsx
wb = openpyxl.Workbook()
sheet = wb.active
#Le type de données est{Catégorie de transaction 0,Marque 1,Code 2,Date d'achat 3,Prix d'achat 4,Date de vente 5,Prix de vente 6,Jours de détention 7,Rapport bénéfice / perte 8,Bénéfices et pertes 9}
data_type_order = ["Classification des transactions", "Marque", "code", "Date d'achat", "Date de vente", "Nombre de jours détenus","Acheter la quantité", "Quantité de vente", "Prix d'achat", "Prix de vente", "Ratio profit / perte","Profit et perte", "commentaire"]
#Depuis que j'ai créé une nouvelle feuille Excel, les données_type_Remplissez la première colonne selon la liste de commande
for Col in range(1, len(data_type_order)+ 1):
sheet.cell(1, Col).value = data_type_order[Col - 1]
#La première ligne est fixée à la fenêtre
sheet.freeze_panes = "A2"
#Ajuster la largeur
sheet.column_dimensions["A"].width = 11.44
sheet.column_dimensions["B"].width = 28.22
sheet.column_dimensions["C"].width = 5.8
sheet.column_dimensions["D"].width = 12
sheet.column_dimensions["E"].width = 12
sheet.column_dimensions["F"].width = 8.67
sheet.column_dimensions["G"].width = 7.11
sheet.column_dimensions["H"].width = 7.11
sheet.column_dimensions["I"].width = 8.67
sheet.column_dimensions["J"].width = 8.67
sheet.column_dimensions["K"].width = 7.56
sheet.column_dimensions["L"].width = 8.67
sheet.column_dimensions["M"].width = 19.33
#En tant que bloc de colonnes organisé par date de contrat, recherchez-le dans l'ordre à partir de la date la plus ancienne.{Date de livraison: row}
day_row_dict = {}
for row in range(9, len(f_data)):
#Ignorer s'il y a des données gratuites
if len(f_data[row]) == 0:
continue
day_row_dict.setdefault(datetime.datetime.strptime(f_data[row][0], "%Y/%m/%d"), [])
day_row_dict[datetime.datetime.strptime(f_data[row][0], "%Y/%m/%d")].append(row)
#Trier les dates au cas où.Le vieux jour vient d'abord à l'envers=False
day_row_key = sorted(day_row_dict, reverse=False)
def pasteExcel(data):
"""Recevez les données du dictionnaire et collez-les sur la première ligne d'Excel"""
#Coller sur une feuille Excel
sheet.insert_rows(2)
#Reportez-vous à la première colonne d'Excel et collez
for Col in range(1, sheet.max_column + 1):
try:
sheet.cell(2, Col).value = data[sheet.cell(1, Col).value]
#Ignorer si une clé qui n'est pas dans les données apparaît
except KeyError:
continue
def pareset(ROW1, ROW2):
"""ROW1 est acheté en réorganisant les données et ROW2 est vendu. Appelez ceci pour vendre seulement et acheter seulement"""
data = {}
data["commentaire"] = ""
if ROW1 == [] and ROW2:
#Quand seulement en vente
data["Classification des transactions"] = f_data[ROW2[0]][4]
data["code"] = int(f_data[ROW2[0]][2])
data["Marque"] = f_data[ROW2[0]][1]
data["Date de vente"] = f_data[ROW2[-1]][0]
sell_sum = 0
sell_num = 0
#Montant de la livraison, nombre d'actions
for i in range(len(ROW2)):
#Si vous achetez ou vendez un nouvel article, les frais de livraison seront"--"Alors
if f_data[ROW2[i]][4][2:4] == "Nouveau":
sell_sum += 0
#En remboursement+-Parce que ça sort normalement.
elif f_data[ROW2[i]][4][2:4] == "remboursement":
sell_sum += int(f_data[ROW2[i]][13])
else:
sell_sum += int(f_data[ROW2[i]][13])
sell_num += int(f_data[ROW2[i]][8])
data["Prix de vente"] = sell_sum
data["Quantité de vente"] = sell_num
data["Profit et perte"] = sell_sum
data["commentaire"] += "Pas assez de données d'achat"
elif ROW1 and ROW2 == []:
#Lors de l'achat uniquement
data["Classification des transactions"] = f_data[ROW1[0]][4]
data["code"] = int(f_data[ROW1[0]][2])
data["Marque"] = f_data[ROW1[0]][1]
data["Date d'achat"] = f_data[ROW1[0]][0]
buy_sum = 0
buy_num = 0
for i in range(len(ROW1)):
if f_data[ROW1[i]][4][2:4] == "Nouveau":
buy_sum += 0
#En remboursement+-Parce que ça sort normalement.
elif f_data[ROW1[i]][4][2:4] == "remboursement":
buy_sum += int(f_data[ROW1[i]][13])
#Dans le cas de l'article réel, rendez l'achat négatif
else:
buy_sum -= int(f_data[ROW1[i]][13])
buy_num += int(f_data[ROW1[i]][8])
data["Prix d'achat"] = buy_sum
data["Acheter la quantité"] = buy_num
data["Profit et perte"] = buy_sum
#Quand il s'agit d'une paire achat / vente
elif ROW1 and ROW2:
data["Classification des transactions"] = f_data[ROW2[0]][4]
data["code"] = int(f_data[ROW2[0]][2])
data["Marque"] = f_data[ROW2[0]][1]
#En cas de multiple, la dernière date de vente est la vente
data["Date de vente"] = f_data[ROW2[-1]][0]
#Prix de vente,Le prix d'achat et le nombre d'actions sont la somme des montants de livraison dans la liste.
sell_sum = 0
sell_num = 0
buy_sum = 0
buy_num = 0
for i in range(len(ROW2)):
#Si vous achetez ou vendez un nouvel article, les frais de livraison seront"--"Alors
if f_data[ROW2[i]][4][2:4] == "Nouveau":
sell_sum += 0
#En remboursement+-Parce que ça sort normalement.
elif f_data[ROW2[i]][4][2:4] == "remboursement":
sell_sum += int(f_data[ROW2[i]][13])
else:
sell_sum += int(f_data[ROW2[i]][13])
sell_num += int(f_data[ROW2[i]][8])
data["Prix de vente"] = sell_sum
data["Quantité de vente"] = sell_num
data["Date d'achat"] = f_data[ROW1[0]][0]
for i in range(len(ROW1)):
if f_data[ROW1[i]][4][2:4] == "Nouveau":
buy_sum += 0
#En remboursement+-Parce que ça sort normalement.
elif f_data[ROW1[i]][4][2:4] == "remboursement":
buy_sum += int(f_data[ROW1[i]][13])
#Dans le cas de l'article réel, rendez l'achat négatif
else:
buy_sum += -int(f_data[ROW1[i]][13])
buy_num += int(f_data[ROW1[i]][8])
data["Prix d'achat"] = buy_sum
data["Acheter la quantité"] = buy_num
#Le nombre de jours détenus est calculé en utilisant datetime.La soustraction de datetime crée un objet timedelta
date1 = datetime.datetime.strptime(f_data[ROW1[0]][0], "%Y/%m/%d")
date2 = datetime.datetime.strptime(f_data[ROW2[-1]][0], "%Y/%m/%d")
data["Nombre de jours détenus"] = int((date2 - date1).days)
# +-A déjà été ajouté, alors soyez prudent
data["Profit et perte"] = int(data["Prix d'achat"]) + int(data["Prix de vente"])
#Le ratio profit / perte est le montant du profit / perte/Prix d'achat (valeur absolue)) *100 unités%
if data["Prix d'achat"] == 0:
data["Ratio profit / perte"] = round(int(data["Profit et perte"]) / abs(int(data["Prix de vente"])) * 100, 2)
else:
data["Ratio profit / perte"] = round(int(data["Profit et perte"]) / abs(int(data["Prix d'achat"])) * 100, 2)
if len(ROW2) > 1:
data["commentaire"] += "Vendu séparément. "
if len(ROW1) > 1:
data["commentaire"] += "Achat supplémentaire. "
if sell_num > buy_num:
data["commentaire"] += "Pas assez de données d'achat"
else:
raise Exception("pareset()Il y a une exception dans")
pasteExcel(data)
for date in day_row_key:
for Row in day_row_dict[date]:
#Ignorer les données gratuites
if len(f_data[Row]) == 0:
continue
#Ligne de point de vue principal:Scannez un par un par date
#Ignorer si le vérificateur de recherche de paires décrit ci-dessous est coché
if Row in checked_row_list:
continue
checked_row_list.append(Row)
#Puisqu'il est en nature et acheté, le vérificateur recherche une paire à vendre. Définir la quantité de vente du vérificateur
#Continuez à rechercher le même stock que vous avez recherché dans le vérificateur. Jusqu'à ce qu'il rattrape le premier nombre d'actions. De plus, si vous achetez avant que le nombre d'actions que vous avez achetées ne devienne 0, il sera ajouté au nombre d'actions restantes et vous rechercherez d'autres ventes.
#S'il reste un stock jusqu'à la fin, vous devez l'insérer sur Excel à la toute fin
#Initialiser l'achat et la vente
multiple_checker_sell_rows = []
multiple_checker_buy_rows = []
if f_data[Row][4].endswith("Acheter"):
multiple_checker_buy_rows.append(Row)
#Réglage initial du nombre d'actions restantes
num_stocks_remaining = int(f_data[Row][8])
elif f_data[Row][4].endswith("Vendre"):
multiple_checker_sell_rows.append(Row)
#Réglage initial du nombre d'actions restantes
num_stocks_remaining = -int(f_data[Row][8])
else:
raise Exception
#vérificateur: checker_row cherche une paire. Vérifier la paire
for checker_date in day_row_key[day_row_key.index(date):]:
for checker_row in day_row_dict[checker_date]:
logging.debug("Row: {}, f_data[Row][2]: {}, f_data[checker_row][2]: {}, f_data[checker_row][4]: {}".format(Row, f_data[Row][2], f_data[checker_row][2], f_data[checker_row][4]))
#Ignorer les données gratuites
if len(f_data[checker_row]) == 0:
continue
#Ignorer les paires ou les colonnes reflétées xl
elif checker_row in checked_row_list:
continue
#Jugez s'il s'agit d'un crédit ou en nature, sautez si non
#Jugez s'il s'agit de crédit ou de stock physique. Recherchez une paire physique pour le physique et une paire de crédit pour le crédit."Ventes d'actions en nature(Acheter)"Ou"Crédit nouvelle vente"Vous pouvez obtenir la valeur de
elif f_data[checker_row][4][:2] != f_data[Row][4][:2]:
continue
#Trouvez une paire qui répond à vos critères
elif (f_data[Row][2] == f_data[checker_row][2]) and f_data[checker_row][4].endswith("Vendre"):
#Les correspondances sont la ligne principale et le vérificateur de vérificateur_Autoriser la ligne à sauter
checked_row_list.append(checker_row)
multiple_checker_sell_rows.append(checker_row)
num_stocks_remaining -= int(f_data[checker_row][8])
if num_stocks_remaining == 0:
#Considérez-le comme une paire et transmettez les données de la paire aux données Excel pour les vérifier
pareset(ROW1=multiple_checker_buy_rows, ROW2=multiple_checker_sell_rows)
logging.debug("Paire trouvée! ROW1: {}, ROW2: {}".format(multiple_checker_buy_rows, multiple_checker_sell_rows))
break
else:
continue
elif (f_data[Row][2] == f_data[checker_row][2]) and f_data[checker_row][4].endswith("Acheter"):
checked_row_list.append(checker_row)
multiple_checker_buy_rows.append(checker_row)
#S'il y a un autre achat avant que le nombre d'actions restantes n'atteigne 0, augmentez le nombre d'actions restantes.
num_stocks_remaining += int(f_data[checker_row][8])
if num_stocks_remaining == 0:
#Formation de paires
pareset(ROW1=multiple_checker_buy_rows, ROW2=multiple_checker_sell_rows)
logging.debug("Paire trouvée! ROW1: {}, ROW2: {}".format(multiple_checker_buy_rows, multiple_checker_sell_rows))
break
else:
continue
else:
logging.debug("Ce n'était pas une paire. (checker_row: {})".format(checker_row))
continue
else:
#Si vous terminez en une seule date sans interruption, passez à la date suivante
continue
#Lorsque le vérificateur se brise et se termine, sortez de la boucle du vérificateur et recherchez la prochaine ligne de point de vue principal
break
else:
#Si le vérificateur a fini de regarder toutes les colonnes, mais qu'il y a toujours des partages
if num_stocks_remaining != 0:
#Amenez-le au sommet à la toute fin,Je souhaite afficher le nombre d'actions restantes
only_list.append((multiple_checker_buy_rows, multiple_checker_sell_rows))
#Apportez ce que vous n'avez pas encore vendu
for row1, row2 in only_list:
logging.debug("Certains n'ont pas de paire. ROW1: {}, ROW2: {}".format(row1, row2))
pareset(ROW1=row1, ROW2=row2)
#Si le ratio profit / perte est négatif, inscrivez-le dans le rouge
font_st_red = Font(color="FF0000")
for row in range(2, sheet.max_row + 1):
if sheet["K{}".format(row)].value == None or sheet["K{}".format(row)].value == []:
continue
elif sheet["K{}".format(row)].value < 0:
sheet["K{}".format(row)].font = font_st_red
wb.save(savename)
f.close()
print("traitement xlsx terminé")
C'est une force brute, mais veuillez l'utiliser si vous le souhaitez.