147 lines
5.3 KiB
Python
147 lines
5.3 KiB
Python
from datetime import datetime, timedelta
|
|
from pytz import timezone
|
|
|
|
from Historical_Data_Accessor import HistoricalDataAccessor
|
|
from Model_Simulation import ModelSimulation
|
|
|
|
from simulation.strategies.Random_Strategy import RandomStrategy
|
|
|
|
EASTERN = timezone('US/Eastern')
|
|
|
|
|
|
def fft_custom():
|
|
import numpy as np
|
|
import matplotlib.pyplot as plt
|
|
from sklearn.linear_model import LinearRegression
|
|
symbol = "TSLA"
|
|
num_frequencies = 50
|
|
num_moving_average = 5
|
|
|
|
datetime_start = EASTERN.localize(datetime(year=2023, month=1, day=6, hour=10, minute=30))
|
|
datetime_end = EASTERN.localize(datetime(year=2024, month=1, day=1, hour=10, minute=30))
|
|
|
|
data_accessor = HistoricalDataAccessor()
|
|
|
|
proj = {
|
|
"_id": 0,
|
|
"Close": 1
|
|
}
|
|
extra_query = {
|
|
"Verbose_Phase": "Regular hours"
|
|
}
|
|
# Example: Retrieving only_close data from your data source
|
|
datapoints = data_accessor.get_range_datapoints(symbol=symbol, datetime_first=datetime_start, datetime_last=datetime_end, proj=proj, extra_query=extra_query)
|
|
only_close = np.array([datapoint["Close"] for datapoint in datapoints])
|
|
real_only_close = only_close
|
|
|
|
|
|
# Calculate daily gain percentage
|
|
period = int(60 * 6.5*5) # Assuming period in minutes
|
|
daily_gain = []
|
|
for i in range(len(only_close)):
|
|
if i >= period:
|
|
gain = (only_close[i] - only_close[i - period]) / only_close[i - period] * 100.0
|
|
daily_gain.append(gain)
|
|
else:
|
|
daily_gain.append(0) # For the first `period` elements, set to NaN or handle as needed
|
|
|
|
# Convert daily_gain to numpy array
|
|
only_close = np.array(daily_gain)
|
|
|
|
# Plotting daily gain
|
|
plt.figure(figsize=(10, 6))
|
|
plt.plot(only_close, label='Daily Gain (%)')
|
|
plt.xlabel('Time')
|
|
plt.ylabel('Daily Gain (%)')
|
|
plt.title(f'Daily Percentage Gain of {symbol} Close Prices')
|
|
plt.legend()
|
|
plt.grid(True)
|
|
plt.savefig('daily_gain_plot.png')
|
|
plt.show()
|
|
|
|
|
|
# Applying Simple Moving Average (SMA)
|
|
window_size = num_moving_average
|
|
sma = np.convolve(only_close, np.ones(window_size)/window_size, mode='valid')
|
|
|
|
# Split SMA data into training (80%) and testing (20%)
|
|
train_size = int(0.8 * len(sma))
|
|
sma_train = sma[:train_size]
|
|
sma_test = sma[train_size:]
|
|
|
|
# Step 3: Perform the FFT on training data
|
|
fft_result_train = np.fft.fft(sma_train)
|
|
frequencies = np.fft.fftfreq(len(sma_train))
|
|
|
|
magnitude_train = np.abs(fft_result_train)
|
|
peak_frequency = frequencies[np.argmax(magnitude_train)]
|
|
periods_train = 1 / frequencies
|
|
periods_train = periods_train / (60 * 6.5)
|
|
|
|
# Filter periods and magnitudes
|
|
mask_train = (periods_train > 0) & (periods_train < 365)
|
|
filtered_periods_train = periods_train[mask_train]
|
|
filtered_frequencies_train = frequencies[mask_train]
|
|
filtered_magnitude_train = magnitude_train[mask_train]
|
|
|
|
# Identify the top 10 frequencies by magnitude
|
|
top_indices_train = np.argsort(filtered_magnitude_train)[-1*(num_frequencies):] # Indices of the top 10 magnitudes
|
|
top_periods_train = filtered_periods_train[top_indices_train]
|
|
|
|
# Calculate phase shift for each top period
|
|
phase_shifts_train = np.zeros(len(top_periods_train))
|
|
for i, period in enumerate(top_periods_train):
|
|
freq = 1 / (period * 60 * 6.5)
|
|
index = np.argmin(np.abs(frequencies - freq))
|
|
phase_shifts_train[i] = np.angle(fft_result_train[index])
|
|
|
|
# Construct features for linear regression on training data
|
|
X_train = np.zeros((len(sma_train), len(top_periods_train)))
|
|
for i, period in enumerate(top_periods_train):
|
|
freq = 1 / (period * 60 * 6.5)
|
|
X_train[:, i] = np.cos(2 * np.pi * freq * np.arange(len(sma_train)) + phase_shifts_train[i])
|
|
|
|
# Perform linear regression on training data
|
|
model = LinearRegression()
|
|
model.fit(X_train, sma_train)
|
|
|
|
# Predict using the model on entire dataset
|
|
X_all = np.zeros((len(sma), len(top_periods_train)))
|
|
for i, period in enumerate(top_periods_train):
|
|
freq = 1 / (period * 60 * 6.5)
|
|
X_all[:, i] = np.cos(2 * np.pi * freq * np.arange(len(sma)) + phase_shifts_train[i])
|
|
|
|
predicted_all = model.predict(X_all)
|
|
|
|
# Plot original only_close and regression line
|
|
plt.figure(figsize=(10, 6))
|
|
plt.plot(sma, label='SMA Daily Gain of Original Data')
|
|
plt.plot(predicted_all, label='Linear Regression on SMA Daily Gain', linestyle='--', linewidth=2)
|
|
plt.plot(real_only_close, label='Real Only Close', linestyle='-.')
|
|
|
|
# Vertical line at 80% cutoff
|
|
plt.axvline(train_size, color='r', linestyle='--', label='80% Cutoff')
|
|
|
|
plt.xlabel('Time')
|
|
plt.ylabel('SMA Close Price')
|
|
plt.title(f'SMA of {symbol} Close Prices with Linear Regression')
|
|
plt.legend()
|
|
plt.savefig('sma_and_regression.png')
|
|
plt.close()
|
|
|
|
def main():
|
|
fft_custom()
|
|
return
|
|
|
|
datetime_start = EASTERN.localize(datetime(year=2016, month=1, day=6, hour=10, minute=30))
|
|
datetime_end = EASTERN.localize(datetime(year=2023, month=1, day=6, hour=10, minute=30))
|
|
|
|
random_strategy = RandomStrategy(symbol="AMD")
|
|
|
|
model_simulation = ModelSimulation(symbol="AMD", datetime_start=datetime_start, datetime_end=datetime_end, datetime_delta=timedelta(minutes=1), strategy=random_strategy)
|
|
if model_simulation.status != 0:
|
|
print("uh oh")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main() |