2025-09-12 18:33:32 +01:00

203 lines
7.0 KiB
Python

from decimal import Decimal
from urllib import response
import pandas as pd
import logging
from typing import Union
from datetime import datetime
from sqlalchemy import Float
from ingest import ingest_zip_archive
from quote_db import QuoteDatabase
from quote_contracts import (
# BackendResponse,
FetchQuoteRequest,
QuoteResponse,
FetchQuoteResponse,
FetchBrokersResponse,
ListSymbolsRequest,
ListSymbolsResponse,
ListDatesRequest,
ListDatesResponse,
ListSessionRequest,
ListSessionResponse,
IngestRequest,
IngestResponse,
FetchData,
FetchDataResponse,
BrokersSymbolsResponse,
IngestAllRequest,
IngestAllResponse,
# QuoteDataRecord,
# LastDataPoint,
)
from pathlib import Path
# Module-level logger used by every QuoteService method.
logger = logging.getLogger(__name__)


class QuoteService:
    """Service layer translating request/response contracts to database calls.

    Every method delegates the actual storage work to the ``QuoteDatabase``
    instance supplied at construction time and wraps the result in the
    matching response contract from ``quote_contracts``.
    """

    def __init__(self, db: QuoteDatabase):
        """Keep a reference to the database used by all other methods."""
        self.db = db

    def ingest_archives_from_folder(
        self, folder_path: Union[str, Path, IngestAllRequest]
    ) -> IngestAllResponse:
        """Ingest every ``*.zip`` archive found recursively under a folder.

        Args:
            folder_path: The folder to scan, given either as a path (``Path``
                or plain string) or as an ``IngestAllRequest`` whose
                ``folder_path`` attribute carries it.

        Returns:
            An ``IngestAllResponse``. When the path is missing or is not a
            directory, ``results`` holds a single error ``IngestResponse``;
            otherwise it holds one ``IngestResponse`` per archive, in sorted
            path order.
        """
        if isinstance(folder_path, IngestAllRequest):
            folder_path = folder_path.folder_path
        # Path() is a no-op for Path inputs and also accepts plain strings,
        # which would otherwise fail on the .exists() call below.
        folder = Path(folder_path)

        if not folder.exists():
            return IngestAllResponse(
                status="Oops!, this path does not exist",
                results=[
                    IngestResponse(
                        status="There is an error",
                        message="Invalid path to directory selected",
                    )
                ],
            )

        if not folder.is_dir():
            return IngestAllResponse(
                status="Oops!, this is not a directory",
                results=[
                    IngestResponse(
                        status="There is an error",
                        message="Select a folder please!",
                    )
                ],
            )

        # rglob() yields in filesystem order; sorting makes the ingestion
        # order (and the order of `results`) reproducible between runs.
        results = [
            self.ingest_archive(IngestRequest(zip_path=str(zip_file)))
            for zip_file in sorted(folder.rglob("*.zip"))
        ]
        # NOTE: the overall status does not reflect individual failures;
        # callers must inspect each entry of `results` for that.
        return IngestAllResponse(status="All done cheers.", results=results)

    def ingest_archive(self, request: IngestRequest) -> IngestResponse:
        """Ingest a single ZIP archive into the database.

        Args:
            request: Carries ``zip_path``, the location of the archive.

        Returns:
            An ``IngestResponse`` with status ``"success"`` or ``"failed"``.
            Failures are reported in the response rather than raised.
        """
        zip_path = Path(request.zip_path)
        if not zip_path.exists():
            logger.warning("ZIP archive not found: %s", zip_path)
            return IngestResponse(status="failed", message="ZIP archive not found.")

        try:
            ingest_zip_archive(zip_path, db=self.db)
        except Exception as e:
            # Service boundary: any ingest failure is logged with its
            # traceback and reported to the caller instead of propagating.
            logger.exception("Ingestion error: %s", e)
            return IngestResponse(
                status="failed",
                message=f"Ingestion error: {e}",
            )
        return IngestResponse(
            status="success",
            message="Archive ingested successfully.",
        )

    def get_all_brokers(self) -> FetchBrokersResponse:
        """Return all brokers in the database."""
        return FetchBrokersResponse(brokers=self.db.get_all_brokers())

    def get_brokers_and_symbols(self) -> BrokersSymbolsResponse:
        """Return all brokers and their symbols in mapping form."""
        return BrokersSymbolsResponse(brokers=self.db.get_brokers_and_symbols())

    def get_symbols(self, request: ListSymbolsRequest) -> ListSymbolsResponse:
        """Return all symbols for the broker named in ``request``."""
        symbols = self.db.get_symbols_by_broker(request.broker)
        return ListSymbolsResponse(symbols=symbols)

    def get_dates(self, request: ListDatesRequest) -> ListDatesResponse:
        """Return all dates for the broker and symbol named in ``request``."""
        dates = self.db.get_dates_by_broker_symbol(request.broker, request.symbol)
        return ListDatesResponse(dates=dates)

    def get_sessions(self, request: ListSessionRequest) -> ListSessionResponse:
        """Return all sessions for the broker, symbol and date in ``request``."""
        sessions = self.db.get_sessions_by_date(
            request.broker, request.symbol, request.date
        )
        return ListSessionResponse(sessions=sessions)

    def get_quotes(self, request: FetchQuoteRequest) -> FetchQuoteResponse:
        """Return all quotes for a broker, symbol, and time range.

        Each row returned by ``db.fetch_quotes`` is read positionally as
        (session_id, timestamp, bid, ask, direction).
        """
        rows = self.db.fetch_quotes(
            broker=request.broker,
            symbol=request.symbol,
            start_time=request.start_time,
            end_time=request.end_time,
        )
        return FetchQuoteResponse(
            quotes=[
                QuoteResponse(
                    session_id=row[0],
                    timestamp=row[1],
                    bid=row[2],
                    ask=row[3],
                    direction=row[4],
                )
                for row in rows
            ]
        )

    def get_data(self, broker_a, symbol_a, broker_b, symbol_b,
                 time_range_hours='all', startTime=None, endTime=None):
        """Fetch quote frames for two broker/symbol pairs plus derived summaries.

        All arguments are forwarded unchanged to ``db.get_data``.

        Returns:
            A 4-tuple ``(df_a, df_b, last_data_points, markable_records)``:

            * ``df_a`` / ``df_b`` - the frames returned by the database.
            * ``last_data_points`` - dict with keys ``lastAskA``, ``lastBidA``,
              ``lastAskB``, ``lastBidB``; each value is
              ``{"timestamp": ..., "value": ...}`` for the most recent
              non-null observation, or both ``None`` when there is none.
            * ``markable_records`` - rows of both frames whose ``direction``
              is ``"buy"`` or ``"sell"``, as a list of dicts.

            When both frames are empty the result is two fresh empty frames,
            an empty dict and an empty list.
        """
        df_a, df_b = self.db.get_data(
            broker_a, symbol_a, broker_b, symbol_b, time_range_hours,
            startTime=startTime, endTime=endTime,
        )
        logger.debug(
            "Data fetched: df_a shape: %s df_b shape: %s", df_a.shape, df_b.shape
        )

        if df_a.empty and df_b.empty:
            logger.info(
                "No data after initial fetch: broker_a=%s, symbol_a=%s, "
                "broker_b=%s, symbol_b=%s",
                broker_a, symbol_a, broker_b, symbol_b,
            )
            return pd.DataFrame(), pd.DataFrame(), {}, []

        last_data_points = {
            "lastAskA": self._last_valid_point(df_a, "ask"),
            "lastBidA": self._last_valid_point(df_a, "bid"),
            "lastAskB": self._last_valid_point(df_b, "ask"),
            "lastBidB": self._last_valid_point(df_b, "bid"),
        }

        # At least one frame is non-empty here. Empty frames are dropped
        # before concatenating because pandas deprecates including them.
        frames = [frame for frame in (df_a, df_b) if not frame.empty]
        markable_df = pd.concat(frames, ignore_index=True)
        if "direction" in markable_df.columns:
            is_markable = markable_df["direction"].isin(["buy", "sell"])
            markable_records = markable_df[is_markable].to_dict(orient="records")
        else:
            markable_records = []

        return df_a, df_b, last_data_points, markable_records

    @staticmethod
    def _last_valid_point(df, field):
        """Return timestamp/value of the latest row where ``field`` is not null.

        Yields ``{"timestamp": None, "value": None}`` when the frame is empty,
        lacks the column, or the column holds only null values.
        """
        if df.empty or field not in df.columns:
            return {"timestamp": None, "value": None}

        valid_rows = df[df[field].notna()]
        if valid_rows.empty:
            # Without this guard .iloc[0] raises IndexError on an all-null column.
            return {"timestamp": None, "value": None}

        latest_row = valid_rows.sort_values(by="timestamp", ascending=False).iloc[0]
        return {"timestamp": latest_row["timestamp"], "value": latest_row[field]}