import logging
import sqlite3
from datetime import datetime, timezone
from typing import Dict, List, Optional, Tuple

import pandas as pd

logger = logging.getLogger(__name__)


class QuoteDatabase:
    """SQLite-backed store for quote sessions and their bid/ask quotes.

    Two tables are managed: ``sessions`` (one row per recording session,
    keyed by ``session_id``) and ``quotes`` (one row per
    ``(session_id, timestamp)`` pair).  Most quote queries in this class
    divide ``quotes.timestamp`` by 1000 before handing it to SQLite's
    ``unixepoch`` conversion, i.e. they treat it as epoch milliseconds.
    """

    # Default database file, also used as the default for ``__init__``.
    db_path = "quotes.db"

    # SQL expression template that yields epoch *seconds* for a column that
    # may hold seconds or milliseconds.  Values above 1e12 are far outside
    # the range SQLite's 'unixepoch' modifier accepts, so they previously
    # produced NULL; treating them as milliseconds only changes those rows.
    # ``{col}`` is always filled with a hard-coded column name, never with
    # caller-supplied text.
    _EPOCH_SECONDS_SQL = (
        "CASE WHEN {col} > 1000000000000 THEN {col} / 1000 ELSE {col} END"
    )

    def __init__(self, db_path=db_path):
        """Open (or create) the database at *db_path* and ensure the schema.

        Args:
            db_path: Path handed to ``sqlite3.connect``; defaults to the
                class attribute ``db_path``.
        """
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self.conn.execute("PRAGMA foreign_keys = ON")
        self._create_tables()

    def __enter__(self) -> "QuoteDatabase":
        """Allow ``with QuoteDatabase(...) as db:`` usage."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the connection when leaving a ``with`` block."""
        self.close()

    def _create_tables(self) -> None:
        """Create the ``sessions`` and ``quotes`` tables if they are missing.

        A ``sqlite3.Error`` is logged (with traceback) and not re-raised,
        matching the original best-effort behaviour.
        """
        if not self.conn:
            logger.info("Database connection did not activate, check connection")
            return

        try:
            self.conn.execute("""
                CREATE TABLE IF NOT EXISTS sessions (
                    session_id TEXT PRIMARY KEY,
                    broker TEXT NOT NULL,
                    symbol TEXT NOT NULL,
                    archive_name TEXT NOT NULL,
                    start_time INTEGER NOT NULL,
                    end_time INTEGER NOT NULL
                );
            """)
            self.conn.execute("""
                CREATE TABLE IF NOT EXISTS quotes (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    session_id TEXT NOT NULL,
                    timestamp INTEGER NOT NULL,
                    bid REAL NOT NULL,
                    ask REAL NOT NULL,
                    direction TEXT,
                    FOREIGN KEY(session_id) REFERENCES sessions(session_id),
                    UNIQUE(session_id, timestamp) ON CONFLICT REPLACE
                )
            """)
            self.conn.commit()
            logger.info("Table has been created succesfully")
        except sqlite3.Error:
            # logger.exception keeps the message but adds the traceback the
            # original dropped.
            logger.exception("Error while creating the table")

    def session_exists(self, session_id: str) -> bool:
        """Return ``True`` if a session with *session_id* is already stored."""
        cursor = self.conn.execute(
            "SELECT 1 FROM sessions WHERE session_id = ? LIMIT 1",
            (session_id,),
        )
        exists = cursor.fetchone() is not None
        if exists:
            logger.error(
                "This session has existed somewhere on the database before, kindly confirm."
            )
        else:
            logger.info("Session is not on database before, processing.")
        return exists

    def insert_session(self, session_data: Tuple[str, str, str, str, int, int]):
        """Insert one session row and commit.

        Args:
            session_data: ``(session_id, broker, symbol, archive_name,
                start_time, end_time)``.

        Raises:
            sqlite3.Error: Propagated from the INSERT (for example when the
                ``session_id`` primary key already exists).  The transaction
                is rolled back before the error propagates.
        """
        logger.info("Inserting session: %s", session_data[0])
        # ``with conn`` commits on success and rolls back on an exception.
        with self.conn:
            self.conn.execute(
                """
                INSERT INTO sessions
                    (session_id, broker, symbol, archive_name, start_time, end_time)
                VALUES (?, ?, ?, ?, ?, ?)
                """,
                session_data,
            )
        logger.info(
            "Inserted session: %s, time range: %s to %s",
            session_data[0],
            session_data[4],
            session_data[5],
        )

    def insert_quotes_bulk(
        self, session_id: str, quotes: List[Tuple[int, float, float, str]]
    ):
        """Upsert a batch of quotes for *session_id*.

        Rows that collide on ``(session_id, timestamp)`` have their bid, ask
        and direction overwritten.  Failures are logged and swallowed (as in
        the original), but the batch is now rolled back so a partially
        applied batch cannot be committed by a later call.

        Args:
            session_id: Session the quotes belong to.
            quotes: ``(timestamp, bid, ask, direction)`` tuples.
        """
        if not quotes:
            logger.warning("Skipped empty quote list for session %s", session_id)
            return

        query = """
            INSERT INTO quotes (session_id, timestamp, bid, ask, direction)
            VALUES (?, ?, ?, ?, ?)
            ON CONFLICT(session_id, timestamp) DO UPDATE SET
                bid = excluded.bid,
                ask = excluded.ask,
                direction = excluded.direction
        """
        try:
            with self.conn:
                self.conn.executemany(
                    query, ((session_id, *quote) for quote in quotes)
                )
        except Exception as e:
            logger.error(
                "Error inserting quotes for session %s: %s",
                session_id,
                e,
                exc_info=True,
            )
            return
        logger.info(
            "Quotes inserted successfully for session %s (%d quotes)",
            session_id,
            len(quotes),
        )

    def fetch_quotes(
        self,
        broker: Optional[str] = None,
        symbol: Optional[str] = None,
        start_time: Optional[int] = None,
        end_time: Optional[int] = None,
    ) -> List[Tuple[str, str, float, float, str]]:
        """Return quotes matching the optional filters.

        Args:
            broker: Keep only quotes whose session has this broker.
            symbol: Keep only quotes whose session has this symbol.
            start_time: Inclusive lower bound, compared directly against the
                stored ``quotes.timestamp`` value.
            end_time: Inclusive upper bound, compared the same way.

        Returns:
            ``(session_id, iso_timestamp, bid, ask, direction)`` tuples in
            whatever order SQLite yields them; ``iso_timestamp`` is a UTC
            ISO-8601 string ending in ``Z``.
        """
        # The original executed this SELECT and discarded it, leaving
        # ``query`` undefined; it must be a string that the filters extend.
        query = """
            SELECT q.session_id, q.timestamp, q.bid, q.ask, q.direction
            FROM quotes q
            JOIN sessions s ON q.session_id = s.session_id
            WHERE 1=1
        """
        params: list = []
        filters = (
            (" AND s.broker = ?", broker),
            (" AND s.symbol = ?", symbol),
            (" AND q.timestamp >= ?", start_time),
            (" AND q.timestamp <= ?", end_time),
        )
        for clause, value in filters:
            if value is not None:
                query += clause
                params.append(value)

        rows = self.conn.execute(query, params).fetchall()

        def to_iso(ts: int) -> str:
            # Stored timestamps are treated as epoch milliseconds.  The
            # original divided values above 1e12 by 1000 twice, mapping
            # present-day timestamps to January 1970.
            moment = datetime.fromtimestamp(ts / 1000, tz=timezone.utc)
            return moment.isoformat().replace("+00:00", "Z")

        return [
            (session_id, to_iso(timestamp), bid, ask, direction)
            for session_id, timestamp, bid, ask, direction in rows
        ]

    def get_all_brokers(self) -> List[str]:
        """Return every distinct broker present in ``sessions``."""
        cursor = self.conn.execute("SELECT DISTINCT broker FROM sessions")
        return [row[0] for row in cursor.fetchall()]

    def get_symbols_by_broker(self, broker: str) -> List[str]:
        """Return the distinct symbols recorded for *broker*."""
        cursor = self.conn.execute(
            "SELECT DISTINCT symbol FROM sessions WHERE broker = ?",
            (broker,),
        )
        return [row[0] for row in cursor.fetchall()]

    def get_brokers_and_symbols(self) -> Dict[str, List[str]]:
        """Return a mapping of each broker to its list of distinct symbols."""
        return {
            broker: self.get_symbols_by_broker(broker)
            for broker in self.get_all_brokers()
        }

    def get_dates_by_broker_symbol(self, broker: str, symbol: str) -> List[str]:
        """Return the sorted distinct UTC dates that have quotes.

        Dates are ``YYYY-MM-DD`` strings derived from ``quotes.timestamp``
        interpreted as epoch milliseconds.
        """
        cursor = self.conn.execute(
            """
            SELECT DISTINCT DATE(q.timestamp / 1000, 'unixepoch')
            FROM quotes q
            JOIN sessions s ON q.session_id = s.session_id
            WHERE s.broker = ? AND s.symbol = ?
            ORDER BY 1
            """,
            (broker, symbol),
        )
        return [row[0] for row in cursor.fetchall()]

    def get_sessions_by_date(self, broker: str, symbol: str, date: str) -> List[str]:
        """Return ids of sessions that have at least one quote on *date*.

        Args:
            broker: Broker to match.
            symbol: Symbol to match.
            date: ``YYYY-MM-DD`` string compared with the UTC date of each
                quote (timestamp interpreted as epoch milliseconds).
        """
        cursor = self.conn.execute(
            """
            SELECT DISTINCT s.session_id
            FROM sessions s
            JOIN quotes q ON s.session_id = q.session_id
            WHERE s.broker = ? AND s.symbol = ?
              AND DATE(q.timestamp / 1000, 'unixepoch') = ?
            """,
            (broker, symbol, date),
        )
        return [row[0] for row in cursor.fetchall()]

    def get_quotes_by_session(
        self, session_id: str
    ) -> List[Tuple[str, float, float, str]]:
        """Return ``(ts, bid, ask, direction)`` rows for one session.

        Rows are ordered by timestamp; ``ts`` is SQLite's
        ``YYYY-MM-DD HH:MM:SS`` rendering in UTC.  Millisecond timestamps
        (values above 1e12) are reduced to seconds first, because passing
        them to 'unixepoch' unchanged is out of range and yields NULL.
        Smaller values are still interpreted as seconds, as before.
        """
        seconds = self._EPOCH_SECONDS_SQL.format(col="timestamp")
        cursor = self.conn.execute(
            f"""
            SELECT datetime({seconds}, 'unixepoch') AS ts, bid, ask, direction
            FROM quotes
            WHERE session_id = ?
            ORDER BY timestamp
            """,
            (session_id,),
        )
        return cursor.fetchall()

    def get_sessions_by_date_range(
        self, broker: str, symbol: str, date: str
    ) -> List[str]:
        """Return ids of sessions whose start/end dates bracket *date*.

        A session matches when ``DATE(start_time) <= date <= DATE(end_time)``
        (UTC).  Results are ordered by ``start_time``.  ``start_time`` and
        ``end_time`` values above 1e12 are treated as milliseconds; smaller
        values are interpreted as seconds, as before.
        """
        start_seconds = self._EPOCH_SECONDS_SQL.format(col="start_time")
        end_seconds = self._EPOCH_SECONDS_SQL.format(col="end_time")
        cursor = self.conn.execute(
            f"""
            SELECT session_id
            FROM sessions
            WHERE broker = ? AND symbol = ?
              AND DATE({start_seconds}, 'unixepoch') <= ?
              AND DATE({end_seconds}, 'unixepoch') >= ?
            ORDER BY start_time
            """,
            (broker, symbol, date, date),
        )
        return [row[0] for row in cursor.fetchall()]

    def get_data(
        self,
        broker_a,
        symbol_a,
        broker_b,
        symbol_b,
        time_range_hours='all',
        startTime=None,
        endTime=None,
    ):
        """Load quotes for two broker/symbol pairs as a pair of DataFrames.

        Each frame holds the joined quote/session columns, newest first,
        with ``timestamp`` formatted as ``YYYY-MM-DDTHH:MM:SSZ`` plus two
        derived columns: ``spread`` (ask - bid) and ``midline``
        ((ask + bid) / 2).

        Args:
            broker_a, symbol_a: First pair to load.
            broker_b, symbol_b: Second pair to load.
            time_range_hours: Only echoed in the "no data" log message; no
                time filtering is applied.
            startTime, endTime: Accepted for interface compatibility but
                currently unused.

        Returns:
            ``(df_a, df_b)``.  Two empty DataFrames are returned when a
            broker or symbol is unknown, when the query yields no rows, or
            when any error occurs (the error is logged, not raised).
        """
        empty_result = (pd.DataFrame(), pd.DataFrame())
        try:
            available_brokers = pd.read_sql_query(
                "SELECT DISTINCT broker FROM sessions", self.conn
            )['broker'].tolist()
            available_symbols = pd.read_sql_query(
                "SELECT DISTINCT symbol FROM sessions", self.conn
            )['symbol'].tolist()

            if broker_a not in available_brokers or broker_b not in available_brokers:
                logger.warning(
                    "Broker not found: broker_a=%s, broker_b=%s, available=%s",
                    broker_a,
                    broker_b,
                    available_brokers,
                )
                return empty_result

            if symbol_a not in available_symbols or symbol_b not in available_symbols:
                logger.warning(
                    "Symbol not found: symbol_a=%s, symbol_b=%s, available=%s",
                    symbol_a,
                    symbol_b,
                    available_symbols,
                )
                return empty_result

            query = """
                SELECT q.session_id, q.bid, q.ask, q.timestamp, q.direction,
                       s.broker, s.symbol
                FROM quotes q
                JOIN sessions s ON q.session_id = s.session_id
                WHERE (
                    (s.broker = ? AND s.symbol = ?)
                    OR (s.broker = ? AND s.symbol = ?)
                )
                ORDER BY q.timestamp DESC
            """
            params = (broker_a, symbol_a, broker_b, symbol_b)
            df = pd.read_sql_query(query, self.conn, params=params)

            if df.empty:
                logger.warning(
                    "No data returned for query: broker_a=%s, symbol_a=%s, "
                    "broker_b=%s, symbol_b=%s, time_range=%s",
                    broker_a,
                    symbol_a,
                    broker_b,
                    symbol_b,
                    time_range_hours,
                )
                return empty_result

            df['timestamp'] = pd.to_datetime(
                df['timestamp'], unit='ms'
            ).dt.strftime('%Y-%m-%dT%H:%M:%SZ')

            df_a = df[(df['broker'] == broker_a) & (df['symbol'] == symbol_a)].copy()
            df_b = df[(df['broker'] == broker_b) & (df['symbol'] == symbol_b)].copy()

            for frame in (df_a, df_b):
                frame['spread'] = frame['ask'] - frame['bid']
                frame['midline'] = (frame['ask'] + frame['bid']) / 2

            logger.debug("get_data df_a head: %s", df_a.head().to_dict())
            logger.debug("get_data df_b head: %s", df_b.head().to_dict())

            return df_a, df_b

        except Exception as e:
            # Every path returns exactly two frames; the original returned a
            # 4-tuple here, which broke ``df_a, df_b = get_data(...)``.
            logger.exception("Error in get_data: %s", e)
            return empty_result

    def close(self):
        """Close the database connection; safe to call more than once."""
        # getattr guards the case where __init__ raised before ``self.conn``
        # was assigned and __del__ still runs on the half-built object.
        conn = getattr(self, "conn", None)
        if conn:
            conn.close()

    def __del__(self):
        """Close the connection when the object is garbage-collected."""
        self.close()