aboutsummaryrefslogtreecommitdiffstats
path: root/persistance.py
blob: 69c6bb96e5107e836d53b47d3923f97211077976 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
from pathlib import Path
import sqlite3
from typing import Any

DB_PATH = Path(__file__).resolve().parent / "data" / "spot_prices.db"


def _connect() -> sqlite3.Connection:
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn


def init_storage() -> None:
    with _connect() as conn:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS spot_price_points (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                polled_at TEXT NOT NULL,
                source_timestamp TEXT,
                asg_name TEXT,
                instance_id TEXT,
                instance_type TEXT,
                az TEXT,
                spot_price REAL NOT NULL
            )
            """
        )
        conn.execute(
            """
            CREATE UNIQUE INDEX IF NOT EXISTS ux_spot_point_unique
            ON spot_price_points(instance_id, source_timestamp, spot_price)
            """
        )
        conn.commit()


def save_spot_datapoint(point: dict[str, Any]) -> bool:
    init_storage()

    with _connect() as conn:
        cursor = conn.execute(
            """
            INSERT OR IGNORE INTO spot_price_points(
                polled_at,
                source_timestamp,
                asg_name,
                instance_id,
                instance_type,
                az,
                spot_price
            ) VALUES (?, ?, ?, ?, ?, ?, ?)
            """,
            (
                point.get("polled_at"),
                point.get("source_timestamp"),
                point.get("asg_name"),
                point.get("instance_id"),
                point.get("instance_type"),
                point.get("az"),
                point.get("spot_price"),
            ),
        )
        conn.commit()

    return cursor.rowcount > 0


def load_spot_datapoints(limit: int = 500) -> list[dict[str, Any]]:
    init_storage()
    limit = max(1, min(limit, 5000))

    with _connect() as conn:
        rows = conn.execute(
            """
            SELECT
                polled_at,
                source_timestamp,
                asg_name,
                instance_id,
                instance_type,
                az,
                spot_price
            FROM spot_price_points
            ORDER BY polled_at DESC
            LIMIT ?
            """,
            (limit,),
        ).fetchall()

    # Return ascending for chart rendering
    return [dict(row) for row in reversed(rows)]


def get_peak_spot_price() -> float | None:
    init_storage()

    with _connect() as conn:
        row = conn.execute(
            """
            SELECT MAX(spot_price) AS peak_spot_price
            FROM spot_price_points
            """
        ).fetchone()

    if not row:
        return None

    peak = row["peak_spot_price"]
    return float(peak) if peak is not None else None


def get_first_breach(max_pay: float) -> dict[str, Any] | None:
    init_storage()

    with _connect() as conn:
        row = conn.execute(
            """
            SELECT
                polled_at,
                spot_price,
                instance_type,
                az
            FROM spot_price_points
            WHERE spot_price > ?
            ORDER BY polled_at ASC
            LIMIT 1
            """,
            (max_pay,),
        ).fetchone()

    return dict(row) if row else None
send patches to the email below
yukais@pinapelz.com
include the subject [PATCH repo_name]
pinapelz.com
homepage