1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
|
from pathlib import Path
import sqlite3
from typing import Any
# SQLite database file, located at "data/spot_prices.db" inside the directory
# that contains this module (the module path is resolved to an absolute path first).
DB_PATH = Path(__file__).resolve().parent / "data" / "spot_prices.db"
def _connect() -> sqlite3.Connection:
    """Open a new SQLite connection to ``DB_PATH``.

    The parent directory of the database file is created first if it is
    missing. Rows fetched through the returned connection are
    ``sqlite3.Row`` objects, so columns can be read by name.

    Returns:
        A freshly opened connection; the caller is responsible for closing it.
    """
    data_dir = DB_PATH.parent
    data_dir.mkdir(parents=True, exist_ok=True)

    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
def init_storage() -> None:
    """Create the ``spot_price_points`` table and its unique index if missing.

    Both statements use ``IF NOT EXISTS``, so calling this repeatedly is safe.
    The unique index on ``(instance_id, source_timestamp, spot_price)`` is what
    lets ``INSERT OR IGNORE`` skip rows that repeat an already stored sample.
    NOTE: SQLite treats NULLs as distinct in unique indexes, so rows without an
    ``instance_id`` or ``source_timestamp`` are not de-duplicated.
    """
    conn = _connect()
    try:
        # ``with conn`` commits on success and rolls back on error, but it does
        # NOT close the connection -- hence the surrounding try/finally.
        with conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS spot_price_points (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    polled_at TEXT NOT NULL,
                    source_timestamp TEXT,
                    asg_name TEXT,
                    instance_id TEXT,
                    instance_type TEXT,
                    az TEXT,
                    spot_price REAL NOT NULL
                )
                """
            )
            conn.execute(
                """
                CREATE UNIQUE INDEX IF NOT EXISTS ux_spot_point_unique
                ON spot_price_points(instance_id, source_timestamp, spot_price)
                """
            )
    finally:
        conn.close()
def save_spot_datapoint(point: dict[str, Any]) -> bool:
    """Persist one spot-price sample, skipping it if SQLite rejects it.

    Args:
        point: Mapping that may contain the keys ``polled_at``,
            ``source_timestamp``, ``asg_name``, ``instance_id``,
            ``instance_type``, ``az`` and ``spot_price``. Missing keys are
            stored as NULL.

    Returns:
        ``True`` if a new row was inserted, ``False`` if the insert was
        ignored (``INSERT OR IGNORE``), e.g. because the row duplicates an
        existing one under the table's unique index.
    """
    init_storage()

    columns = (
        "polled_at",
        "source_timestamp",
        "asg_name",
        "instance_id",
        "instance_type",
        "az",
        "spot_price",
    )
    values = tuple(point.get(column) for column in columns)

    conn = _connect()
    try:
        # ``with conn`` handles commit/rollback only; the connection itself is
        # closed in the ``finally`` clause so it is not leaked.
        with conn:
            cursor = conn.execute(
                """
                INSERT OR IGNORE INTO spot_price_points(
                    polled_at,
                    source_timestamp,
                    asg_name,
                    instance_id,
                    instance_type,
                    az,
                    spot_price
                ) VALUES (?, ?, ?, ?, ?, ?, ?)
                """,
                values,
            )
            # rowcount is 0 when the insert was ignored.
            inserted = cursor.rowcount > 0
    finally:
        conn.close()
    return inserted
def load_spot_datapoints(limit: int = 500) -> list[dict[str, Any]]:
    """Return the most recently polled samples, oldest first.

    Args:
        limit: Maximum number of rows to return; clamped to the range
            ``1..5000``.

    Returns:
        A list of dicts with the keys ``polled_at``, ``source_timestamp``,
        ``asg_name``, ``instance_id``, ``instance_type``, ``az`` and
        ``spot_price``. The newest ``limit`` rows (by ``polled_at``) are
        selected and then returned in ascending order for chart rendering.
    """
    init_storage()
    limit = max(1, min(limit, 5000))

    conn = _connect()
    try:
        # ``with conn`` does not close the connection; ``finally`` does.
        with conn:
            rows = conn.execute(
                """
                SELECT
                    polled_at,
                    source_timestamp,
                    asg_name,
                    instance_id,
                    instance_type,
                    az,
                    spot_price
                FROM spot_price_points
                ORDER BY polled_at DESC, id DESC
                LIMIT ?
                """,
                (limit,),
            ).fetchall()
        # ``id`` breaks ties between rows sharing a ``polled_at`` so the cut-off
        # at LIMIT is deterministic. The query yields newest-first; reverse it
        # to return ascending order for chart rendering.
        return [dict(row) for row in reversed(rows)]
    finally:
        conn.close()
def get_peak_spot_price() -> float | None:
    """Return the highest ``spot_price`` stored, or ``None`` if there is none.

    Returns:
        The maximum value of the ``spot_price`` column as a float, or ``None``
        when the table holds no rows (``MAX`` then yields NULL).
    """
    init_storage()

    conn = _connect()
    try:
        # ``with conn`` does not close the connection; ``finally`` does.
        with conn:
            row = conn.execute(
                """
                SELECT MAX(spot_price) AS peak_spot_price
                FROM spot_price_points
                """
            ).fetchone()
    finally:
        conn.close()

    if row is None:
        return None
    peak = row["peak_spot_price"]
    return None if peak is None else float(peak)
def get_first_breach(max_pay: float) -> dict[str, Any] | None:
    """Return the earliest stored sample whose price exceeds ``max_pay``.

    Args:
        max_pay: Price threshold; only rows with ``spot_price`` strictly
            greater than this value are considered.

    Returns:
        A dict with the keys ``polled_at``, ``spot_price``, ``instance_type``
        and ``az`` for the matching row with the smallest ``polled_at``, or
        ``None`` if no row exceeds the threshold.
    """
    init_storage()

    conn = _connect()
    try:
        # ``with conn`` does not close the connection; ``finally`` does.
        with conn:
            row = conn.execute(
                """
                SELECT
                    polled_at,
                    spot_price,
                    instance_type,
                    az
                FROM spot_price_points
                WHERE spot_price > ?
                ORDER BY polled_at ASC, id ASC
                LIMIT 1
                """,
                (max_pay,),
            ).fetchone()
        # ``id`` breaks ties so the result is deterministic when several
        # breaching rows share the same ``polled_at``.
        return dict(row) if row else None
    finally:
        conn.close()
|