Stock-Algorithm-Back-Tester/Historical_Data_Accessor.py
2024-08-14 14:10:18 -05:00

181 lines
7.3 KiB
Python

'''
This module provides the accessor that the brokerage simulation and algorithm models use to read historical data records stored in MongoDB
'''
from pymongo import MongoClient
from pymongo.collection import Collection
from datetime import datetime
from pytz import timezone
from ibapi.contract import Contract
from Common import TickData
# Host and port of the MongoDB server that HistoricalDataAccessor.connect_to_mongo connects to.
MONGODB_IP = "10.0.0.34"
MONGODB_PORT = 27017
# Database whose collections are looked up by stock symbol
# (see HistoricalDataAccessor.connect_to_historical_data_collection).
MONGODB_HISTORICAL_DATA_DB = "Historical_Stock_Data"
# Database and collection queried by HistoricalDataAccessor.get_stock_information.
MONGODB_STOCK_INFORMATION_DB = "Stock_Information"
MONGODB_GENERAL_INFORMATION_COLLECTION = "General_Information"
'''
TODO
1.
'''
class HistoricalDataAccessor:
    '''The accessor houses the mongo connection and provides a convenient way to access data for any stock.

    Query methods never raise: on any failure they print a message and
    return None, so callers must check the result before using it.
    '''

    def __init__(self):
        '''Opens the MongoDB connection and records the outcome in `status`.'''
        self._client = None
        self._status = 0
        if self.connect_to_mongo() != 0:
            self._status = 1

    @property
    def status(self) -> int:
        '''0 if the connection attempt made at construction succeeded, 1 if not.'''
        return self._status

    def connect_to_mongo(self) -> int:
        '''Connects to mongodb as specified with ip and port.

        Returns 0 if the server answered, 1 if not.'''
        try:
            self._client = MongoClient(MONGODB_IP, MONGODB_PORT)
            # MongoClient connects lazily and does not raise when the server is
            # unreachable, so force one round trip; otherwise this method
            # could never report a failed connection.
            self._client.admin.command("ping")
            return 0
        except Exception as exc:
            print(f"Exception occurred trying to connect to MongoDB: {exc}")
            return 1

    def get_stock_information(self, symbol: str) -> "dict | None":
        '''Gets the stock information for a specific stock.

        Returns the matching document (without the
        Historical_Stock_Data.Verification field) on success.
        Returns None if the collection is missing, no document matches
        the symbol, or an exception occurred.'''
        try:
            # Kept as instance attributes because the original code exposed them.
            self.information_db = self._client[MONGODB_STOCK_INFORMATION_DB]
            if MONGODB_GENERAL_INFORMATION_COLLECTION not in self.information_db.list_collection_names():
                print(f"{MONGODB_GENERAL_INFORMATION_COLLECTION} not in MongoDB {MONGODB_STOCK_INFORMATION_DB} database")
                return None
            self.information_collection = self.information_db[MONGODB_GENERAL_INFORMATION_COLLECTION]
            # Pure exclusion projection: every field (including _id) is
            # returned except the one listed here.
            projection = {"Historical_Stock_Data.Verification": 0}
            return self.information_collection.find_one({"Symbol": symbol}, projection)
        except Exception as exc:
            print(f"Exception occurred trying to get stock information for {symbol}: {exc}")
            return None

    def connect_to_historical_data_collection(self, symbol: str) -> "Collection | None":
        '''Connects to the collection for a specific stock.

        Returns Collection on success, None if the symbol has no collection
        or an exception occurred.'''
        try:
            historical_db = self._client[MONGODB_HISTORICAL_DATA_DB]
            if symbol not in historical_db.list_collection_names():
                print(f"{symbol} not in MongoDB {MONGODB_HISTORICAL_DATA_DB} database")
                return None
            return historical_db[symbol]
        except Exception as exc:
            print(f"Exception occurred trying to connect to historical data collection for {symbol}: {exc}")
            return None

    def get_single_datapoint(self, symbol: str, datetime_to_get: datetime) -> "dict | None":
        '''Gets a single datapoint with the specific datetime.

        Returns datapoint document if successful. Returns None if the
        collection is unavailable, no document has that Date, or an
        exception occurred.'''
        try:
            historical_collection = self.connect_to_historical_data_collection(symbol)
            # Compare with `is None`: the reason was already printed by the
            # helper, and pymongo collections do not support truth testing.
            if historical_collection is None:
                return None
            return historical_collection.find_one({"Date": datetime_to_get})
        except Exception as exc:
            print(f"Exception occurred trying to get single datapoint from {MONGODB_HISTORICAL_DATA_DB}.{symbol} with Date = {datetime_to_get}: {exc}")
            return None

    def get_range_datapoints(self, symbol: str, datetime_first: datetime, datetime_last: datetime, proj: "dict | None" = None, extra_query: "dict | None" = None) -> "list[dict] | None":
        '''Gets a list of documents from first to last inclusive.

        proj is an optional projection passed to find(); None or {} means
        no projection. extra_query is an optional dict of additional filter
        conditions merged into the Date range query (a "Date" key in it
        replaces the range filter).

        Returns list of documents if successful. If not, returns None.'''
        try:
            historical_collection = self.connect_to_historical_data_collection(symbol)
            if historical_collection is None:
                return None
            query = {
                "Date": {"$gte": datetime_first, "$lte": datetime_last},
            }
            if extra_query is not None:
                query.update(extra_query)
            # Only an absent or empty-dict projection means "all fields";
            # anything else is handed to pymongo untouched.
            if proj is None or proj == {}:
                cursor = historical_collection.find(query)
            else:
                cursor = historical_collection.find(query, proj)
            return list(cursor)
        except Exception as exc:
            print(f"Exception occurred trying to get range of datapoins from {MONGODB_HISTORICAL_DATA_DB}.{symbol} with dates between {datetime_first} and {datetime_last}: {exc}")
            return None

    def is_datapoint_available(self, symbol: str, datetime_to_check: datetime) -> bool:
        '''Checks if a datapoint is available for the datetime provided.

        Also returns False when the lookup itself failed.'''
        return self.get_single_datapoint(symbol, datetime_to_check) is not None

    def convert_datapoint_to_tick_data(self, symbol: str, datapoint: dict) -> "TickData":
        '''Builds a TickData from a datapoint document.

        Missing numeric fields default to 0; missing Date and Source
        default to None.'''
        return TickData(
            symbol=symbol,
            open_datetime=datapoint.get("Date", None),
            open=datapoint.get("Open", 0),
            high=datapoint.get("High", 0),
            low=datapoint.get("Low", 0),
            close=datapoint.get("Close", 0),
            volume=datapoint.get("Volume", 0),
            count=datapoint.get("Count", 0),
            source=datapoint.get("Source", None),
        )

    def get_single_tick_data(self, symbol: str, datetime_to_get: datetime) -> "TickData | None":
        '''Gets the datapoint for the specific datetime as TickData.

        Returns None when no datapoint could be retrieved.'''
        datapoint = self.get_single_datapoint(symbol, datetime_to_get)
        if datapoint is None:
            # Nothing to convert; calling .get on None would raise.
            return None
        # Convert the dictionary to the TickData
        return self.convert_datapoint_to_tick_data(symbol, datapoint)

    def get_range_tick_data(self, symbol: str, datetime_first: datetime, datetime_last: datetime, proj: "dict | None" = None, extra_query: "dict | None" = None) -> "list[TickData] | None":
        '''Gets the datapoints from first to last inclusive as a list of TickData.

        proj and extra_query are forwarded to get_range_datapoints.
        Returns None when the datapoints could not be retrieved.'''
        datapoints = self.get_range_datapoints(symbol, datetime_first, datetime_last, proj, extra_query)
        if datapoints is None:
            # Propagate the failure instead of iterating over None.
            return None
        return [self.convert_datapoint_to_tick_data(symbol, datapoint) for datapoint in datapoints]
def main():
    '''Manual check: fetches a single AMD tick from MongoDB and prints it.'''
    accessor = HistoricalDataAccessor()
    ticker = "AMD"
    # Localize through pytz so the datetime used in the query is timezone-aware.
    eastern_tz = timezone('US/Eastern')
    requested_at = eastern_tz.localize(datetime(year=2020, month=1, day=8, hour=12, minute=30))
    print(accessor.get_single_tick_data(ticker, requested_at))
    # The other accessor calls can be tried from here in the same way:
    #   accessor.status
    #   accessor.get_single_datapoint(ticker, requested_at)
    #   accessor.get_range_datapoints(ticker, requested_at, <later datetime>)
    #   accessor.get_range_tick_data(ticker, requested_at, <later datetime>)


if __name__ == "__main__":
    main()