181 lines
7.3 KiB
Python
181 lines
7.3 KiB
Python
'''
|
|
This file describes how the brokerage simulation and algorithm models can access data from MongoDB's historical data records
|
|
'''
|
|
from pymongo import MongoClient
|
|
from pymongo.collection import Collection
|
|
from datetime import datetime
|
|
from pytz import timezone
|
|
|
|
from ibapi.contract import Contract
|
|
|
|
from Common import TickData
|
|
|
|
|
|
MONGODB_IP = "10.0.0.34"
|
|
MONGODB_PORT = 27017
|
|
MONGODB_HISTORICAL_DATA_DB = "Historical_Stock_Data"
|
|
MONGODB_STOCK_INFORMATION_DB = "Stock_Information"
|
|
MONGODB_GENERAL_INFORMATION_COLLECTION = "General_Information"
|
|
|
|
'''
|
|
TODO
|
|
1.
|
|
'''
|
|
class HistoricalDataAccessor():
|
|
'''The accessor houses the mongo connection and provides a convenient way to access a data for any stock'''
|
|
def __init__(self):
|
|
self._status = 0
|
|
if self.connect_to_mongo() != 0:
|
|
self._status = 1
|
|
|
|
@property
|
|
def status(self) -> int:
|
|
return self._status
|
|
|
|
def connect_to_mongo(self) -> int:
|
|
'''Connects to mongodb as specified with ip and port'''
|
|
try:
|
|
self._client = MongoClient(MONGODB_IP, MONGODB_PORT)
|
|
return 0
|
|
except Exception as exc:
|
|
print(f"Exception occurred trying to connect to MongoDB: {exc}")
|
|
return 1
|
|
|
|
def get_stock_information(self, symbol: str) -> dict:
|
|
'''Gets the stock information for a specific stock
|
|
Returns dict of stock information on success, None if not'''
|
|
try:
|
|
self.information_db = self._client[MONGODB_STOCK_INFORMATION_DB]
|
|
|
|
if MONGODB_GENERAL_INFORMATION_COLLECTION not in self.information_db.list_collection_names():
|
|
print(f"{MONGODB_GENERAL_INFORMATION_COLLECTION} not in MongoDB {MONGODB_STOCK_INFORMATION_DB} database")
|
|
return None
|
|
else:
|
|
self.information_collection = self.information_db[MONGODB_GENERAL_INFORMATION_COLLECTION]
|
|
query = {"Symbol": symbol}
|
|
projection = {
|
|
"_id": 1, # Include every field unless specified otherwise
|
|
"Historical_Stock_Data.Verification": 0 # Do not include this field
|
|
}
|
|
stock_information = self.information_collection.find_one(query, projection)
|
|
return stock_information
|
|
except Exception as exc:
|
|
print(f"Exception occurred trying to get stock information for {symbol}: {exc}")
|
|
return None
|
|
|
|
def connect_to_historical_data_collection(self, symbol:str) -> Collection:
|
|
'''Connects to the collection for a specific stock.
|
|
Returns Collection on success, None if not'''
|
|
try:
|
|
historical_db = self._client[MONGODB_HISTORICAL_DATA_DB]
|
|
|
|
if symbol not in historical_db.list_collection_names():
|
|
print(f"{symbol} not in MongoDB {MONGODB_HISTORICAL_DATA_DB} database")
|
|
return None
|
|
else:
|
|
collection = historical_db[symbol]
|
|
return collection
|
|
except Exception as exc:
|
|
print(f"Exception occurred trying to connect to historical data collection for {symbol}: {exc}")
|
|
return None
|
|
|
|
def get_single_datapoint(self, symbol: str, datetime_to_get: datetime) -> dict:
|
|
'''Gets a single datapoint with the specific datetime.
|
|
Returns datapoint document if successful. If not, returns None'''
|
|
try:
|
|
historical_collection = self.connect_to_historical_data_collection(symbol)
|
|
query = {
|
|
"Date": datetime_to_get
|
|
}
|
|
doc = historical_collection.find_one(query)
|
|
return doc
|
|
except Exception as exc:
|
|
print(f"Exception occurred trying to get single datapoint from {MONGODB_HISTORICAL_DATA_DB}.{symbol} with Date = {datetime_to_get}: {exc}")
|
|
return None
|
|
|
|
def get_range_datapoints(self, symbol: str, datetime_first: datetime, datetime_last: datetime, proj:dict={}, extra_query:dict={}) -> list[dict]:
|
|
'''Gets a list of documents from first to last inclusive.
|
|
Returns list of documents if successful. If not, returns None'''
|
|
try:
|
|
historical_collection = self.connect_to_historical_data_collection(symbol)
|
|
query = {
|
|
"Date": {"$gte": datetime_first, "$lte": datetime_last},
|
|
}
|
|
query.update(extra_query)
|
|
if proj != {}:
|
|
docs = list(historical_collection.find(query, proj))
|
|
else:
|
|
docs = list(historical_collection.find(query))
|
|
return docs
|
|
except Exception as exc:
|
|
print(f"Exception occurred trying to get range of datapoins from {MONGODB_HISTORICAL_DATA_DB}.{symbol} with dates between {datetime_first} and {datetime_last}: {exc}")
|
|
return None
|
|
|
|
def is_datapoint_available(self, symbol: str, datetime_to_check: datetime) -> bool:
|
|
'''Checks if a datapoint is available for the datetime provided'''
|
|
doc = self.get_single_datapoint(symbol, datetime_to_check)
|
|
if doc is not None:
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
def convert_datapoint_to_tick_data(self, symbol: str, datapoint: dict) -> TickData:
|
|
tick_data = TickData(
|
|
symbol = symbol,
|
|
open_datetime = datapoint.get("Date", None),
|
|
open = datapoint.get("Open", 0),
|
|
high = datapoint.get("High", 0),
|
|
low = datapoint.get("Low", 0),
|
|
close = datapoint.get("Close", 0),
|
|
volume = datapoint.get("Volume", 0),
|
|
count = datapoint.get("Count", 0),
|
|
source = datapoint.get("Source", None)
|
|
)
|
|
|
|
return tick_data
|
|
|
|
def get_single_tick_data(self, symbol: str, datetime_to_get: datetime) -> TickData:
|
|
datapoint = self.get_single_datapoint(symbol, datetime_to_get)
|
|
|
|
# Convert the dictionaty to the TickData
|
|
tick_data = self.convert_datapoint_to_tick_data(symbol, datapoint)
|
|
|
|
return tick_data
|
|
|
|
def get_range_tick_data(self, symbol: str, datetime_first: datetime, datetime_last: datetime, proj:dict={}, extra_query:dict={}) -> list[TickData]:
|
|
datapoints = self.get_range_datapoints(symbol, datetime_first, datetime_last, proj, extra_query)
|
|
|
|
tick_data_list: list[TickData] = []
|
|
for datapoint in datapoints:
|
|
tick_data = self.convert_datapoint_to_tick_data(symbol, datapoint)
|
|
tick_data_list.append(tick_data)
|
|
|
|
return tick_data_list
|
|
|
|
|
|
def main():
|
|
accessor = HistoricalDataAccessor()
|
|
# print(f"Status = {accessor.get_status()}")
|
|
# print(accessor.stock_information)
|
|
symbol = "AMD"
|
|
|
|
EASTERN = timezone('US/Eastern')
|
|
datetime_first = EASTERN.localize(datetime(year=2020, month=1, day=8, hour=12, minute=30))
|
|
|
|
# doc = accessor.get_single_datapoint(symbol, datetime_first)
|
|
# print(doc)
|
|
|
|
tick_data = accessor.get_single_tick_data(symbol, datetime_first)
|
|
print(tick_data)
|
|
|
|
# datetime_last = EASTERN.localize(datetime(year=2020, month=1, day=8, hour=13, minute=30))
|
|
# docs = accessor.get_range_datapoints(datetime_first, datetime_last)
|
|
# print(docs)
|
|
|
|
# datetime_last = EASTERN.localize(datetime(year=2020, month=1, day=8, hour=13, minute=30))
|
|
# tick_datas = accessor.get_range_tick_data(symbol, datetime_first, datetime_last)
|
|
# for i in tick_datas:
|
|
# print(i)
|
|
|
|
if __name__ == "__main__":
|
|
main() |