KABU+という株式データ配信サービスがあり、加入して日々検証などを行っている。
今までは取得したCSVを読み込んで処理をしていたが、連続で大量のファイルにアクセスするので、いつかはDB化して処理を軽くできないかと考えていた。
この度重い腰を上げて挑戦したところDB化と日々のデータ取得も併せて自動化できたので忘備録として記す。
Contents
1.概要というか背景というか
KABU+というサブスクサービスがある。
月額1500円くらい?で日々株式の各種データが配信されるというもの。
このデータを用いて(まったく勝てていないが)トレードの検証などをしている。
ヤフーなどからスクレイピングで毎日取得していけば金はかからないが、過去データを取るのが面倒なのでKABU+を使用している形。
昔は数年分データがあったが今は1年分しか過去データがないので、今からデータを蓄積していきたいとかじゃないのであれば別サービスのほうがいいかもしれない。
サービス自体は配信元のCSVをエクスプローラのネットワークドライブに登録とかして、参照しに行く形。
検証などで使用するなら別途ローカルにダウンロードしておいたほうが扱いやすいので、基本はダウンロードして運用することになる。
ただダウンロードしただけだとローカルに大量にCSVファイルがある状況のため、いろいろ扱いにくい。
そこで今回は
①初期データとして公式からデータをローカルにダウンロード
②それをDBに登録
③日々の更新データを取得してDBに登録まで自動化
を行う。
2.環境とか構成とかソースコードとか
環境は24h365d動いてほしいので年初に導入したNAS。の上に建てたVM(Linux)で諸々処理する。
なお書いてる人はプログラム何それおいしいのレベルのため、よくわからないけど動くからおっけーの精神で書いてるとだけ記載しておく。
構成は下記。
--
|--database_module
| |
| |--kabu_plus_download.py
| |--kabu_plus_read.py
| |--operationTool.py
|
|
|--exe_program
| |
| |--download_StockData_Daily.py
| |--first_insert_DB.py
ファイルが無駄に多いのは無計画にモジュールを増やしていったからであり、一つにまとめられるならそうしたほうがいい。
各プログラムは下記のような感じ。
またlocalDataなるものをインポートしているが、これは当環境のフォルダパス等をまとめたもの。
なのでプラグラム中のA.XXXXXも適宜解読して自身の環境に合わせてもらえればと思う。
またデータの自動取得及びDB登録に無関係の関数も多々あるが、整理するのが面倒だったので残してあるだけ。
sys.path.append(os.path.join(os.path.dirname(__file__), '../../localData'))
import localData as A
kabu_plus_download.py
#https://nerimplo.hatenablog.com/entry/2018/07/06/000000 を参考
import requests
from requests.auth import HTTPBasicAuth
import csv
import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), '../../localData')) #ここは削除でおk
import localData as A #ここも削除でおk
def retrieve_csv_file(url):
id = 'KABU+のID'
pw = 'KABU+のPW'
res = requests.get(url, auth=HTTPBasicAuth(id, pw))
data = res.content.decode('shift-jis')
return data
def to_csv(data, file=None):
if file:
with open(file, 'w', encoding='shift-jis') as f:
writer = csv.writer(f, delimiter=',', quotechar='"',quoting=csv.QUOTE_NONNUMERIC)
reader = csv.reader(data.splitlines())
for row in reader:
writer.writerow(row)
def downloadNewPrices2():
URL = 'https://csvex.com/kabu.plus/csv/japan-all-stock-prices-2/daily/japan-all-stock-prices-2.csv'
data = retrieve_csv_file(URL)
filePath = A.stockPrices2DailyPath + "/japan-all-stock-prices-2.csv" #ファイルパスは各環境に修正が必要
to_csv(data, filePath)
def downloadNewStockData():
URL = 'https://csvex.com/kabu.plus/csv/japan-all-stock-data/daily/japan-all-stock-data.csv'
data = retrieve_csv_file(URL)
filePath = A.stockDataDailyPath + "/japan-all-stock-data.csv" #ファイルパスは各環境に修正が必要
to_csv(data, filePath)
このファイルはその名の通り、japan-all-stock-prices-2.csvとjapan-all-stock-data.csvをダウンロードするモジュール。
KABU+のIDとPW、ファイルパスは各自のものを入れればいい。
kabu_plus_read.py
import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), '../../localData'))
import localData as A
import pandas as pd
import numpy as np
#YYYYMMDDは文字列。関数を呼び出す前に呼び出し元で文字列化する
def getPrices2(YYYYMMDD):
fileName = "japan-all-stock-prices-2_" + str(YYYYMMDD) + ".csv"
filePath = A.stockPrices2DailyPath + "/" + fileName
#pricesデータを取得
readData = pd.read_csv(filePath,encoding="SHIFT-JIS")
#仕様変更に伴い日時列がある場合、日付列へ変更
columnsList = readData.columns.tolist()
dateStr = "日時"
if str(dateStr) in str(columnsList):
readData = readData.rename(columns={'日時':'日付'})
#表記を統一するために日付列はすべて引数のYYYYMMDDに置換
readData["日付"] = str(YYYYMMDD)
return readData
#当日のデータを取得
def getNewPrices2():
fileName = "japan-all-stock-prices-2.csv"
filePath = A.stockPrices2DailyPath + "/" + fileName
#pricesデータを取得
readData = pd.read_csv(filePath,encoding="SHIFT-JIS")
#仕様変更に伴い日時列がある場合、日付列へ変更
columnsList = readData.columns.tolist()
dateStr = "日時"
if str(dateStr) in str(columnsList):
readData = readData.rename(columns={'日時':'日付'})
return readData
#YYYYMMDDは文字列。関数を呼び出す前に呼び出し元で文字列化する
def getStockData(YYYYMMDD):
fileName = "japan-all-stock-data_" + str(YYYYMMDD) + ".csv"
filePath = A.stockDataDailyPath + "/" + fileName
#pricesデータを取得
readData = pd.read_csv(filePath,encoding="SHIFT-JIS")
#仕様変更に伴い配当利回り列がある場合、配当利回り(予想)列へ変更
columnsList = readData.columns.tolist()
dateStr = "配当利回り"
if str(dateStr) in str(columnsList):
readData = readData.rename(columns={'配当利回り':'配当利回り(予想)'})
dateStr = "1株配当"
if str(dateStr) in str(columnsList):
readData = readData.rename(columns={'1株配当':'1株配当(予想)'})
return readData
#当日のデータを取得
def getNewStockData():
fileName = "japan-all-stock-data.csv"
filePath = A.stockDataDailyPath + "/" + fileName
#pricesデータを取得
readData = pd.read_csv(filePath,encoding="SHIFT-JIS")
#仕様変更に伴い配当利回り列がある場合、配当利回り(予想)列へ変更
columnsList = readData.columns.tolist()
dateStr = "配当利回り"
if str(dateStr) in str(columnsList):
readData = readData.rename(columns={'配当利回り':'配当利回り(予想)'})
dateStr = "1株配当"
if str(dateStr) in str(columnsList):
readData = readData.rename(columns={'1株配当':'1株配当(予想)'})
return readData
#prices2とstockDataをマージしてマッピング及びデータ型の整理
def mergePrices2AndStockData(prices2,stockData):
mergeData = pd.merge(prices2,stockData)
mapping = {'SC': 'Code',
'名称': 'Name',
'市場': 'Market',
'業種': 'Industry',
'日付': 'Timestamp',
'株価': 'Price',
'前日比': 'Change',
'前日比(%)': 'ChangeInPercent',
'前日終値': 'PreviousClosePx',
'始値': 'Open',
'高値': 'High',
'安値': 'Low',
'VWAP': 'VWAP',
'出来高': 'Volume',
'出来高率': 'VolumeInPercent',
'売買代金(千円)': 'TradingVolume',
'時価総額(百万円)': 'MarketCap',
'値幅下限': 'LowerRange',
'値幅上限': 'UpperRange',
'高値日付': 'HighDate',
'年初来高値': 'YTDHigh',
'年初来高値乖離率': 'DeviationFromYTDHigh',
'安値日付': 'LowDate',
'年初来安値': 'YTDLow',
'年初来安値乖離率': 'DeviationFromYTDLow',
'発行済株式数': 'StocksIssued',
'配当利回り(予想)': 'DividendYield',
'1株配当(予想)': 'DividendPerShare',
'PER(予想)': 'PER',
'PBR(実績)': 'PBR',
'EPS(予想)': 'EPS',
'BPS(実績)': 'BPS',
'最低購入額': 'MinimumPurchaseAmount',
'単元株': 'UnitShare'}
non_double_precisions = ['Code', 'Name', 'Market', 'Timestamp', 'Industry', 'HighDate', 'LowDate']
#列名を処理しやすいように英語に変更
mergeData = mergeData.rename(columns=mapping)
mergeData = mergeData.replace('-', np.nan)
for column_type in mapping.values():
if column_type not in non_double_precisions:
mergeData[column_type] = list(map(lambda x: float(x), mergeData[column_type]))
return mergeData
このファイルはその名の通り、japan-all-stock-prices-2.csvとjapan-all-stock-data.csvからデータを読み込むモジュール。
同一日付のデータをそれぞれ読み込み合体させて、カラム名を英語にして返す感じ。
operationTool.py
import sqlite3
import pandas as pd
import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__),'../database_module'))
import kabu_plus_read as k
#初回のみ実施。実質意味なしモジュール
def createDb():
# TEST.dbを作成する
# すでに存在していれば、それにアスセスする。
dbName = 'stock.db'
filepath = "DBのパス" + dbName
conn = sqlite3.connect(filepath)
# データベースへのコネクションを閉じる。(必須)
conn.close()
return 0
#証券データを追加で投入
def insertStockData(YYYYMMDD):
# TEST.dbを作成する
# すでに存在していれば、それにアスセスする。
dbName = 'stock.db'
filepath = "DBのパス" + dbName
conn = sqlite3.connect(filepath)
prices2 = k.getPrices2(YYYYMMDD)
stockData = k.getStockData(YYYYMMDD)
mergeData = k.mergePrices2AndStockData(prices2,stockData)
mergeData.to_sql('StockData',conn,if_exists='append',index=None)
# データベースへのコネクションを閉じる。(必須)
conn.close()
return 0
#証券データを追加で投入
def insertStockData(mergeData):
# TEST.dbを作成する
# すでに存在していれば、それにアスセスする。
dbName = 'stock.db'
filepath = "DBのパス" + dbName
conn = sqlite3.connect(filepath)
mergeData.to_sql('StockData',conn,if_exists='append',index=None)
# データベースへのコネクションを閉じる。(必須)
conn.close()
return 0
#テーブル削除
def deleteTable():
# TEST.dbを作成する
# すでに存在していれば、それにアスセスする。
dbName = 'stock.db'
filepath = "DBのパス" + dbName
conn = sqlite3.connect(filepath)
cur = conn.cursor()
sql = "drop table StockData;"
cur.execute(sql)
# データベースへのコネクションを閉じる。(必須)
conn.close()
return 0
def readAllTable():
# TEST.dbを作成する
# すでに存在していれば、それにアスセスする。
dbName = 'stock.db'
filepath = "DBのパス" + dbName
conn = sqlite3.connect(filepath)
readData = pd.read_sql('SELECT * FROM StockData', conn)
# データベースへのコネクションを閉じる。(必須)
conn.close()
return readData
def searchTableStockAndDate(stock,YYYYMMDD):
# TEST.dbを作成する
# すでに存在していれば、それにアスセスする。
dbName = 'stock.db'
filepath = "DBのパス" + dbName
conn = sqlite3.connect(filepath)
sqlSentence = "SELECT * FROM StockData" + " WHERE Code IS " + str(stock) + " AND Timestamp IS " + str(YYYYMMDD)
readData = pd.read_sql(sqlSentence, conn)
# データベースへのコネクションを閉じる。(必須)
conn.close()
return readData
def searchTableSqlCondition(sqlCondition):
# TEST.dbを作成する
# すでに存在していれば、それにアスセスする。
dbName = 'stock.db'
filepath = "DBのパス" + dbName
conn = sqlite3.connect(filepath)
sqlSentence = "SELECT Code,Timestamp FROM StockData " + sqlCondition
readData = pd.read_sql(sqlSentence, conn)
# データベースへのコネクションを閉じる。(必須)
conn.close()
return readData
def searchTableStock(stock):
# TEST.dbを作成する
# すでに存在していれば、それにアスセスする。
dbName = 'stock.db'
filepath = "DBのパス" + dbName
conn = sqlite3.connect(filepath)
sqlSentence = "SELECT * FROM StockData" + " WHERE Code IS " + str(stock)
readData = pd.read_sql(sqlSentence, conn)
# データベースへのコネクションを閉じる。(必須)
conn.close()
return readData
DB操作のためのモジュール。
DBのパスは各自の環境に合わせて変更。
sqlite3だと適当なフォルダを指定すればいいだけなので、”DBのパス”にDBを作りたいパスを指定すればいいだけ。
よくわからないが、DBアクセスの際はDB作成と同じコマンドらしく、DB作成のモジュールが実質ゴミとなっている。
ソースコードが長くて見づらくなったので、後編に続く。
コメントを残す