"""Spot-pricing lookup for the EC2 instance behind an Auto Scaling group."""

import os
from datetime import datetime, timezone

import boto3
import requests
from botocore.exceptions import BotoCoreError, ClientError
from dotenv import load_dotenv

# Load a local .env file (if present) so settings such as ASG_NAME can be
# supplied through it before any function reads the environment.
load_dotenv()


def find_matching_asg_instance():
    """Find a running EC2 instance tagged as a member of the configured ASG.

    The group name is read from the ``ASG_NAME`` environment variable and
    defaults to ``"factorio-asg"``.

    Returns:
        ``None`` when no running instance matches; ``{"error": ...}`` when the
        EC2 client cannot be created or the query fails; otherwise
        ``{"asg_name": ..., "instances": [first]}`` where ``first`` carries the
        ``id``, ``type`` and ``az`` of the first instance EC2 returned.
    """
    asg_name = os.getenv("ASG_NAME", "factorio-asg")
    try:
        # Client construction is inside the try: botocore raises
        # BotoCoreError subclasses here too (e.g. when no region is set).
        ec2 = boto3.client("ec2")
        resp = ec2.describe_instances(
            Filters=[
                {
                    "Name": "tag:aws:autoscaling:groupName",
                    "Values": [asg_name],
                },
                {
                    "Name": "instance-state-name",
                    "Values": ["running"],
                },
            ]
        )
    except (BotoCoreError, ClientError) as exc:
        return {"error": f"Failed to query EC2: {exc}"}

    # Only the first instance is ever reported, so stop at the first hit.
    for reservation in resp.get("Reservations", []):
        for instance in reservation.get("Instances", []):
            first = {
                "id": instance.get("InstanceId"),
                "type": instance.get("InstanceType"),
                "az": instance.get("Placement", {}).get("AvailabilityZone"),
            }
            return {"asg_name": asg_name, "instances": [first]}
    return None


def get_current_spot_price(instance_type: str, az: str):
    """Fetch the most recent Linux/UNIX spot price record from EC2.

    Args:
        instance_type: EC2 instance type to query, e.g. ``"m5.large"``.
        az: Availability zone to restrict the query to.

    Returns:
        ``None`` when EC2 returns no history entries; ``{"error": ...}`` when
        the EC2 client cannot be created or the call fails; otherwise the
        first ``SpotPriceHistory`` entry of the response.
    """
    try:
        ec2 = boto3.client("ec2")
        resp = ec2.describe_spot_price_history(
            InstanceTypes=[instance_type],
            ProductDescriptions=["Linux/UNIX"],
            AvailabilityZone=az,
            # NOTE(review): per the EC2 API docs a start time also yields the
            # last price change before it, i.e. the currently effective price.
            StartTime=datetime.now(timezone.utc),
            MaxResults=1,
        )
    except (BotoCoreError, ClientError) as exc:
        return {"error": f"Failed to fetch spot history: {exc}"}

    entries = resp.get("SpotPriceHistory")
    if not entries:
        return None
    return entries[0]


def fetch_instance_pricing(instance_type: str, az: str):
    """Request pricing data for an instance type from the runs-on.com API.

    Args:
        instance_type: EC2 instance type, used as the last URL path segment.
        az: Availability zone, sent as the ``az`` query parameter.

    Returns:
        The decoded JSON body on success. On a transport error, a non-2xx
        status, or a body that is not valid JSON, a dict with ``error``,
        ``instance_type`` and ``az`` keys.
    """
    url = f"https://go.runs-on.com/api/instances/{instance_type}"
    try:
        resp = requests.get(
            url,
            params={"az": az, "platform": "Linux/UNIX"},
            timeout=10,
        )
        resp.raise_for_status()
        return resp.json()
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers JSON decode failures on requests releases where
        # the decode error is not a RequestException subclass.
        return {"error": str(exc), "instance_type": instance_type, "az": az}


def _normalize_pricing_results(results: list[dict], fallback_az: str) -> list[dict]:
    """Convert raw pricing points into uniform rows sorted by timestamp.

    Points are skipped when they are not dicts, have a falsy ``timestamp``,
    or have a missing or non-numeric ``spotPrice``. A non-numeric
    ``onDemandPrice`` is treated as absent (``None``) rather than dropping
    the whole point.

    Args:
        results: Raw points as returned by the pricing API.
        fallback_az: Availability zone to use when a point has no ``az``.

    Returns:
        Rows with ``timestamp`` (str), ``spotPrice`` (float),
        ``onDemandPrice`` (float or ``None``), ``productDescription`` and
        ``availabilityZone``, ordered by the timestamp string.
    """
    history = []
    for item in results:
        if not isinstance(item, dict):
            continue

        timestamp = item.get("timestamp")
        raw_spot = item.get("spotPrice")
        if not timestamp or raw_spot is None:
            continue

        try:
            spot_price = float(raw_spot)
        except (TypeError, ValueError):
            # One malformed point must not abort the whole history.
            continue

        on_demand = item.get("onDemandPrice")
        if on_demand is not None:
            try:
                on_demand = float(on_demand)
            except (TypeError, ValueError):
                on_demand = None

        history.append(
            {
                "timestamp": str(timestamp),
                "spotPrice": spot_price,
                "onDemandPrice": on_demand,
                "productDescription": item.get("platform"),
                "availabilityZone": item.get("az") or fallback_az,
            }
        )

    # Ordering is lexicographic on the string form.
    # NOTE(review): chronological only if timestamps share one sortable
    # format (e.g. ISO-8601) - confirm against the API.
    history.sort(key=lambda row: row["timestamp"])
    return history


def get_asg_spot_pricing():
    """Return spot-price history for the first running instance of the ASG.

    Combines ``find_matching_asg_instance``, ``fetch_instance_pricing`` and
    ``_normalize_pricing_results``.

    Returns:
        On success, a dict with ``asg_name``, ``instance``, ``latest`` (the
        last row of the sorted history) and ``history``. Otherwise a dict
        with an ``error`` key; errors raised after the instance lookup also
        carry ``instance_type`` and ``az``.
    """
    instance_data = find_matching_asg_instance()
    if instance_data is None:
        return {"error": "No running instances found for ASG"}
    if instance_data.get("error"):
        return instance_data

    instance = instance_data["instances"][0]
    pricing_data = fetch_instance_pricing(instance["type"], instance["az"])

    # The API body may decode to something other than an object (list,
    # string, null); treat that as "no results" instead of calling .get on it.
    if isinstance(pricing_data, dict):
        if pricing_data.get("error"):
            return pricing_data
        results = pricing_data.get("results")
    else:
        results = None

    if not isinstance(results, list) or not results:
        return {
            "error": "Pricing API returned no historical points",
            "instance_type": instance["type"],
            "az": instance["az"],
        }

    history = _normalize_pricing_results(results, instance["az"])
    if not history:
        return {
            "error": "Pricing API points could not be normalized",
            "instance_type": instance["type"],
            "az": instance["az"],
        }

    return {
        "asg_name": instance_data.get("asg_name"),
        "instance": instance,
        "latest": history[-1],
        "history": history,
    }