Updated markline and markpoint

This commit is contained in:
sammanme 2025-09-12 18:33:32 +01:00
parent 43ac3181d5
commit bbf5931c52
8 changed files with 161 additions and 97 deletions

View File

@ -7,6 +7,7 @@ app = FastAPI(
title="Quote Manager", title="Quote Manager",
description="Quote Manager", description="Quote Manager",
version="1.0.0", version="1.0.0",
swagger_ui_parameters={'syntaxHighlight': False},
) )
@ -24,7 +25,7 @@ app.add_middleware(
) )
# router for quote management # router for quote management
app.include_router(quote_router, prefix="/api/quotes") app.include_router(quote_router, prefix="/api")
if __name__ == "__main__": if __name__ == "__main__":
import uvicorn import uvicorn

View File

@ -31,11 +31,15 @@ def ingest_zip_archive(zip_path: str, db: QuoteDatabase):
for row in reader: for row in reader:
try: try:
if all(value == '' for value in row.values()):
print(f"Skipping empty row: {row}")
continue
timestamp = int(row["Ts"]) timestamp = int(row["Ts"])
bid = float(row["Bid"]) bid = float(row["Bid"])
ask = float(row["Ask"]) ask = float(row["Ask"])
direction = str(row["Type"])
quotes.append((timestamp, bid, ask)) quotes.append((timestamp, bid, ask, direction))
timestamps.append(timestamp) timestamps.append(timestamp)
except KeyError as e: except KeyError as e:

View File

@ -9,6 +9,7 @@ class FetchData(BaseModel):
bid_price: Optional[float] = Field(default=None) bid_price: Optional[float] = Field(default=None)
ask_price: Optional[float] = Field(default=None) ask_price: Optional[float] = Field(default=None)
timestamp: str timestamp: str
direction: Optional[str] = Field(default=None)
broker: str broker: str
symbol: str symbol: str
spread: Optional[float] = Field(default=None) spread: Optional[float] = Field(default=None)
@ -32,6 +33,7 @@ class QuoteResponse(BaseModel):
timestamp: int timestamp: int
bid: float bid: float
ask: float ask: float
direction: Optional[str] = None
session_id: str session_id: str
class BrokersSymbolsResponse(BaseModel): class BrokersSymbolsResponse(BaseModel):

View File

@ -36,6 +36,7 @@ class QuoteDatabase:
timestamp INTEGER NOT NULL, timestamp INTEGER NOT NULL,
bid REAL NOT NULL, bid REAL NOT NULL,
ask REAL NOT NULL, ask REAL NOT NULL,
direction TEXT,
FOREIGN KEY(session_id) REFERENCES sessions(session_id), FOREIGN KEY(session_id) REFERENCES sessions(session_id),
UNIQUE(session_id, timestamp) ON CONFLICT REPLACE UNIQUE(session_id, timestamp) ON CONFLICT REPLACE
) )
@ -69,18 +70,19 @@ class QuoteDatabase:
self.conn.commit() self.conn.commit()
logger.info(f"Inserted session: {session_data[0]}, time range: {session_data[4]} to {session_data[5]}") logger.info(f"Inserted session: {session_data[0]}, time range: {session_data[4]} to {session_data[5]}")
def insert_quotes_bulk(self, session_id: str, quotes: List[Tuple[int, float, float]]): def insert_quotes_bulk(self, session_id: str, quotes: List[Tuple[int, float, float, str]]):
if not quotes: if not quotes:
logger.warning(f"Skipped empty quote list for session {session_id}") logger.warning(f"Skipped empty quote list for session {session_id}")
return return
query = """ query = """
INSERT INTO quotes (session_id, timestamp, bid, ask) INSERT INTO quotes (session_id, timestamp, bid, ask, direction)
VALUES (?, ?, ?, ?) VALUES (?, ?, ?, ?, ?)
ON CONFLICT(session_id, timestamp) ON CONFLICT(session_id, timestamp)
DO UPDATE SET DO UPDATE SET
bid = excluded.bid, bid = excluded.bid,
ask = excluded.ask ask = excluded.ask,
direction = excluded.direction
""" """
try: try:
self.conn.executemany(query, [(session_id, *q) for q in quotes]) self.conn.executemany(query, [(session_id, *q) for q in quotes])
@ -97,9 +99,9 @@ class QuoteDatabase:
symbol: Optional[str] = None, symbol: Optional[str] = None,
start_time: Optional[int] = None, start_time: Optional[int] = None,
end_time: Optional[int] = None end_time: Optional[int] = None
) -> List[Tuple[str, str, float, float]]: ) -> List[Tuple[str, str, float, float, str]]:
self.conn.execute(""" self.conn.execute("""
SELECT q.session_id, q.timestamp, q.bid, q.ask SELECT q.session_id, q.timestamp, q.bid, q.ask, q.direction
FROM quotes q FROM quotes q
JOIN sessions s ON q.session_id = s.session_id JOIN sessions s ON q.session_id = s.session_id
WHERE 1=1 WHERE 1=1
@ -128,8 +130,8 @@ class QuoteDatabase:
return datetime.fromtimestamp(ts / 1000, tz=timezone.utc).isoformat().replace("+00:00", "Z") return datetime.fromtimestamp(ts / 1000, tz=timezone.utc).isoformat().replace("+00:00", "Z")
result = [] result = []
for session_id, timestamp, bid, ask in results: for session_id, timestamp, bid, ask, direction in results:
result.append((session_id, to_iso(timestamp), bid, ask)) result.append((session_id, to_iso(timestamp), bid, ask, direction))
return result return result
@ -189,7 +191,7 @@ class QuoteDatabase:
def get_quotes_by_session(self, session_id: str) -> List[Tuple[str, float, float]]: def get_quotes_by_session(self, session_id: str) -> List[Tuple[str, float, float]]:
cursor = self.conn.execute(""" cursor = self.conn.execute("""
SELECT datetime(timestamp, 'unixepoch') as ts, bid, ask SELECT datetime(timestamp, 'unixepoch') as ts, bid, ask, direction
FROM quotes FROM quotes
WHERE session_id = ? WHERE session_id = ?
ORDER BY timestamp ORDER BY timestamp
@ -208,53 +210,58 @@ class QuoteDatabase:
result.append(row[0]) result.append(row[0])
return result return result
def get_data(self, broker_a, symbol_a, broker_b, symbol_b, limit=1000, time_range_hours='all'): def get_data(self, broker_a, symbol_a, broker_b, symbol_b, time_range_hours='all', startTime=None, endTime=None):
try: try:
available_brokers = pd.read_sql_query("SELECT DISTINCT broker FROM sessions", self.conn)['broker'].tolist() available_brokers = pd.read_sql_query("SELECT DISTINCT broker FROM sessions", self.conn)['broker'].tolist()
available_symbols = pd.read_sql_query("SELECT DISTINCT symbol FROM sessions", self.conn)['symbol'].tolist() available_symbols = pd.read_sql_query("SELECT DISTINCT symbol FROM sessions", self.conn)['symbol'].tolist()
if broker_a not in available_brokers or broker_b not in available_brokers: if broker_a not in available_brokers or broker_b not in available_brokers:
print(f"Broker not found: broker_a={broker_a}, broker_b={broker_b}, available={available_brokers}") print(f"Broker not found: broker_a={broker_a}, broker_b={broker_b}, available={available_brokers}")
return pd.DataFrame() return pd.DataFrame(), pd.DataFrame()
if symbol_a not in available_symbols or symbol_b not in available_symbols: if symbol_a not in available_symbols or symbol_b not in available_symbols:
print(f"Symbol not found: symbol_a={symbol_a}, symbol_b={symbol_b}, available={available_symbols}") print(f"Symbol not found: symbol_a={symbol_a}, symbol_b={symbol_b}, available={available_symbols}")
return pd.DataFrame() return pd.DataFrame(), pd.DataFrame()
query = """ query = """
SELECT q.session_id, q.bid, q.ask, q.timestamp, s.broker, s.symbol SELECT q.session_id, q.bid, q.ask, q.timestamp, q.direction, s.broker, s.symbol
FROM quotes q FROM quotes q
JOIN sessions s ON q.session_id = s.session_id JOIN sessions s ON q.session_id = s.session_id
WHERE ( WHERE (
(s.broker = ? AND s.symbol = ?) OR (s.broker = ? AND s.symbol = ?) OR
(s.broker = ? AND s.symbol = ?) (s.broker = ? AND s.symbol = ?)
) )
ORDER BY q.timestamp DESC
""" """
params = [broker_a, symbol_a, broker_b, symbol_b] params = [broker_a, symbol_a, broker_b, symbol_b]
if time_range_hours != 'all':
current_time_ms = int(datetime.now().timestamp() * 1000)
time_range_ms = int(time_range_hours) * 3600 * 1000
query += " AND q.timestamp >= ?"
params.append(current_time_ms - time_range_ms)
query += " ORDER BY q.timestamp DESC LIMIT ?"
params.append(limit)
df = pd.read_sql_query(query, self.conn, params=tuple(params)) df = pd.read_sql_query(query, self.conn, params=tuple(params))
if df.empty: if df.empty:
print(f"No data returned for query: broker_a={broker_a}, symbol_a={symbol_a}, " print(f"No data returned for query: broker_a={broker_a}, symbol_a={symbol_a}, "
f"broker_b={broker_b}, symbol_b={symbol_b}, time_range={time_range_hours}") f"broker_b={broker_b}, symbol_b={symbol_b}, time_range={time_range_hours}")
return pd.DataFrame() return pd.DataFrame(), pd.DataFrame()
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms', errors='coerce') df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms').dt.strftime('%Y-%m-%dT%H:%M:%SZ')
return df df_a = df[(df['broker'] == broker_a) & (df['symbol'] == symbol_a)].copy()
df_b = df[(df['broker'] == broker_b) & (df['symbol'] == symbol_b)].copy()
df_a['spread'] = df_a['ask'] - df_a['bid']
df_b['spread'] = df_b['ask'] - df_b['bid']
df_a['midline'] = (df_a['ask'] + df_a['bid']) / 2
df_b['midline'] = (df_b['ask'] + df_b['bid']) / 2
print(df_a.head().to_dict())
print(df_b.head().to_dict())
return df_a, df_b
except Exception as e: except Exception as e:
print(f"Error in get_data: {e}") print(f"Error in get_data: {e}")
return pd.DataFrame() return pd.DataFrame(), pd.DataFrame(), {}, []

View File

@ -1,11 +1,14 @@
from decimal import Decimal from decimal import Decimal
from urllib import response
import pandas as pd import pandas as pd
import logging import logging
from typing import Union from typing import Union
from datetime import datetime
from sqlalchemy import Float from sqlalchemy import Float
from ingest import ingest_zip_archive from ingest import ingest_zip_archive
from quote_db import QuoteDatabase from quote_db import QuoteDatabase
from quote_contracts import ( from quote_contracts import (
# BackendResponse,
FetchQuoteRequest, FetchQuoteRequest,
QuoteResponse, QuoteResponse,
FetchQuoteResponse, FetchQuoteResponse,
@ -22,7 +25,9 @@ from quote_contracts import (
FetchDataResponse, FetchDataResponse,
BrokersSymbolsResponse, BrokersSymbolsResponse,
IngestAllRequest, IngestAllRequest,
IngestAllResponse IngestAllResponse,
# QuoteDataRecord,
# LastDataPoint,
) )
from pathlib import Path from pathlib import Path
@ -144,80 +149,54 @@ class QuoteService:
session_id=row[0], session_id=row[0],
timestamp=row[1], timestamp=row[1],
bid=row[2], bid=row[2],
ask=row[3] ask=row[3],
direction=row[4]
) )
for row in quotes for row in quotes
] ]
) )
def get_data(self, broker_a, symbol_a, broker_b, symbol_b, limit=1000, time_range_hours='all'): def get_data(self, broker_a, symbol_a, broker_b, symbol_b, time_range_hours='all', startTime=None, endTime=None):
df = self.db.get_data(broker_a, symbol_a, broker_b, symbol_b, limit, time_range_hours) df_a, df_b = self.db.get_data(broker_a, symbol_a, broker_b, symbol_b, time_range_hours, startTime=startTime, endTime=endTime)
if df.empty: print("Data fetched: df_a shape:", {df_a.shape}, "df_b shape:", {df_b.shape})
if df_a.empty and df_b.empty:
print(f"No data after initial fetch: broker_a={broker_a}, symbol_a={symbol_a}, " print(f"No data after initial fetch: broker_a={broker_a}, symbol_a={symbol_a}, "
f"broker_b={broker_b}, symbol_b={symbol_b}") f"broker_b={broker_b}, symbol_b={symbol_b}")
return []
return pd.DataFrame(), pd.DataFrame(), {}, []
def get_last_data_point(df, fields):
last_points = {}
for field in fields:
if not df.empty and field in df.columns:
latest_row = df[df[field].notna()].sort_values(by='timestamp', ascending=False).iloc[0]
last_points[field] = {
"timestamp": latest_row['timestamp'],
"value": latest_row[field]
}
else:
last_points[field] = {
"timestamp": None,
"value": None
}
return last_points
df_a = df[(df['broker'] == broker_a) & (df['symbol'] == symbol_a)].copy() fields_a = ['ask', 'bid']
df_b = df[(df['broker'] == broker_b) & (df['symbol'] == symbol_b)].copy() fields_b = ['ask', 'bid']
last_data_points = {
**{f"lastAskA": get_last_data_point(df_a, fields_a)['ask']},
**{f"lastBidA": get_last_data_point(df_a, fields_a)['bid']},
**{f"lastAskB": get_last_data_point(df_b, fields_b)['ask']},
**{f"lastBidB": get_last_data_point(df_b, fields_b)['bid']},
}
result = []
df_a['spread'] = df_a['ask'] - df_a['bid']
df_b['spread'] = df_b['ask'] - df_b['bid']
df_a['midline'] = (df_a['ask'] + df_a['bid']) / 2
df_b['midline'] = (df_b['ask'] + df_b['bid']) / 2
if not df_a.empty:
df_a = df_a.dropna(subset=['timestamp'])
if df_a.empty:
print(f"No valid data for {broker_a}/{symbol_a} after dropping invalid timestamps")
else:
result_a = [
FetchData(
# session_id=row['session_id'],
bid_price=(row['bid']) if pd.notna(row['bid']) else None,
ask_price=(row['ask']) if pd.notna(row['ask']) else None,
timestamp=str(row['timestamp']),
broker=broker_a,
symbol=symbol_a,
spread=row['spread'] if pd.notna(row['spread']) else None,
midline=row['midline'] if pd.notna(row['midline']) else None
)
for _, row in df_a.iterrows()
]
result.extend(result_a)
markable_df = pd.concat([df_a,df_b], ignore_index=True)
if not df_b.empty: markable_records = markable_df[markable_df['direction'].isin(['buy', 'sell'])].to_dict(orient='records') if not markable_df.empty else []
df_b = df_b.dropna(subset=['timestamp'])
if df_b.empty:
print(f"No valid data for {broker_b}/{symbol_b} after dropping invalid timestamps")
else:
result_b = [
FetchData(
# session_id=row['session_id'],
bid_price=(row['bid']) if pd.notna(row['bid']) else None,
ask_price=(row['ask']) if pd.notna(row['ask']) else None,
timestamp=str(row['timestamp']),
broker=broker_b,
symbol=symbol_b,
spread=row['spread'] if pd.notna(row['spread']) else None,
midline=row['midline'] if pd.notna(row['midline']) else None
)
for _, row in df_b.iterrows()
]
result.extend(result_b)
if not result: return df_a, df_b, last_data_points, markable_records
print(f"No valid data after processing: broker_a={broker_a}, symbol_a={symbol_a}, "
f"broker_b={broker_b}, symbol_b={symbol_b}")
return []
return result[:limit]

View File

@ -1,6 +1,9 @@
from fastapi import APIRouter, Depends, HTTPException, Query from fastapi import APIRouter, Depends, HTTPException, Query
from pathlib import Path from pathlib import Path
import pandas as pd
import logging import logging
from typing import List, Dict, Any
import json
from quote_db import QuoteDatabase from quote_db import QuoteDatabase
from quote_service import QuoteService from quote_service import QuoteService
from quote_contracts import ( from quote_contracts import (
@ -10,7 +13,8 @@ from quote_contracts import (
ListSessionRequest, ListSessionResponse, ListSessionRequest, ListSessionResponse,
IngestRequest, IngestResponse, IngestRequest, IngestResponse,
FetchBrokersResponse, FetchDataResponse, FetchBrokersResponse, FetchDataResponse,
BrokersSymbolsResponse, IngestAllRequest, IngestAllResponse BrokersSymbolsResponse, IngestAllRequest, IngestAllResponse,
# QuoteDataRecord
) )
@ -36,18 +40,58 @@ def ingest_archives(request: IngestAllRequest, service: QuoteService = Depends(g
def ingest_quotes(request: IngestRequest, service: QuoteService = Depends(get_service)): def ingest_quotes(request: IngestRequest, service: QuoteService = Depends(get_service)):
return service.ingest_archive(request) return service.ingest_archive(request)
@router.get("/api/data", response_model=FetchDataResponse) @router.get("/data")
async def get_data( async def get_data(
broker_a: str = Query(...), broker_a: str = Query(...),
symbol_a: str = Query(...), symbol_a: str = Query(...),
broker_b: str = Query(...), broker_b: str = Query(...),
symbol_b: str = Query(...), symbol_b: str = Query(...),
time_range_hours: str = Query('all', description="Time range: 'all' or hours (1, 6, 24)"),
limit: int = Query(1000, description="Maximum number of records"),
service: QuoteService = Depends(get_service) service: QuoteService = Depends(get_service)
): ):
data = service.get_data(broker_a, symbol_a, broker_b, symbol_b, limit, time_range_hours) df_a, df_b, last_data_points, markable_records = service.get_data(broker_a, symbol_a, broker_b, symbol_b)
return {"data": data} combined_data = pd.concat([df_a, df_b], ignore_index=True) if not df_a.empty or not df_b.empty else pd.DataFrame()
dataset_source = combined_data.to_dict(orient='records') if not combined_data.empty else []
records: List[Dict[str, Any]] = []
timestamps = set()
for record in dataset_source:
timestamp = record["timestamp"]
if timestamp not in timestamps:
records.append({
"askA": record["ask"] if record["broker"] == broker_a else None,
"bidA": record["bid"] if record["broker"] == broker_a else None,
"midlineA": record["midline"] if record["broker"] == broker_a else None,
"spreadA": record["spread"] if record["broker"] == broker_a else None,
"directionA": record["direction"] if record["broker"] == broker_a else None,
"askB": record["ask"] if record["broker"] == broker_b else None,
"bidB": record["bid"] if record["broker"] == broker_b else None,
"midlineB": record["midline"] if record["broker"] == broker_b else None,
"spreadB": record["spread"] if record["broker"] == broker_b else None,
"directionB": record["direction"] if record["broker"] == broker_b else None,
"timestamp": timestamp
})
timestamps.add(timestamp)
response_data = {
"records": records,
"brokerA": broker_a,
"symbolA": symbol_a,
"brokerB": broker_b,
"symbolB": symbol_b,
"lastDataPoints": last_data_points,
"markableRecords": markable_records
}
print(f"Response data size: {len(response_data['records'])} records, lastDataPoints: {len(last_data_points)}, markableRecords: {len(markable_records)}")
print(f"Response data sample: {json.dumps(response_data, default=str)[:500]}...")
return response_data
@router.get("/brokers", response_model=FetchBrokersResponse) @router.get("/brokers", response_model=FetchBrokersResponse)
def get_all_brokers(service: QuoteService = Depends(get_service)): def get_all_brokers(service: QuoteService = Depends(get_service)):

View File

@ -0,0 +1,27 @@
"""Debug script: dump every row of the quotes table as pretty-printed JSON.

Opens the local quotes.db, selects the raw quote stream (including the
newly added `direction` column), and prints the result as a JSON array of
objects keyed by column name.
"""
import sqlite3
import json

conn = sqlite3.connect('quotes.db')
cursor = conn.cursor()

query = """
SELECT
    quotes.timestamp, quotes.bid, quotes.ask, quotes.direction
FROM quotes
"""
cursor.execute(query)
rows = cursor.fetchall()

# cursor.description yields one 7-tuple per selected column; index 0 is
# the column name (the remaining six entries are always None in sqlite3).
column_names = [desc[0] for desc in cursor.description]

# Pair each row with the column names so the JSON output is self-describing.
data_list = [dict(zip(column_names, row)) for row in rows]

json_output = json.dumps(data_list, indent=4)
print(json_output)

cursor.close()
# Fix: the connection itself was previously leaked — only the cursor was
# closed. Close it explicitly so the file handle is released.
conn.close()