From 03e57fcfcd9bc85bec2757fad9ae613bdb6f783b Mon Sep 17 00:00:00 2001 From: Pinapelz Date: Mon, 11 May 2026 00:27:29 -0700 Subject: init commit --- aws.py | 145 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 145 insertions(+) create mode 100644 aws.py (limited to 'aws.py') diff --git a/aws.py b/aws.py new file mode 100644 index 0000000..42b1342 --- /dev/null +++ b/aws.py @@ -0,0 +1,145 @@ +import os +from datetime import datetime, timezone + +import boto3 +import requests +from botocore.exceptions import BotoCoreError, ClientError +from dotenv import load_dotenv + +load_dotenv() + + +def find_matching_asg_instance(): + asg_name = os.getenv("ASG_NAME", "factorio-asg") + ec2 = boto3.client("ec2") + + try: + resp = ec2.describe_instances( + Filters=[ + { + "Name": "tag:aws:autoscaling:groupName", + "Values": [asg_name], + }, + { + "Name": "instance-state-name", + "Values": ["running"], + }, + ] + ) + except (BotoCoreError, ClientError) as exc: + return {"error": f"Failed to query EC2: {exc}"} + + instances = [] + for reservation in resp.get("Reservations", []): + for instance in reservation.get("Instances", []): + instances.append( + { + "id": instance.get("InstanceId"), + "type": instance.get("InstanceType"), + "az": instance.get("Placement", {}).get("AvailabilityZone"), + } + ) + + if not instances: + return None + + return {"asg_name": asg_name, "instances": [instances[0]]} + + +def get_current_spot_price(instance_type: str, az: str): + ec2 = boto3.client("ec2") + + try: + resp = ec2.describe_spot_price_history( + InstanceTypes=[instance_type], + ProductDescriptions=["Linux/UNIX"], + AvailabilityZone=az, + StartTime=datetime.now(timezone.utc), + MaxResults=1, + ) + except (BotoCoreError, ClientError) as exc: + return {"error": f"Failed to fetch spot history: {exc}"} + + if not resp.get("SpotPriceHistory"): + return None + + return resp["SpotPriceHistory"][0] + + +def fetch_instance_pricing(instance_type: str, az: str): + url = f"https://go.runs-on.com/api/instances/{instance_type}" + + try: + resp = requests.get( + url, + params={"az": az, "platform": "Linux/UNIX"}, + timeout=10, + ) + resp.raise_for_status() + return resp.json() + except requests.RequestException as exc: + return {"error": str(exc), "instance_type": instance_type, "az": az} + + +def _normalize_pricing_results(results: list[dict], fallback_az: str) -> list[dict]: + history = [] + + for item in results: + timestamp = item.get("timestamp") + spot_price = item.get("spotPrice") + + if not timestamp or spot_price is None: + continue + + normalized_item = { + "timestamp": str(timestamp), + "spotPrice": float(spot_price), + "onDemandPrice": float(item.get("onDemandPrice")) if item.get("onDemandPrice") is not None else None, + "productDescription": item.get("platform"), + "availabilityZone": item.get("az") or fallback_az, + } + history.append(normalized_item) + + history.sort(key=lambda row: row["timestamp"]) + return history + + +def get_asg_spot_pricing(): + instance_data = find_matching_asg_instance() + + if instance_data is None: + return {"error": "No running instances found for ASG"} + + if instance_data.get("error"): + return instance_data + + instance = instance_data["instances"][0] + pricing_data = fetch_instance_pricing(instance["type"], instance["az"]) + + if pricing_data.get("error"): + return pricing_data + + results = pricing_data.get("results") + if not isinstance(results, list) or not results: + return { + "error": "Pricing API returned no historical points", + "instance_type": instance["type"], + "az": instance["az"], + } + + history = _normalize_pricing_results(results, instance["az"]) + if not history: + return { + "error": "Pricing API points could not be normalized", + "instance_type": instance["type"], + "az": instance["az"], + } + + latest = history[-1] + + return { + "asg_name": instance_data.get("asg_name"), + "instance": instance, + "latest": latest, + "history": history, + } -- cgit v1.2.3