KABU+のデータを自動取得してDBに追加する(DB系モジュール編)

KABU+という株式データ配信サービスがあり、加入して日々検証などを行っている。
今までは取得したCSVを読み込んで処理をしていたが、連続で大量のファイルにアクセスするので、いつかはDB化して処理を軽くできないかと考えていた。
この度重い腰を上げて挑戦したところDB化と日々のデータ取得も併せて自動化できたので忘備録として記す。

1.概要というか背景というか

KABU+というサブスクサービスがある。
月額1500円くらい?で日々株式の各種データが配信されるというもの。
このデータを用いて(まったく勝てていないが)トレードの検証などをしている。
ヤフーなどからスクレイピングで毎日取得していけば金はかからないが、過去データを取るのが面倒なのでKABU+を使用している形。
昔は数年分データがあったが今は1年分しか過去データがないので、今からデータを蓄積していきたいとかじゃないのであれば別サービスのほうがいいかもしれない。

サービス自体は配信元のCSVをエクスプローラのネットワークドライブに登録とかして、参照しに行く形。
検証などで使用するなら別途ローカルにダウンロードしておいたほうが扱いやすいので、基本はダウンロードして運用することになる。
ただダウンロードしただけだとローカルに大量にCSVファイルがある状況のため、いろいろ扱いにくい。
そこで今回は
①初期データとして公式からデータをローカルにダウンロード
②それをDBに登録
③日々の更新データを取得してDBに登録まで自動化
を行う。

2.環境とか構成とかソースコードとか

環境は24h365d動いてほしいので年初に導入したNAS。の上に建てたVM(Linux)で諸々処理する。
なお書いてる人はプログラム何それおいしいのレベルのため、よくわからないけど動くからおっけーの精神で書いてるとだけ記載しておく。

構成は下記。

--
 |--database_module
 |    |
 |    |--kabu_plus_download.py
 |    |--kabu_plus_read.py
 |    |--operationTool.py
 |
 |
 |--exe_program
 |   |
 |   |--download_StockData_Daily.py
 |   |--first_insert_DB.py

ファイルが無駄に多いのは無計画にモジュールを増やしていったからであり、一つにまとめられるならそうしたほうがいい。

各プログラムは下記のような感じ。
またlocalDataなるものをインポートしているが、これは当環境のフォルダパス等をまとめたもの。
なのでプラグラム中のA.XXXXXも適宜解読して自身の環境に合わせてもらえればと思う。
またデータの自動取得及びDB登録に無関係の関数も多々あるが、整理するのが面倒だったので残してあるだけ。

sys.path.append(os.path.join(os.path.dirname(__file__), '../../localData'))
import localData as A

kabu_plus_download.py

#https://nerimplo.hatenablog.com/entry/2018/07/06/000000 を参考
import requests
from requests.auth import HTTPBasicAuth
import csv

import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), '../../localData')) #ここは削除でおk
import localData as A #ここも削除でおk

def retrieve_csv_file(url):
    id = 'KABU+のID' 
    pw = 'KABU+のPW'

    res = requests.get(url, auth=HTTPBasicAuth(id, pw))
    data = res.content.decode('shift-jis')
    return data

def to_csv(data, file=None):
    if file:
        with open(file, 'w', encoding='shift-jis') as f:
            writer = csv.writer(f, delimiter=',',  quotechar='"',quoting=csv.QUOTE_NONNUMERIC)
            reader = csv.reader(data.splitlines())
            for row in reader:
                writer.writerow(row)

def downloadNewPrices2():
    URL = 'https://csvex.com/kabu.plus/csv/japan-all-stock-prices-2/daily/japan-all-stock-prices-2.csv'
    data = retrieve_csv_file(URL)
    filePath = A.stockPrices2DailyPath + "/japan-all-stock-prices-2.csv" #ファイルパスは各環境に修正が必要
    to_csv(data, filePath)

def downloadNewStockData():
    URL = 'https://csvex.com/kabu.plus/csv/japan-all-stock-data/daily/japan-all-stock-data.csv'
    data = retrieve_csv_file(URL)
    filePath = A.stockDataDailyPath + "/japan-all-stock-data.csv" #ファイルパスは各環境に修正が必要
    to_csv(data, filePath)

このファイルはその名の通り、japan-all-stock-prices-2.csvとjapan-all-stock-data.csvをダウンロードするモジュール。
KABU+のIDとPW、ファイルパスは各自のものを入れればいい。

kabu_plus_read.py

import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), '../../localData'))
import localData as A

import pandas as pd
import numpy as np

#YYYYMMDDは文字列。関数を呼び出す前に呼び出し元で文字列化する
def getPrices2(YYYYMMDD):
    fileName = "japan-all-stock-prices-2_" + str(YYYYMMDD) + ".csv"
    filePath = A.stockPrices2DailyPath + "/" + fileName
    #pricesデータを取得
    readData = pd.read_csv(filePath,encoding="SHIFT-JIS") 

    #仕様変更に伴い日時列がある場合、日付列へ変更
    columnsList = readData.columns.tolist()
    dateStr = "日時"
    if str(dateStr) in str(columnsList):
        readData = readData.rename(columns={'日時':'日付'})

    #表記を統一するために日付列はすべて引数のYYYYMMDDに置換
    readData["日付"] = str(YYYYMMDD)

    return readData

#当日のデータを取得
def getNewPrices2():
    fileName = "japan-all-stock-prices-2.csv"
    filePath = A.stockPrices2DailyPath + "/" + fileName
    #pricesデータを取得
    readData = pd.read_csv(filePath,encoding="SHIFT-JIS") 

    #仕様変更に伴い日時列がある場合、日付列へ変更
    columnsList = readData.columns.tolist()
    dateStr = "日時"
    if str(dateStr) in str(columnsList):
        readData = readData.rename(columns={'日時':'日付'})

    return readData

#YYYYMMDDは文字列。関数を呼び出す前に呼び出し元で文字列化する
def getStockData(YYYYMMDD):
    fileName = "japan-all-stock-data_" + str(YYYYMMDD) + ".csv"
    filePath = A.stockDataDailyPath + "/" + fileName
    #pricesデータを取得
    readData = pd.read_csv(filePath,encoding="SHIFT-JIS") 

    #仕様変更に伴い配当利回り列がある場合、配当利回り(予想)列へ変更
    columnsList = readData.columns.tolist()
    dateStr = "配当利回り"
    if str(dateStr) in str(columnsList):
        readData = readData.rename(columns={'配当利回り':'配当利回り(予想)'})
    
    dateStr = "1株配当"
    if str(dateStr) in str(columnsList):
        readData = readData.rename(columns={'1株配当':'1株配当(予想)'})

    return readData

#当日のデータを取得
def getNewStockData():
    fileName = "japan-all-stock-data.csv"
    filePath = A.stockDataDailyPath + "/" + fileName
    #pricesデータを取得
    readData = pd.read_csv(filePath,encoding="SHIFT-JIS") 

    #仕様変更に伴い配当利回り列がある場合、配当利回り(予想)列へ変更
    columnsList = readData.columns.tolist()
    dateStr = "配当利回り"
    if str(dateStr) in str(columnsList):
        readData = readData.rename(columns={'配当利回り':'配当利回り(予想)'})
    
    dateStr = "1株配当"
    if str(dateStr) in str(columnsList):
        readData = readData.rename(columns={'1株配当':'1株配当(予想)'})

    return readData

#prices2とstockDataをマージしてマッピング及びデータ型の整理
def mergePrices2AndStockData(prices2,stockData):
    mergeData = pd.merge(prices2,stockData)

    mapping = {'SC': 'Code',
           '名称': 'Name',
           '市場': 'Market',
           '業種': 'Industry',
           '日付': 'Timestamp',
           '株価': 'Price',
           '前日比': 'Change',
           '前日比(%)': 'ChangeInPercent',
           '前日終値': 'PreviousClosePx',
           '始値': 'Open',
           '高値': 'High',
           '安値': 'Low',
           'VWAP': 'VWAP',
           '出来高': 'Volume',
           '出来高率': 'VolumeInPercent',
           '売買代金(千円)': 'TradingVolume',
           '時価総額(百万円)': 'MarketCap',
           '値幅下限': 'LowerRange',
           '値幅上限': 'UpperRange',
           '高値日付': 'HighDate',
           '年初来高値': 'YTDHigh',
           '年初来高値乖離率': 'DeviationFromYTDHigh',
           '安値日付': 'LowDate',
           '年初来安値': 'YTDLow',
           '年初来安値乖離率': 'DeviationFromYTDLow',
           '発行済株式数': 'StocksIssued',
           '配当利回り(予想)': 'DividendYield',
           '1株配当(予想)': 'DividendPerShare',
           'PER(予想)': 'PER',
           'PBR(実績)': 'PBR',
           'EPS(予想)': 'EPS',
           'BPS(実績)': 'BPS',
           '最低購入額': 'MinimumPurchaseAmount',
           '単元株': 'UnitShare'}

    non_double_precisions = ['Code', 'Name', 'Market', 'Timestamp', 'Industry', 'HighDate', 'LowDate']

    #列名を処理しやすいように英語に変更
    mergeData = mergeData.rename(columns=mapping)

    mergeData = mergeData.replace('-', np.nan)

    for column_type in mapping.values():
        if column_type not in non_double_precisions:
            mergeData[column_type] = list(map(lambda x: float(x), mergeData[column_type]))

    return mergeData

このファイルはその名の通り、japan-all-stock-prices-2.csvとjapan-all-stock-data.csvからデータを読み込むモジュール。
同一日付のデータをそれぞれ読み込み合体させて、カラム名を英語にして返す感じ。

operationTool.py

import sqlite3
import pandas as pd

import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__),'../database_module'))
import kabu_plus_read as k

#初回のみ実施。実質意味なしモジュール
def createDb():
    # TEST.dbを作成する
    # すでに存在していれば、それにアスセスする。
    dbName = 'stock.db'
    filepath = "DBのパス" + dbName
    conn = sqlite3.connect(filepath)

    # データベースへのコネクションを閉じる。(必須)
    conn.close()

    return 0

#証券データを追加で投入
def insertStockData(YYYYMMDD):
    # TEST.dbを作成する
    # すでに存在していれば、それにアスセスする。
    dbName = 'stock.db'
    filepath = "DBのパス" + dbName
    conn = sqlite3.connect(filepath)

    prices2 = k.getPrices2(YYYYMMDD)
    stockData = k.getStockData(YYYYMMDD)
    mergeData = k.mergePrices2AndStockData(prices2,stockData)

    mergeData.to_sql('StockData',conn,if_exists='append',index=None)

    # データベースへのコネクションを閉じる。(必須)
    conn.close()

    return 0

#証券データを追加で投入
def insertStockData(mergeData):
    # TEST.dbを作成する
    # すでに存在していれば、それにアスセスする。
    dbName = 'stock.db'
    filepath = "DBのパス" + dbName
    conn = sqlite3.connect(filepath)

    mergeData.to_sql('StockData',conn,if_exists='append',index=None)

    # データベースへのコネクションを閉じる。(必須)
    conn.close()

    return 0

#テーブル削除
def deleteTable():
    # TEST.dbを作成する
    # すでに存在していれば、それにアスセスする。
    dbName = 'stock.db'
    filepath = "DBのパス" + dbName
    conn = sqlite3.connect(filepath)
    
    cur = conn.cursor()
    sql = "drop table StockData;"
    cur.execute(sql)

    # データベースへのコネクションを閉じる。(必須)
    conn.close()

    return 0

def readAllTable():
    # TEST.dbを作成する
    # すでに存在していれば、それにアスセスする。
    dbName = 'stock.db'
    filepath = "DBのパス" + dbName
    conn = sqlite3.connect(filepath)
    
    readData = pd.read_sql('SELECT * FROM StockData', conn)

    # データベースへのコネクションを閉じる。(必須)
    conn.close()

    return readData

def searchTableStockAndDate(stock,YYYYMMDD):
    # TEST.dbを作成する
    # すでに存在していれば、それにアスセスする。
    dbName = 'stock.db'
    filepath = "DBのパス" + dbName
    conn = sqlite3.connect(filepath)
    
    sqlSentence = "SELECT * FROM StockData" + " WHERE Code IS " + str(stock) + " AND Timestamp IS " + str(YYYYMMDD)
    readData = pd.read_sql(sqlSentence, conn)

    # データベースへのコネクションを閉じる。(必須)
    conn.close()

    return readData

def searchTableSqlCondition(sqlCondition):
    # TEST.dbを作成する
    # すでに存在していれば、それにアスセスする。
    dbName = 'stock.db'
    filepath = "DBのパス" + dbName
    conn = sqlite3.connect(filepath)
    
    sqlSentence = "SELECT Code,Timestamp FROM StockData " + sqlCondition
    readData = pd.read_sql(sqlSentence, conn)

    # データベースへのコネクションを閉じる。(必須)
    conn.close()

    return readData

def searchTableStock(stock):
    # TEST.dbを作成する
    # すでに存在していれば、それにアスセスする。
    dbName = 'stock.db'
    filepath = "DBのパス" + dbName
    conn = sqlite3.connect(filepath)
    
    sqlSentence = "SELECT * FROM StockData" + " WHERE Code IS " + str(stock)
    readData = pd.read_sql(sqlSentence, conn)

    # データベースへのコネクションを閉じる。(必須)
    conn.close()

    return readData

DB操作のためのモジュール。
DBのパスは各自の環境に合わせて変更。
sqlite3だと適当なフォルダを指定すればいいだけなので、”DBのパス”にDBを作りたいパスを指定すればいいだけ。
よくわからないが、DBアクセスの際はDB作成と同じコマンドらしく、DB作成のモジュールが実質ゴミとなっている。

ソースコードが長くて見づらくなったので、後編に続く。

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です