From bbf5931c52f909cc63ea1317a0c0f54fdaff2009 Mon Sep 17 00:00:00 2001
From: sammanme
Date: Fri, 12 Sep 2025 18:33:32 +0100
Subject: [PATCH] Updated markline and markpoint

---
 Quote_Manager_server/api_main.py        |   3 +-
 Quote_Manager_server/ingest.py          |   6 +-
 Quote_Manager_server/quote_contracts.py |   2 +
 Quote_Manager_server/quote_db.py        |  59 +++++++------
 Quote_Manager_server/quote_service.py   | 105 ++++++++++--------------
 Quote_Manager_server/quotes.db-journal  | Bin 4616 -> 0 bytes
 Quote_Manager_server/routes.py          |  56 +++++++++++--
 Quote_Manager_server/test.py            |  27 ++++++
 8 files changed, 161 insertions(+), 97 deletions(-)
 delete mode 100644 Quote_Manager_server/quotes.db-journal
 create mode 100644 Quote_Manager_server/test.py

diff --git a/Quote_Manager_server/api_main.py b/Quote_Manager_server/api_main.py
index fc3986e..29fb8f2 100644
--- a/Quote_Manager_server/api_main.py
+++ b/Quote_Manager_server/api_main.py
@@ -7,6 +7,7 @@ app = FastAPI(
     title="Quote Manager",
     description="Quote Manager",
     version="1.0.0",
+    swagger_ui_parameters={'syntaxHighlight': False},
 )


@@ -24,7 +25,7 @@ app.add_middleware(
 )

 # router for quote management
-app.include_router(quote_router, prefix="/api/quotes")
+app.include_router(quote_router, prefix="/api")

 if __name__ == "__main__":
     import uvicorn
diff --git a/Quote_Manager_server/ingest.py b/Quote_Manager_server/ingest.py
index 8d5fc4e..6c9ecae 100644
--- a/Quote_Manager_server/ingest.py
+++ b/Quote_Manager_server/ingest.py
@@ -31,11 +31,15 @@ def ingest_zip_archive(zip_path: str, db: QuoteDatabase):

         for row in reader:
             try:
+                if all(value == '' for value in row.values()):
+                    print(f"Skipping empty row: {row}")
+                    continue
                 timestamp = int(row["Ts"])
                 bid = float(row["Bid"])
                 ask = float(row["Ask"])
+                direction = str(row["Type"])

-                quotes.append((timestamp, bid, ask))
+                quotes.append((timestamp, bid, ask, direction))
                 timestamps.append(timestamp)

             except KeyError as e:
diff --git a/Quote_Manager_server/quote_contracts.py b/Quote_Manager_server/quote_contracts.py
index b09fb8f..9a2f4cb 100644
--- a/Quote_Manager_server/quote_contracts.py
+++ b/Quote_Manager_server/quote_contracts.py
@@ -9,6 +9,7 @@ class FetchData(BaseModel):
     bid_price: Optional[float] = Field(default=None)
     ask_price: Optional[float] = Field(default=None)
     timestamp: str
+    direction: Optional[str] = Field(default=None)
     broker: str
     symbol: str
     spread: Optional[float] = Field(default=None)
@@ -32,6 +33,7 @@ class QuoteResponse(BaseModel):
     timestamp: int
     bid: float
     ask: float
+    direction: Optional[str] = None
     session_id: str

 class BrokersSymbolsResponse(BaseModel):
diff --git a/Quote_Manager_server/quote_db.py b/Quote_Manager_server/quote_db.py
index d731ada..41fb72f 100644
--- a/Quote_Manager_server/quote_db.py
+++ b/Quote_Manager_server/quote_db.py
@@ -36,6 +36,7 @@ class QuoteDatabase:
                 timestamp INTEGER NOT NULL,
                 bid REAL NOT NULL,
                 ask REAL NOT NULL,
+                direction TEXT,
                 FOREIGN KEY(session_id) REFERENCES sessions(session_id),
                 UNIQUE(session_id, timestamp) ON CONFLICT REPLACE
             )
@@ -69,18 +70,19 @@ class QuoteDatabase:
         self.conn.commit()
         logger.info(f"Inserted session: {session_data[0]}, time range: {session_data[4]} to {session_data[5]}")

-    def insert_quotes_bulk(self, session_id: str, quotes: List[Tuple[int, float, float]]):
+    def insert_quotes_bulk(self, session_id: str, quotes: List[Tuple[int, float, float, str]]):
         if not quotes:
             logger.warning(f"Skipped empty quote list for session {session_id}")
             return

         query = """
-            INSERT INTO quotes (session_id, timestamp, bid, ask)
-            VALUES (?, ?, ?, ?)
+            INSERT INTO quotes (session_id, timestamp, bid, ask, direction)
+            VALUES (?, ?, ?, ?, ?)
             ON CONFLICT(session_id, timestamp) DO UPDATE SET
                 bid = excluded.bid,
-                ask = excluded.ask
+                ask = excluded.ask,
+                direction = excluded.direction
         """
         try:
             self.conn.executemany(query, [(session_id, *q) for q in quotes])
@@ -97,9 +99,9 @@ class QuoteDatabase:
         symbol: Optional[str] = None,
         start_time: Optional[int] = None,
         end_time: Optional[int] = None
-    ) -> List[Tuple[str, str, float, float]]:
+    ) -> List[Tuple[str, str, float, float, str]]:
         self.conn.execute("""
-            SELECT q.session_id, q.timestamp, q.bid, q.ask
+            SELECT q.session_id, q.timestamp, q.bid, q.ask, q.direction
             FROM quotes q
             JOIN sessions s ON q.session_id = s.session_id
             WHERE 1=1
@@ -128,8 +130,8 @@ class QuoteDatabase:
             return datetime.fromtimestamp(ts / 1000, tz=timezone.utc).isoformat().replace("+00:00", "Z")

         result = []
-        for session_id, timestamp, bid, ask in results:
-            result.append((session_id, to_iso(timestamp), bid, ask))
+        for session_id, timestamp, bid, ask, direction in results:
+            result.append((session_id, to_iso(timestamp), bid, ask, direction))

         return result

@@ -189,7 +191,7 @@ class QuoteDatabase:

     def get_quotes_by_session(self, session_id: str) -> List[Tuple[str, float, float]]:
         cursor = self.conn.execute("""
-            SELECT datetime(timestamp, 'unixepoch') as ts, bid, ask
+            SELECT datetime(timestamp, 'unixepoch') as ts, bid, ask, direction
             FROM quotes
             WHERE session_id = ?
             ORDER BY timestamp
@@ -208,53 +210,58 @@ class QuoteDatabase:
             result.append(row[0])
         return result

-    def get_data(self, broker_a, symbol_a, broker_b, symbol_b, limit=1000, time_range_hours='all'):
+    def get_data(self, broker_a, symbol_a, broker_b, symbol_b, time_range_hours='all', startTime=None, endTime=None):
         try:
             available_brokers = pd.read_sql_query("SELECT DISTINCT broker FROM sessions", self.conn)['broker'].tolist()
             available_symbols = pd.read_sql_query("SELECT DISTINCT symbol FROM sessions", self.conn)['symbol'].tolist()

             if broker_a not in available_brokers or broker_b not in available_brokers:
                 print(f"Broker not found: broker_a={broker_a}, broker_b={broker_b}, available={available_brokers}")
-                return pd.DataFrame()
+                return pd.DataFrame(), pd.DataFrame()
             if symbol_a not in available_symbols or symbol_b not in available_symbols:
                 print(f"Symbol not found: symbol_a={symbol_a}, symbol_b={symbol_b}, available={available_symbols}")
-                return pd.DataFrame()
+                return pd.DataFrame(), pd.DataFrame()

             query = """
-                SELECT q.session_id, q.bid, q.ask, q.timestamp, s.broker, s.symbol
+                SELECT q.session_id, q.bid, q.ask, q.timestamp, q.direction, s.broker, s.symbol
                 FROM quotes q
                 JOIN sessions s ON q.session_id = s.session_id
                 WHERE (
                     (s.broker = ? AND s.symbol = ?) OR
                     (s.broker = ? AND s.symbol = ?)
                 )
+                ORDER BY q.timestamp DESC
             """
             params = [broker_a, symbol_a, broker_b, symbol_b]

-            if time_range_hours != 'all':
-                current_time_ms = int(datetime.now().timestamp() * 1000)
-                time_range_ms = int(time_range_hours) * 3600 * 1000
-                query += " AND q.timestamp >= ?"
-                params.append(current_time_ms - time_range_ms)
-
-            query += " ORDER BY q.timestamp DESC LIMIT ?"
-            params.append(limit)
-
             df = pd.read_sql_query(query, self.conn, params=tuple(params))

             if df.empty:
                 print(f"No data returned for query: broker_a={broker_a}, symbol_a={symbol_a}, "
                       f"broker_b={broker_b}, symbol_b={symbol_b}, time_range={time_range_hours}")
-                return pd.DataFrame()
+                return pd.DataFrame(), pd.DataFrame()

-            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms', errors='coerce')
+            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms').dt.strftime('%Y-%m-%dT%H:%M:%SZ')

-            return df
+            df_a = df[(df['broker'] == broker_a) & (df['symbol'] == symbol_a)].copy()
+            df_b = df[(df['broker'] == broker_b) & (df['symbol'] == symbol_b)].copy()
+
+            df_a['spread'] = df_a['ask'] - df_a['bid']
+            df_b['spread'] = df_b['ask'] - df_b['bid']
+            df_a['midline'] = (df_a['ask'] + df_a['bid']) / 2
+            df_b['midline'] = (df_b['ask'] + df_b['bid']) / 2
+
+            print(df_a.head().to_dict())
+            print(df_b.head().to_dict())
+
+            return df_a, df_b

         except Exception as e:
             print(f"Error in get_data: {e}")
-            return pd.DataFrame()
+            return pd.DataFrame(), pd.DataFrame()
diff --git a/Quote_Manager_server/quote_service.py b/Quote_Manager_server/quote_service.py
index 583b412..69c0406 100644
--- a/Quote_Manager_server/quote_service.py
+++ b/Quote_Manager_server/quote_service.py
@@ -1,11 +1,14 @@
 from decimal import Decimal
+from urllib import response
 import pandas as pd
 import logging
 from typing import Union
+from datetime import datetime
 from sqlalchemy import Float
 from ingest import ingest_zip_archive
 from quote_db import QuoteDatabase
 from quote_contracts import (
+    # BackendResponse,
     FetchQuoteRequest,
     QuoteResponse,
     FetchQuoteResponse,
@@ -22,7 +25,9 @@ from quote_contracts import (
     FetchDataResponse,
     BrokersSymbolsResponse,
     IngestAllRequest,
-    IngestAllResponse
+    IngestAllResponse,
+    # QuoteDataRecord,
+    # LastDataPoint,
 )
 from pathlib import Path

@@ -144,80 +149,54 @@ class QuoteService:
                     session_id=row[0],
                     timestamp=row[1],
                     bid=row[2],
-                    ask=row[3]
+                    ask=row[3],
+                    direction=row[4]
                 )
                 for row in quotes
             ]
         )

-    def get_data(self, broker_a, symbol_a, broker_b, symbol_b, limit=1000, time_range_hours='all'):
-        df = self.db.get_data(broker_a, symbol_a, broker_b, symbol_b, limit, time_range_hours)
-        if df.empty:
+    def get_data(self, broker_a, symbol_a, broker_b, symbol_b, time_range_hours='all', startTime=None, endTime=None):
+        df_a, df_b = self.db.get_data(broker_a, symbol_a, broker_b, symbol_b, time_range_hours, startTime=startTime, endTime=endTime)
+        print("Data fetched: df_a shape:", df_a.shape, "df_b shape:", df_b.shape)
+        if df_a.empty and df_b.empty:
             print(f"No data after initial fetch: broker_a={broker_a}, symbol_a={symbol_a}, "
                   f"broker_b={broker_b}, symbol_b={symbol_b}")
-            return []
+
+            return pd.DataFrame(), pd.DataFrame(), {}, []
+
+        def get_last_data_point(df, fields):
+            last_points = {}
+            for field in fields:
+                if not df.empty and field in df.columns:
+                    latest_row = df[df[field].notna()].sort_values(by='timestamp', ascending=False).iloc[0]
+                    last_points[field] = {
+                        "timestamp": latest_row['timestamp'],
+                        "value": latest_row[field]
+                    }
+                else:
+                    last_points[field] = {
+                        "timestamp": None,
+                        "value": None
+                    }
+            return last_points

-        df_a = df[(df['broker'] == broker_a) & (df['symbol'] == symbol_a)].copy()
-        df_b = df[(df['broker'] == broker_b) & (df['symbol'] == symbol_b)].copy()
+        fields_a = ['ask', 'bid']
+        fields_b = ['ask', 'bid']
+        last_data_points = {
+            "lastAskA": get_last_data_point(df_a, fields_a)['ask'],
+            "lastBidA": get_last_data_point(df_a, fields_a)['bid'],
+            "lastAskB": get_last_data_point(df_b, fields_b)['ask'],
+            "lastBidB": get_last_data_point(df_b, fields_b)['bid'],
+        }

-        result = []
-        df_a['spread'] = df_a['ask'] - df_a['bid']
-        df_b['spread'] = df_b['ask'] - df_b['bid']
-        df_a['midline'] = (df_a['ask'] + df_a['bid']) / 2
-        df_b['midline'] = (df_b['ask'] + df_b['bid']) / 2
-
-
-        if not df_a.empty:
-            df_a = df_a.dropna(subset=['timestamp'])
-            if df_a.empty:
-                print(f"No valid data for {broker_a}/{symbol_a} after dropping invalid timestamps")
-            else:
-                result_a = [
-                    FetchData(
-                        # session_id=row['session_id'],
-                        bid_price=(row['bid']) if pd.notna(row['bid']) else None,
-                        ask_price=(row['ask']) if pd.notna(row['ask']) else None,
-                        timestamp=str(row['timestamp']),
-                        broker=broker_a,
-                        symbol=symbol_a,
-                        spread=row['spread'] if pd.notna(row['spread']) else None,
-                        midline=row['midline'] if pd.notna(row['midline']) else None
-                    )
-                    for _, row in df_a.iterrows()
-
-                ]
-                result.extend(result_a)
-
-        if not df_b.empty:
-            df_b = df_b.dropna(subset=['timestamp'])
-            if df_b.empty:
-                print(f"No valid data for {broker_b}/{symbol_b} after dropping invalid timestamps")
-            else:
-                result_b = [
-                    FetchData(
-                        # session_id=row['session_id'],
-                        bid_price=(row['bid']) if pd.notna(row['bid']) else None,
-                        ask_price=(row['ask']) if pd.notna(row['ask']) else None,
-                        timestamp=str(row['timestamp']),
-                        broker=broker_b,
-                        symbol=symbol_b,
-                        spread=row['spread'] if pd.notna(row['spread']) else None,
-                        midline=row['midline'] if pd.notna(row['midline']) else None
-                    )
-                    for _, row in df_b.iterrows()
-
-                ]
-                result.extend(result_b)
+        markable_df = pd.concat([df_a, df_b], ignore_index=True)
+        markable_records = markable_df[markable_df['direction'].isin(['buy', 'sell'])].to_dict(orient='records') if not markable_df.empty else []

-        if not result:
-            print(f"No valid data after processing: broker_a={broker_a}, symbol_a={symbol_a}, "
-                  f"broker_b={broker_b}, symbol_b={symbol_b}")
-            return []
+        return df_a, df_b, last_data_points, markable_records
+
-
-        return result[:limit]
+
diff --git a/Quote_Manager_server/quotes.db-journal b/Quote_Manager_server/quotes.db-journal
deleted file mode 100644
index 6bb7bfa7daa045170ea8d889b3af050c823414bd..0000000000000000000000000000000000000000
GIT binary patch
literal 0
HcmV?d00001

literal 4616
zcmZQzKmeP+`5_Fagv2OoIEMfmFVJ_4{2v*BXcUZwz-S1JhQMeDjE2By
v2#kinXb6mkz-S1JhQMeDjE2By2n?4H;ALc%WGO7oFG($C_+a!AH0}ccnpq3k

diff --git a/Quote_Manager_server/routes.py b/Quote_Manager_server/routes.py
index 1488290..f2ea3c0 100644
--- a/Quote_Manager_server/routes.py
+++ b/Quote_Manager_server/routes.py
@@ -1,6 +1,9 @@
 from fastapi import APIRouter, Depends, HTTPException, Query
 from pathlib import Path
+import pandas as pd
 import logging
+from typing import List, Dict, Any
+import json
 from quote_db import QuoteDatabase
 from quote_service import QuoteService
 from quote_contracts import (
@@ -10,7 +13,8 @@ from quote_contracts import (
     ListSessionRequest, ListSessionResponse,
     IngestRequest, IngestResponse,
     FetchBrokersResponse, FetchDataResponse,
-    BrokersSymbolsResponse, IngestAllRequest, IngestAllResponse
+    BrokersSymbolsResponse, IngestAllRequest, IngestAllResponse,
+    # QuoteDataRecord
 )


@@ -36,18 +40,58 @@ def ingest_archives(request: IngestAllRequest, service: QuoteService = Depends(g
 def ingest_quotes(request: IngestRequest, service: QuoteService = Depends(get_service)):
     return service.ingest_archive(request)

-@router.get("/api/data", response_model=FetchDataResponse)
+@router.get("/data")
 async def get_data(
     broker_a: str = Query(...),
     symbol_a: str = Query(...),
     broker_b: str = Query(...),
     symbol_b: str = Query(...),
-    time_range_hours: str = Query('all', description="Time range: 'all' or hours (1, 6, 24)"),
-    limit: int = Query(1000, description="Maximum number of records"),
+
+
     service: QuoteService = Depends(get_service)
 ):
-    data = service.get_data(broker_a, symbol_a, broker_b, symbol_b, limit, time_range_hours)
-    return {"data": data}
+    df_a, df_b, last_data_points, markable_records = service.get_data(broker_a, symbol_a, broker_b, symbol_b)
+    combined_data = pd.concat([df_a, df_b], ignore_index=True) if not df_a.empty or not df_b.empty else pd.DataFrame()
+
+    dataset_source = combined_data.to_dict(orient='records') if not combined_data.empty else []
+
+    records: List[Dict[str, Any]] = []
+    timestamps = set()
+    for record in dataset_source:
+        timestamp = record["timestamp"]
+        if timestamp not in timestamps:
+            records.append({
+                "askA": record["ask"] if record["broker"] == broker_a else None,
+                "bidA": record["bid"] if record["broker"] == broker_a else None,
+                "midlineA": record["midline"] if record["broker"] == broker_a else None,
+                "spreadA": record["spread"] if record["broker"] == broker_a else None,
+                "directionA": record["direction"] if record["broker"] == broker_a else None,
+                "askB": record["ask"] if record["broker"] == broker_b else None,
+                "bidB": record["bid"] if record["broker"] == broker_b else None,
+                "midlineB": record["midline"] if record["broker"] == broker_b else None,
+                "spreadB": record["spread"] if record["broker"] == broker_b else None,
+                "directionB": record["direction"] if record["broker"] == broker_b else None,
+                "timestamp": timestamp
+            })
+            timestamps.add(timestamp)
+
+
+    response_data = {
+        "records": records,
+        "brokerA": broker_a,
+        "symbolA": symbol_a,
+        "brokerB": broker_b,
+        "symbolB": symbol_b,
+        "lastDataPoints": last_data_points,
+        "markableRecords": markable_records
+    }
+
+    print(f"Response data size: {len(response_data['records'])} records, lastDataPoints: {len(last_data_points)}, markableRecords: {len(markable_records)}")
+    print(f"Response data sample: {json.dumps(response_data, default=str)[:500]}...")
+
+    return response_data
+
+
 @router.get("/brokers", response_model=FetchBrokersResponse)
 def get_all_brokers(service: QuoteService = Depends(get_service)):
diff --git a/Quote_Manager_server/test.py b/Quote_Manager_server/test.py
new file mode 100644
index 0000000..ea1f897
--- /dev/null
+++ b/Quote_Manager_server/test.py
@@ -0,0 +1,27 @@
+import sqlite3
+import json
+
+conn = sqlite3.connect('quotes.db')
+
+cursor = conn.cursor()
+
+query = """
+SELECT
+    quotes.timestamp, quotes.bid, quotes.ask, quotes.direction
+FROM quotes
+"""
+
+cursor.execute(query)
+tables = cursor.fetchall()
+
+column_names = [desc[0] for desc in cursor.description]
+
+data_list = []
+for row in tables:
+    data_dict = {column_names[i]: row[i] for i in range(len(column_names))}
+    data_list.append(data_dict)
+
+json_output = json.dumps(data_list, indent=4)
+print(json_output)
+
+cursor.close()
\ No newline at end of file
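
Illustrative note (not part of the patch): a minimal sketch of how the reworked GET /api/data endpoint above might be exercised once the server is running. It assumes the app is served locally on port 8000, that the `requests` library is available on the client side, and that the broker/symbol values below are placeholders for entries actually present in quotes.db.

# try_get_data.py - hypothetical client sketch, not included in this patch
import requests

params = {
    "broker_a": "BrokerA",   # placeholder broker name
    "symbol_a": "EURUSD",    # placeholder symbol
    "broker_b": "BrokerB",   # placeholder broker name
    "symbol_b": "EURUSD",    # placeholder symbol
}
resp = requests.get("http://localhost:8000/api/data", params=params, timeout=30)
resp.raise_for_status()
payload = resp.json()

# Keys produced by routes.get_data in this patch.
print(len(payload["records"]), "merged rows")
print(payload["lastDataPoints"])            # lastAskA / lastBidA / lastAskB / lastBidB
print(len(payload["markableRecords"]), "buy/sell rows for markLine/markPoint overlays")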