# Stock-Algorithm-Back-Tester/strategies/deprecated/sma_offset_trigger/SMA Offset Trigger Rewrite.py
# 2024-08-14 14:10:18 -05:00

# 201 lines
# 9.5 KiB
# Python

import datetime as dt
import plotly.express as plt
import plotly.subplots as subplt
import plotly.graph_objects as goplt
import numpy as np
import pandas as pd
import math
import time
from multiprocessing import Pool
from functools import partial
import itertools
import statistics as stat
def read_ticker_data(ticker_settings=None):
    """Read the candle data of one ticker from its raw-data CSV file.

    The file is looked up relative to the current working directory at
    ``../../Raw Data/<Ticker> <Period> <Interval> Data.csv``. The first line
    is treated as a header and skipped; blank lines are ignored. Every other
    line must hold, comma separated, a ``%Y-%m-%d %H:%M:%S`` timestamp
    followed by the open, high, low and close prices (extra columns are
    ignored).

    Args:
        ticker_settings: Mapping with the keys ``"Ticker"``, ``"Period"`` and
            ``"Interval"``, which are combined into the file name.

    Returns:
        A dict of parallel lists ``"Datetime"`` (timestamp strings as read),
        ``"Epoch"`` (integer POSIX timestamps), ``"Open"``, ``"High"``,
        ``"Low"`` and ``"Close"`` (floats), plus ``"Points"``, the number of
        data rows read.

    Raises:
        KeyError: If a required key is missing from ``ticker_settings``.
        OSError: If the file cannot be opened.
        ValueError: If a timestamp or price cannot be parsed.
        IndexError: If a row has fewer than five columns.
    """
    import os  # Local import so the block does not depend on new file-level imports.

    settings = {} if ticker_settings is None else ticker_settings
    file_name = f"{settings['Ticker']} {settings['Period']} {settings['Interval']} Data.csv"
    # Built with os.path.join instead of a hand-written backslash path: the
    # backslashes were invalid string escapes and only resolved on Windows.
    path = os.path.join("..", "..", "Raw Data", file_name)

    data = {
        "Datetime": [],
        "Epoch": [],
        "Open": [],
        "High": [],
        "Low": [],
        "Close": [],
        "Points": 0,
    }

    # The context manager closes the file even when a row fails to parse.
    with open(path) as file:
        next(file, None)  # Skip the header line.
        for line in file:
            stripped = line.strip()
            if not stripped:
                continue
            line_elements = stripped.split(",")
            timestamp = line_elements[0]
            data["Datetime"].append(timestamp)
            # The parsed datetime is naive, so .timestamp() interprets it in
            # the local timezone of the machine running the script.
            parsed = dt.datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
            data["Epoch"].append(int(parsed.timestamp()))
            data["Open"].append(float(line_elements[1]))
            data["High"].append(float(line_elements[2]))
            data["Low"].append(float(line_elements[3]))
            data["Close"].append(float(line_elements[4]))

    data["Points"] = len(data["Datetime"])
    return data
def generate_sma(sma_settings=None, raw_data=None, reference_data_container=None):
    """Append the simple moving average of ``raw_data`` to a result list.

    For every index ``i`` in ``[Start Index, Stop Index)`` the mean of the
    ``Length`` values preceding ``i`` (``raw_data[i - Length:i]``, so the
    value at ``i`` itself is not part of the window) is rounded to three
    decimals and appended. A ``Length`` of 0 appends the rounded raw value at
    ``i`` instead.

    ``Start Index`` should be at least ``Length``: a smaller start makes the
    slice start negative, which does not select the intended window.

    Args:
        sma_settings: Mapping with the keys ``"Length"``, ``"Start Index"``
            (inclusive) and ``"Stop Index"`` (exclusive).
        raw_data: Sequence of numbers to average.
        reference_data_container: List that receives the results in place. A
            fresh list is created when omitted.

    Returns:
        The list the values were appended to.

    Raises:
        KeyError: If a required setting is missing.
    """
    settings = {} if sma_settings is None else sma_settings
    values = [] if raw_data is None else raw_data
    # A new list per call; a shared ``[]`` default would accumulate results
    # across unrelated calls.
    if reference_data_container is None:
        reference_data_container = []

    indexes = range(settings["Start Index"], settings["Stop Index"])
    length = settings["Length"]

    # The length test does not depend on the index, so it is decided once.
    if length != 0:
        reference_data_container.extend(
            round(sum(values[i - length:i]) / length, 3) for i in indexes
        )
    else:
        reference_data_container.extend(round(values[i], 3) for i in indexes)
    return reference_data_container
def calculate_volatility(volatility_settings=None, raw_data=None, reference_data_container=None):
    """Append the relative volatility of ``raw_data`` to a result list.

    With ``Method`` set to ``"stdev"``, every index ``i`` in
    ``[Start Index, Stop Index)`` contributes the population standard
    deviation of the ``Length`` values preceding ``i``
    (``raw_data[i - Length:i]``), expressed as a percentage of
    ``raw_data[i]`` and rounded to three decimals. Any other ``Method``
    appends nothing.

    Args:
        volatility_settings: Mapping with the keys ``"Length"``,
            ``"Start Index"`` (inclusive), ``"Stop Index"`` (exclusive) and
            ``"Method"``.
        raw_data: Sequence of numbers to measure.
        reference_data_container: List that receives the results in place. A
            fresh list is created when omitted.

    Returns:
        The list the values were appended to.

    Raises:
        KeyError: If a required setting is missing.
        IndexError: If an index in the range lies outside ``raw_data``.
        ZeroDivisionError: If ``raw_data[i]`` is zero.
        statistics.StatisticsError: If a window is empty.
    """
    settings = {} if volatility_settings is None else volatility_settings
    values = [] if raw_data is None else raw_data
    # A new list per call; a shared ``[]`` default would accumulate results
    # across unrelated calls.
    if reference_data_container is None:
        reference_data_container = []

    indexes = range(settings["Start Index"], settings["Stop Index"])

    # The method is the same for every index, so it is checked once.
    if settings["Method"] == "stdev":
        length = settings["Length"]
        for i in indexes:
            deviation = stat.pstdev(values[i - length:i])
            reference_data_container.append(round(deviation / values[i] * 100, 3))
    return reference_data_container
def generate_offset(offset_settings=None, raw_data=None, reference_data_container=None):
    """Append a percentage-shifted copy of ``raw_data`` to a result list.

    Every value at an index in ``[Start Index, Stop Index)`` is multiplied by
    ``1 + Percent / 100`` and rounded to three decimals before being
    appended.

    Args:
        offset_settings: Mapping with the keys ``"Percent"``,
            ``"Start Index"`` (inclusive) and ``"Stop Index"`` (exclusive).
        raw_data: Sequence of numbers to shift.
        reference_data_container: List that receives the results in place. A
            fresh list is created when omitted.

    Returns:
        The list the values were appended to.

    Raises:
        KeyError: If a required setting is missing.
        IndexError: If an index in the range lies outside ``raw_data``.
    """
    settings = {} if offset_settings is None else offset_settings
    values = [] if raw_data is None else raw_data
    # A new list per call; a shared ``[]`` default would accumulate results
    # across unrelated calls.
    if reference_data_container is None:
        reference_data_container = []

    indexes = range(settings["Start Index"], settings["Stop Index"])
    if indexes:
        # The multiplier is identical for every index, so compute it once.
        multiplier = 1 + settings["Percent"] / 100
        reference_data_container.extend(round(values[i] * multiplier, 3) for i in indexes)
    return reference_data_container
def detect_crossover(crossover_settings=None, raw_data=None, reference_data_container=None):
    """Mark the indexes at which two series cross each other.

    ``raw_data`` holds two series. For every index in
    ``[Start Index, Stop Index)`` one value is appended: ``1`` when the first
    series moves above the second, ``-1`` when it moves below, ``0``
    otherwise. Indexes where the two series are equal never count as a
    crossing and leave the tracked side unchanged.

    The starting side is taken from index ``Start Index - 1``, so
    ``Start Index`` should be at least 1 (0 would compare the last elements).

    Args:
        crossover_settings: Mapping with the keys ``"Start Index"``
            (inclusive) and ``"Stop Index"`` (exclusive).
        raw_data: Sequence whose first two items are the series to compare.
        reference_data_container: List that receives the results in place. A
            fresh list is created when omitted.

    Returns:
        The list the markers were appended to.

    Raises:
        KeyError: If a required setting is missing.
        IndexError: If ``raw_data`` has fewer than two series or an index in
            the range lies outside either series.
    """
    settings = {} if crossover_settings is None else crossover_settings
    series = [] if raw_data is None else raw_data
    # A new list per call; a shared ``[]`` default would accumulate results
    # across unrelated calls.
    if reference_data_container is None:
        reference_data_container = []

    first, second = series[0], series[1]
    start = settings["Start Index"]
    above = first[start - 1] > second[start - 1]  # Side of the first series before the range.

    for i in range(start, settings["Stop Index"]):
        if second[i] > first[i] and above:
            marker = -1
            # Remember the new side; without this every later bar on the same
            # side would be reported as another crossing and the crossing
            # back would never be seen.
            above = False
        elif first[i] > second[i] and not above:
            marker = 1
            above = True
        else:
            marker = 0
        reference_data_container.append(marker)
    return reference_data_container
#def detect_trend_reversal(reversal_settings={}, raw_data=[], reference_data_container=[]):
#def generate_action_events(action_settings={}, raw_data=[], reference_data_container=[]):
#def walking_forward_trader(walking_trader_settings={}, raw_data=[]):
#def single_model_optimization(model_optimization_settings={}, raw_data=[]):
#def rollover_trading_sim(rollover_settings={}, raw_data=[]):
def _stepped_values(bounds, increment):
    """Return the values from ``bounds[0]`` obtained by repeatedly adding ``increment``.

    The number of values is ``int((bounds[1] - bounds[0]) / bounds[2])``, so
    the upper bound itself is not part of the result.
    """
    values = []
    value = bounds[0]
    for _ in range(int((bounds[1] - bounds[0]) / bounds[2])):
        values.append(value)
        value += increment
    return values


def _placeholder_close_series(sma_lengths, minimum_length=0):
    """Map each SMA length to a ``{"Close": [...]}`` entry pre-filled with zeros.

    Each list holds ``max(minimum_length, length)`` zeros, standing in for the
    leading indexes that have no computed value.
    """
    return {
        length: {"Close": [0] * max(minimum_length, length)}
        for length in sma_lengths
    }


def main(ticker_settings=None, parameter_ranges=None):
    """Load the ticker data, build the result cache and run a sample calculation.

    The cache (``previously_calculated_data``) gets one zero-padded
    placeholder list per parameter combination described by
    ``parameter_ranges``. Afterwards the 5- and 3-bar SMA, the 30-bar
    volatility, the 3 % offset of the 5-bar SMA and the SMA3/SMA5 crossovers
    are computed over the first 8000 rows (or fewer when the file is shorter).
    The offset keys and the crossover markers are printed.

    Args:
        ticker_settings: Mapping with ``"Ticker"``, ``"Period"`` and
            ``"Interval"``; passed to ``read_ticker_data``.
        parameter_ranges: Mapping of ``[start, stop, step]`` lists under the
            keys ``"Short SMA"``, ``"Long SMA"``, ``"Percent Offset"``,
            ``"Volatility Length"`` and ``"Volatility Scalar"``. The grid
            must contain the SMA lengths 3 and 5, the volatility length 30
            and the percent offset 3 used by the sample calculation.

    Raises:
        KeyError: If a required setting is missing or the grid lacks one of
            the hard-coded sample parameters.
    """
    ticker_settings = {} if ticker_settings is None else ticker_settings
    parameter_ranges = {} if parameter_ranges is None else parameter_ranges

    raw_ticker_data = read_ticker_data(ticker_settings)
    close = raw_ticker_data["Close"]

    # Parameter grids. SMA and volatility lengths include their upper bound;
    # percent offsets and volatility scalars stop one step short of theirs.
    short_sma = parameter_ranges["Short SMA"]
    long_sma = parameter_ranges["Long SMA"]
    sma_lengths = range(short_sma[0], long_sma[1] + short_sma[2], short_sma[2])

    volatility_range = parameter_ranges["Volatility Length"]
    volatility_lengths = range(
        volatility_range[0],
        volatility_range[1] + volatility_range[2],
        volatility_range[2],
    )

    percent_range = parameter_ranges["Percent Offset"]
    percents = _stepped_values(percent_range, round(percent_range[2], 3))

    scalar_range = parameter_ranges["Volatility Scalar"]
    vol_scalars = _stepped_values(scalar_range, scalar_range[2])

    # Cache of everything calculated so far, keyed by parameter, so later
    # steps can reuse results instead of recomputing them.
    # NOTE: the last section holds one list per
    # (scalar, volatility length, percent, SMA length) combination, so its
    # size grows with the product of all four ranges.
    previously_calculated_data = {}

    # SMA of Close for every SMA length
    previously_calculated_data["SMA"] = _placeholder_close_series(sma_lengths)

    # Volatility of Close for every volatility length
    previously_calculated_data["Volatility"] = {
        length: {"Close": [0] * length} for length in volatility_lengths
    }

    # Percent offset of every SMA
    previously_calculated_data["Offset"] = {
        percent: {"SMA": _placeholder_close_series(sma_lengths)}
        for percent in percents
    }

    # Volatility + percent offset of every SMA
    volatility_percent_offset = {}
    for vol_scalar in vol_scalars:
        by_volatility_length = {}
        for vol_length in volatility_lengths:
            by_volatility_length[vol_length] = {
                "Offset": {
                    percent: {"SMA": _placeholder_close_series(sma_lengths, vol_length)}
                    for percent in percents
                }
            }
        volatility_percent_offset[vol_scalar] = {"Volatility": by_volatility_length}
    previously_calculated_data["Volatility+Percent Offset"] = volatility_percent_offset

    # Never run past the loaded rows: a fixed stop of 8000 on a shorter file
    # raises IndexError in the volatility/crossover steps and averages
    # truncated windows in the SMA step.
    stop_index = min(8000, raw_ticker_data["Points"])

    for sma_length in (5, 3):
        sma_settings = {
            "Length": sma_length,
            "Start Index": sma_length,  # Inclusive
            "Stop Index": stop_index,  # Non inclusive
        }
        generate_sma(
            sma_settings,
            close,
            previously_calculated_data["SMA"][sma_settings["Length"]]["Close"],
        )

    volatility_settings = {
        "Length": 30,
        "Start Index": 30,
        "Stop Index": stop_index,
        # Only "stdev" is handled by calculate_volatility. Other ideas: mae
        # (mean absolute error), how much the last period covered the range
        # of a larger period.
        "Method": "stdev",
    }
    calculate_volatility(
        volatility_settings,
        close,
        previously_calculated_data["Volatility"][volatility_settings["Length"]]["Close"],
    )

    offset_settings = {
        "Percent": 3,
        "Start Index": 5,
        "Stop Index": stop_index,
    }
    print(previously_calculated_data["Offset"].keys())
    generate_offset(
        offset_settings,
        previously_calculated_data["SMA"][5]["Close"],
        previously_calculated_data["Offset"][offset_settings["Percent"]]["SMA"][5]["Close"],
    )

    crossover_settings = {
        "Start Index": 5,
        "Stop Index": stop_index,
    }
    # Zero-pad so the marker list lines up index-for-index with the SMA lists.
    sma3_crosses_sma_5 = [0] * crossover_settings["Start Index"]
    detect_crossover(
        crossover_settings,
        [
            previously_calculated_data["SMA"][3]["Close"],
            previously_calculated_data["SMA"][5]["Close"],
        ],
        sma3_crosses_sma_5,
    )
    print(sma3_crosses_sma_5)
# Selects the raw-data file: "<Ticker> <Period> <Interval> Data.csv".
ticker_settings = {"Ticker": "RKLB", "Period": "720d", "Interval": "15m"}

# Parameter grid passed to main(); every entry is [start, stop, step].
parameter_ranges = {
    "Short SMA": [1, 20, 1],
    "Long SMA": [2, 25, 1],
    "Percent Offset": [0.25, 5, 0.25],
    "Volatility Length": [30, 3000, 30],
    "Volatility Scalar": [0, 5, 0.25],
}
if __name__ == "__main__":
    # Run the script once and report how long the whole run took.
    start_time = time.time()
    main(ticker_settings, parameter_ranges)
    elapsed_seconds = time.time() - start_time
    print("--- %s seconds ---" % elapsed_seconds)