aboutsummaryrefslogtreecommitdiffstats
path: root/aws.py
diff options
context:
space:
mode:
Diffstat (limited to 'aws.py')
-rw-r--r--aws.py145
1 files changed, 145 insertions, 0 deletions
diff --git a/aws.py b/aws.py
new file mode 100644
index 0000000..42b1342
--- /dev/null
+++ b/aws.py
@@ -0,0 +1,145 @@
+import os
+from datetime import datetime, timezone
+
+import boto3
+import requests
+from botocore.exceptions import BotoCoreError, ClientError
+from dotenv import load_dotenv
+
+load_dotenv()
+
+
+def find_matching_asg_instance():
+ asg_name = os.getenv("ASG_NAME", "factorio-asg")
+ ec2 = boto3.client("ec2")
+
+ try:
+ resp = ec2.describe_instances(
+ Filters=[
+ {
+ "Name": "tag:aws:autoscaling:groupName",
+ "Values": [asg_name],
+ },
+ {
+ "Name": "instance-state-name",
+ "Values": ["running"],
+ },
+ ]
+ )
+ except (BotoCoreError, ClientError) as exc:
+ return {"error": f"Failed to query EC2: {exc}"}
+
+ instances = []
+ for reservation in resp.get("Reservations", []):
+ for instance in reservation.get("Instances", []):
+ instances.append(
+ {
+ "id": instance.get("InstanceId"),
+ "type": instance.get("InstanceType"),
+ "az": instance.get("Placement", {}).get("AvailabilityZone"),
+ }
+ )
+
+ if not instances:
+ return None
+
+ return {"asg_name": asg_name, "instances": [instances[0]]}
+
+
+def get_current_spot_price(instance_type: str, az: str):
+ ec2 = boto3.client("ec2")
+
+ try:
+ resp = ec2.describe_spot_price_history(
+ InstanceTypes=[instance_type],
+ ProductDescriptions=["Linux/UNIX"],
+ AvailabilityZone=az,
+ StartTime=datetime.now(timezone.utc),
+ MaxResults=1,
+ )
+ except (BotoCoreError, ClientError) as exc:
+ return {"error": f"Failed to fetch spot history: {exc}"}
+
+ if not resp.get("SpotPriceHistory"):
+ return None
+
+ return resp["SpotPriceHistory"][0]
+
+
+def fetch_instance_pricing(instance_type: str, az: str):
+ url = f"https://go.runs-on.com/api/instances/{instance_type}"
+
+ try:
+ resp = requests.get(
+ url,
+ params={"az": az, "platform": "Linux/UNIX"},
+ timeout=10,
+ )
+ resp.raise_for_status()
+ return resp.json()
+ except requests.RequestException as exc:
+ return {"error": str(exc), "instance_type": instance_type, "az": az}
+
+
+def _normalize_pricing_results(results: list[dict], fallback_az: str) -> list[dict]:
+ history = []
+
+ for item in results:
+ timestamp = item.get("timestamp")
+ spot_price = item.get("spotPrice")
+
+ if not timestamp or spot_price is None:
+ continue
+
+ normalized_item = {
+ "timestamp": str(timestamp),
+ "spotPrice": float(spot_price),
+ "onDemandPrice": float(item.get("onDemandPrice")) if item.get("onDemandPrice") is not None else None,
+ "productDescription": item.get("platform"),
+ "availabilityZone": item.get("az") or fallback_az,
+ }
+ history.append(normalized_item)
+
+ history.sort(key=lambda row: row["timestamp"])
+ return history
+
+
+def get_asg_spot_pricing():
+ instance_data = find_matching_asg_instance()
+
+ if instance_data is None:
+ return {"error": "No running instances found for ASG"}
+
+ if instance_data.get("error"):
+ return instance_data
+
+ instance = instance_data["instances"][0]
+ pricing_data = fetch_instance_pricing(instance["type"], instance["az"])
+
+ if pricing_data.get("error"):
+ return pricing_data
+
+ results = pricing_data.get("results")
+ if not isinstance(results, list) or not results:
+ return {
+ "error": "Pricing API returned no historical points",
+ "instance_type": instance["type"],
+ "az": instance["az"],
+ }
+
+ history = _normalize_pricing_results(results, instance["az"])
+ if not history:
+ return {
+ "error": "Pricing API points could not be normalized",
+ "instance_type": instance["type"],
+ "az": instance["az"],
+ }
+
+ latest = history[-1]
+
+ return {
+ "asg_name": instance_data.get("asg_name"),
+ "instance": instance,
+ "latest": latest,
+ "history": history,
+ }
send patches to the email below
yukais@pinapelz.com
include the subject [PATCH repo_name]
pinapelz.com
homepage